Skip to content

Commit

Permalink
Update DiskIo telemetry device to persist the counters (#731)
Browse files Browse the repository at this point in the history
Ensure that the DiskIo telemetry device does not rely on Rally being a parent
process of Elasticsearch: it persists the disk counters at the beginning
of a benchmark and reads them back again afterwards.

Relates to #697
  • Loading branch information
ebadyano committed Jul 25, 2019
1 parent aed6776 commit fe1ff28
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 16 deletions.
5 changes: 3 additions & 2 deletions esrally/mechanic/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,13 @@ def start(self, node_configurations):
node_name = node_configuration.node_name
host_name = node_configuration.ip
binary_path = node_configuration.binary_path
node_telemetry_dir = os.path.join(node_configuration.node_root_path, "telemetry")
self.binary_paths[node_name] = binary_path
self._start_process(binary_path)
# only support a subset of telemetry for Docker hosts
# (specifically, we do not allow users to enable any devices)
node_telemetry = [
telemetry.DiskIo(self.metrics_store, len(node_configurations)),
telemetry.DiskIo(self.metrics_store, len(node_configurations), node_telemetry_dir, node_name),
telemetry.NodeEnvironmentInfo(self.metrics_store)
]
t = telemetry.Telemetry(devices=node_telemetry)
Expand Down Expand Up @@ -306,7 +307,7 @@ def _start_node(self, node_configuration, node_count_on_host):
enabled_devices = self.cfg.opts("mechanic", "telemetry.devices")
telemetry_params = self.cfg.opts("mechanic", "telemetry.params")
node_telemetry = [
telemetry.DiskIo(self.metrics_store, node_count_on_host),
telemetry.DiskIo(self.metrics_store, node_count_on_host, node_telemetry_dir, node_name),
telemetry.NodeEnvironmentInfo(self.metrics_store),
telemetry.IndexSize(data_paths, self.metrics_store),
telemetry.MergeParts(self.metrics_store, node_configuration.log_path),
Expand Down
44 changes: 31 additions & 13 deletions esrally/mechanic/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import subprocess
import tabulate
import threading
import json

from esrally import metrics, time, exceptions
from esrally.utils import io, sysstats, console, versions, opts
Expand Down Expand Up @@ -774,10 +775,12 @@ class DiskIo(InternalTelemetryDevice):
"""
Gathers disk I/O stats.
"""
def __init__(self, metrics_store, node_count_on_host):
def __init__(self, metrics_store, node_count_on_host, log_root, node_name):
super().__init__()
self.metrics_store = metrics_store
self.node_count_on_host = node_count_on_host
self.log_root = log_root
self.node_name = node_name
self.node = None
self.process = None
self.disk_start = None
Expand All @@ -790,40 +793,55 @@ def attach_to_node(self, node):
def on_benchmark_start(self):
    """
    Capture baseline I/O counters for the attached node and persist them to disk.

    Prefers per-process counters (``sysstats.process_io_counters``) and falls back
    to host-wide disk counters when the platform does not support process counters.
    The baseline (together with the node's pid) is written as JSON to
    ``<log_root>/<node_name>.io`` so that ``on_benchmark_stop`` can read it even
    when it runs in a different Rally process than the one that started the node.
    """
    if self.process is not None:
        self.process_start = sysstats.process_io_counters(self.process)
        # Default to zero so a baseline file is written even if neither counter
        # source yields data (the stop hook then reports raw end counters).
        read_bytes = 0
        write_bytes = 0
        io.ensure_dir(self.log_root)
        tmp_io_file = os.path.join(self.log_root, "{}.io".format(self.node_name))
        if self.process_start:
            read_bytes = self.process_start.read_bytes
            write_bytes = self.process_start.write_bytes
            self.logger.info("Using more accurate process-based I/O counters.")
        else:
            # Process counters unavailable (platform limitation) -> host-wide disk counters.
            try:
                self.disk_start = sysstats.disk_io_counters()
                read_bytes = self.disk_start.read_bytes
                write_bytes = self.disk_start.write_bytes
                self.logger.warning("Process I/O counters are not supported on this platform. Falling back to less accurate disk "
                                    "I/O counters.")
            except RuntimeError:
                self.logger.exception("Could not determine I/O stats at benchmark start.")
        # Persist the baseline so on_benchmark_stop can re-attach via the pid
        # and compute deltas without relying on in-memory state.
        with open(tmp_io_file, "wt", encoding="utf-8") as f:
            json.dump({"pid": self.node.pid, "read_bytes": read_bytes, "write_bytes": write_bytes}, f)

def on_benchmark_stop(self):
    """
    Compute the I/O bytes consumed during the benchmark and store them as node-level metrics.

    Reads the baseline counters persisted by ``on_benchmark_start`` from
    ``<log_root>/<node_name>.io``, re-attaches to the Elasticsearch process via
    the persisted pid (so Rally does not need to be the parent process of
    Elasticsearch) and reports the counter deltas.
    """
    # Be aware the semantics of write counts etc. are different for disk and process statistics.
    # Thus we're conservative and only report I/O bytes now.
    # noinspection PyBroadException
    try:
        io.ensure_dir(self.log_root)
        tmp_io_file = os.path.join(self.log_root, "{}.io".format(self.node_name))
        with open(tmp_io_file, "rt", encoding="utf-8") as f:
            io_stats = json.load(f)
        # The baseline file is one-shot; remove it so stale counters cannot leak into later runs.
        os.remove(tmp_io_file)
        self.process = sysstats.setup_process_stats(io_stats["pid"])
        process_end = sysstats.process_io_counters(self.process)
        disk_end = sysstats.disk_io_counters()
        if process_end:
            # we have process-based disk counters, no need to worry how many nodes are on this host
            read_bytes = process_end.read_bytes - io_stats["read_bytes"]
            write_bytes = process_end.write_bytes - io_stats["write_bytes"]
        elif disk_end:
            if self.node_count_on_host > 1:
                self.logger.info("There are [%d] nodes on this host and Rally fell back to disk I/O counters. Attributing [1/%d] "
                                 "of total I/O to [%s].", self.node_count_on_host, self.node_count_on_host, self.node_name)
            # Disk counters are host-wide; attribute an equal share to each node on this host.
            read_bytes = (disk_end.read_bytes - io_stats["read_bytes"]) // self.node_count_on_host
            write_bytes = (disk_end.write_bytes - io_stats["write_bytes"]) // self.node_count_on_host
        else:
            raise RuntimeError("Neither process nor disk I/O counters are available")

        self.metrics_store.put_count_node_level(self.node_name, "disk_io_write_bytes", write_bytes, "byte")
        self.metrics_store.put_count_node_level(self.node_name, "disk_io_read_bytes", read_bytes, "byte")
    # Catching RuntimeException is not sufficient as psutil might raise AccessDenied et.al. which is derived from Exception
    except BaseException:
        self.logger.exception("Could not determine I/O stats at benchmark end.")
Expand Down
68 changes: 67 additions & 1 deletion tests/mechanic/telemetry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import random
import collections
import tempfile
import unittest.mock as mock
import elasticsearch

Expand All @@ -25,6 +26,7 @@
from esrally.mechanic import telemetry, team, cluster
from esrally.metrics import MetaInfoScope
from esrally.utils import console
from collections import namedtuple


def create_config():
Expand Down Expand Up @@ -2140,6 +2142,71 @@ def test_resilient_if_error_response(self):
self.assertIsNone(n.ip)


class DiskIoTests(TestCase):
    """Tests for the DiskIo telemetry device's persisted-counter workflow.

    Each test starts the benchmark on one DiskIo device instance and stops it
    on a *fresh* instance to prove that the counters survive via the persisted
    ``<log_root>/<node_name>.io`` file rather than in-memory state.
    """

    @mock.patch("esrally.utils.sysstats.process_io_counters")
    @mock.patch("esrally.metrics.EsMetricsStore.put_count_node_level")
    def test_diskio_process_io_counters(self, metrics_store_node_count, process_io_counters):
        """Process-based counters: the full delta is attributed to the single node."""
        Diskio = namedtuple("Diskio", "read_bytes write_bytes")
        # first call -> baseline at start, second call -> counters at stop
        process_start = Diskio(10, 10)
        process_stop = Diskio(11, 11)
        process_io_counters.side_effect = [process_start, process_stop]

        tmp_dir = tempfile.mkdtemp()
        cfg = create_config()
        metrics_store = metrics.EsMetricsStore(cfg)

        device = telemetry.DiskIo(metrics_store, node_count_on_host=1, log_root=tmp_dir, node_name="rally0")
        t = telemetry.Telemetry(enabled_devices=[], devices=[device])
        node = cluster.Node(pid=None, host_name="localhost", node_name="rally0", telemetry=t)
        t.attach_to_node(node)
        t.on_benchmark_start()
        # a new device instance simulates a different Rally process reading the persisted counters
        device2 = telemetry.DiskIo(metrics_store, node_count_on_host=1, log_root=tmp_dir, node_name="rally0")
        t2 = telemetry.Telemetry(enabled_devices=[], devices=[device2])
        t2.on_benchmark_stop()
        t2.detach_from_node(node, running=True)
        t2.detach_from_node(node, running=False)

        metrics_store_node_count.assert_has_calls([
            mock.call("rally0", "disk_io_write_bytes", 1, "byte"),
            mock.call("rally0", "disk_io_read_bytes", 1, "byte")

        ])

    @mock.patch("esrally.utils.sysstats.disk_io_counters")
    @mock.patch("esrally.utils.sysstats.process_io_counters")
    @mock.patch("esrally.metrics.EsMetricsStore.put_count_node_level")
    def test_diskio_disk_io_counters(self, metrics_store_node_count, process_io_counters, disk_io_counters):
        """Disk-counter fallback: the host-wide delta is split across the nodes on the host."""
        Diskio = namedtuple("Diskio", "read_bytes write_bytes")
        process_start = Diskio(10, 10)
        process_stop = Diskio(13, 13)
        disk_io_counters.side_effect = [process_start, process_stop]
        # process counters unavailable -> force the disk-counter fallback path
        process_io_counters.side_effect = [None, None]

        tmp_dir = tempfile.mkdtemp()
        cfg = create_config()
        metrics_store = metrics.EsMetricsStore(cfg)

        device = telemetry.DiskIo(metrics_store, node_count_on_host=2, log_root=tmp_dir, node_name="rally0")
        t = telemetry.Telemetry(enabled_devices=[], devices=[device])
        node = cluster.Node(pid=None, host_name="localhost", node_name="rally0", telemetry=t)
        t.attach_to_node(node)
        t.on_benchmark_start()
        device2 = telemetry.DiskIo(metrics_store, node_count_on_host=2, log_root=tmp_dir, node_name="rally0")
        t2 = telemetry.Telemetry(enabled_devices=[], devices=[device2])
        t2.on_benchmark_stop()
        t2.detach_from_node(node, running=True)
        t2.detach_from_node(node, running=False)

        # expected result is 1 byte because there are two nodes on the machine. Result is calculated
        # with total_bytes / node_count
        metrics_store_node_count.assert_has_calls([
            mock.call("rally0", "disk_io_write_bytes", 1, "byte"),
            mock.call("rally0", "disk_io_read_bytes", 1, "byte")

        ])


class JvmStatsSummaryTests(TestCase):
@mock.patch("esrally.metrics.EsMetricsStore.put_doc")
@mock.patch("esrally.metrics.EsMetricsStore.put_value_cluster_level")
Expand Down Expand Up @@ -2787,7 +2854,6 @@ def test_result_is_stored(self, es, metrics_store_put_doc):
}, level=metrics.MetaInfoScope.cluster)
])


class IndexSizeTests(TestCase):
@mock.patch("esrally.utils.io.get_size")
@mock.patch("esrally.metrics.EsMetricsStore.put_count_node_level")
Expand Down

0 comments on commit fe1ff28

Please sign in to comment.