From 1de6ff84b88943d9d0c2eb86f151017672e9996c Mon Sep 17 00:00:00 2001
From: Evgenia Badyanova
Date: Tue, 18 May 2021 14:37:26 -0400
Subject: [PATCH] Telemetry device to record shard allocation (#1258)

Closes https://github.com/elastic/rally/issues/1242
---
 docs/telemetry.rst       |  23 ++++++
 esrally/driver/driver.py |   1 +
 esrally/telemetry.py     | 107 ++++++++++++++++++++++++-
 tests/telemetry_test.py  | 167 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 297 insertions(+), 1 deletion(-)

diff --git a/docs/telemetry.rst b/docs/telemetry.rst
index 266ec5bbb..71d2cacdd 100644
--- a/docs/telemetry.rst
+++ b/docs/telemetry.rst
@@ -28,6 +28,7 @@ You probably want to gain additional insights from a race. Therefore, we have ad
     segment-stats               Segment Stats                 Determines segment stats at the end of the benchmark.
     transform-stats             Transform Stats               Regularly samples transform stats
     searchable-snapshots-stats  Searchable Snapshots Stats    Regularly samples searchable snapshots stats
+    shard-stats                 Shard Stats                   Regularly samples nodes stats at shard level
 
 Keep in mind that each telemetry device may incur a runtime overhead which can skew results.
 
@@ -165,3 +166,25 @@ Supported telemetry parameters:
 
 * ``searchable-snapshots-stats-indices`` (default: None): A string with the index/index pattern, or list of indices/index patterns that searchable snapshots stats should additionally be collected from. If unset, only cluster level stats will be collected.
 * ``searchable-snapshots-stats-sample-interval`` (default 1): A positive number greater than zero denoting the sampling interval in seconds.
+
+shard-stats
+-----------
+
+The shard-stats telemetry device regularly calls the `cluster nodes-stats API with the level=shards parameter <https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-stats.html>`_ and records one metrics document per shard.
+
+Example of a recorded document::
+
+    {
+        "name": "shard-stats",
+        "shard-id": "0",
+        "index": "geonames",
+        "primary": true,
+        "docs": 1000,
+        "store": 212027,
+        "segments-count": 8,
+        "node": "rally0"
+    }
+
+Supported telemetry parameters:
+
+* ``shard-stats-sample-interval`` (default 60): A positive number greater than zero denoting the sampling interval in seconds.
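For readers who want to inspect the raw data this device samples outside of Rally, the following minimal sketch (not part of this patch) issues the same API call with the elasticsearch-py client and flattens the response into one record per shard. The ``http://localhost:9200`` endpoint and the ``es`` client instance are assumptions for illustration; field names and nesting follow the nodes stats response used in the recorder and tests below::

    from elasticsearch import Elasticsearch

    # Assumption: a locally running test cluster; adjust the URL as needed.
    es = Elasticsearch("http://localhost:9200")
    # The same call the ShardStatsRecorder issues: all metrics, broken down per shard.
    stats = es.nodes.stats(metric="_all", level="shards")
    for node in stats["nodes"].values():
        # "shards" maps each index name to a list of {shard_id: shard_stats} dicts.
        for index_name, shards in node["indices"].get("shards", {}).items():
            for shard in shards:
                for shard_id, shard_stats in shard.items():
                    print(index_name, shard_id,
                          shard_stats.get("docs", {}).get("count", -1),
                          shard_stats.get("store", {}).get("size_in_bytes", -1))
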
diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py
index 43f27d2cb..b8fb250e6 100644
--- a/esrally/driver/driver.py
+++ b/esrally/driver/driver.py
@@ -570,6 +570,7 @@ def prepare_telemetry(self, es, enable):
                 telemetry.SegmentStats(log_root, es_default),
                 telemetry.CcrStats(telemetry_params, es, self.metrics_store),
                 telemetry.RecoveryStats(telemetry_params, es, self.metrics_store),
+                telemetry.ShardStats(telemetry_params, es, self.metrics_store),
                 telemetry.TransformStats(telemetry_params, es, self.metrics_store),
                 telemetry.SearchableSnapshotsStats(telemetry_params, es, self.metrics_store)
             ]
diff --git a/esrally/telemetry.py b/esrally/telemetry.py
index 1ed95f02e..3eca0089a 100644
--- a/esrally/telemetry.py
+++ b/esrally/telemetry.py
@@ -34,7 +34,7 @@ def list_telemetry():
     devices = [[device.command, device.human_name, device.help] for device in
                [JitCompiler, Gc, FlightRecorder, Heapdump, NodeStats, RecoveryStats, CcrStats, SegmentStats, TransformStats,
-                SearchableSnapshotsStats]]
+                SearchableSnapshotsStats, ShardStats]]
     console.println(tabulate.tabulate(devices, ["Command", "Name", "Description"]))
 
     console.println("\nKeep in mind that each telemetry device may incur a runtime overhead which can skew results.")
@@ -568,6 +568,111 @@ def record(self):
         self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=shard_metadata)
 
 
+class ShardStats(TelemetryDevice):
+    """
+    Collects and pushes shard stats for the specified cluster to the metric store.
+    """
+    internal = False
+    command = "shard-stats"
+    human_name = "Shard Stats"
+    help = "Regularly samples nodes stats at shard level"
+
+    def __init__(self, telemetry_params, clients, metrics_store):
+        """
+        :param telemetry_params: The configuration object for telemetry_params.
+            May optionally specify:
+            ``shard-stats-sample-interval``: positive integer controlling the sampling interval. Default: 60 seconds.
+        :param clients: A dict of clients to all clusters.
+        :param metrics_store: The configured metrics store we write to.
+        """
+        super().__init__()
+
+        self.telemetry_params = telemetry_params
+        self.clients = clients
+        self.sample_interval = telemetry_params.get("shard-stats-sample-interval", 60)
+        if self.sample_interval <= 0:
+            raise exceptions.SystemSetupError(
+                f"The telemetry parameter 'shard-stats-sample-interval' must be greater than zero but was {self.sample_interval}.")
+
+        # needed by on_benchmark_start(); mirrors the other per-cluster recorders in this module
+        self.specified_cluster_names = self.clients.keys()
+        self.metrics_store = metrics_store
+        self.samplers = []
+
+    def on_benchmark_start(self):
+        for cluster_name in self.specified_cluster_names:
+            recorder = ShardStatsRecorder(cluster_name, self.clients[cluster_name], self.metrics_store, self.sample_interval)
+            sampler = SamplerThread(recorder)
+            self.samplers.append(sampler)
+            sampler.setDaemon(True)
+            # we don't require starting recorders precisely at the same time
+            sampler.start()
+
+    def on_benchmark_stop(self):
+        if self.samplers:
+            for sampler in self.samplers:
+                sampler.finish()
+
+
+class ShardStatsRecorder:
+    """
+    Collects and pushes shard stats for the specified cluster to the metric store.
+    """
+
+    def __init__(self, cluster_name, client, metrics_store, sample_interval):
+        """
+        :param cluster_name: The cluster_name that the client connects to, as specified in target.hosts.
+        :param client: The Elasticsearch client for this cluster.
+        :param metrics_store: The configured metrics store we write to.
+        :param sample_interval: integer controlling the interval, in seconds, between collecting samples.
+        """
+
+        self.cluster_name = cluster_name
+        self.client = client
+        self.metrics_store = metrics_store
+        self.sample_interval = sample_interval
+        self.logger = logging.getLogger(__name__)
+
+    def __str__(self):
+        return "shard stats"
+
+    def record(self):
+        """
+        Collect node-stats?level=shards and push to metrics store.
+        """
+        # pylint: disable=import-outside-toplevel
+        import elasticsearch
+        try:
+            sample = self.client.nodes.stats(metric="_all", level="shards")
+        except elasticsearch.TransportError:
+            msg = f"A transport error occurred while collecting shard stats on cluster [{self.cluster_name}]"
+            self.logger.exception(msg)
+            raise exceptions.RallyError(msg)
+
+        shard_metadata = {
+            "cluster": self.cluster_name
+        }
+
+        for node_stats in sample["nodes"].values():
+            node_name = node_stats["name"]
+            collected_node_stats = collections.OrderedDict()
+            collected_node_stats["name"] = "shard-stats"
+            shard_stats = node_stats["indices"].get("shards")
+
+            for index_name, stats in shard_stats.items():
+                for curr_shard in stats:
+                    for shard_id, curr_stats in curr_shard.items():
+                        doc = {
+                            "name": "shard-stats",
+                            "shard-id": shard_id,
+                            "index": index_name,
+                            "primary": curr_stats.get("routing", {}).get("primary"),
+                            "docs": curr_stats.get("docs", {}).get("count", -1),
+                            "store": curr_stats.get("store", {}).get("size_in_bytes", -1),
+                            "segments-count": curr_stats.get("segments", {}).get("count", -1),
+                            "node": node_name
+                        }
+                        self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=shard_metadata)
+
+
 class NodeStats(TelemetryDevice):
     """
     Gathers different node stats.
diff --git a/tests/telemetry_test.py b/tests/telemetry_test.py
index 2b6b43d5a..f2ce41aff 100644
--- a/tests/telemetry_test.py
+++ b/tests/telemetry_test.py
@@ -882,6 +882,173 @@ def test_stores_multi_index_multi_shard_stats(self, metrics_store_put_doc):
         ], any_order=True)
 
+
+class ShardStatsTests(TestCase):
+
+    @mock.patch("esrally.metrics.EsMetricsStore.put_doc")
+    def test_stores_single_shard_stats(self, metrics_store_put_doc):
+        node_stats_response = {
+            "cluster_name": "elasticsearch",
+            "nodes": {
+                "Zbl_e8EyRXmiR47gbHgPfg": {
+                    "timestamp": 1524379617017,
+                    "name": "rally0",
+                    "transport_address": "127.0.0.1:9300",
+                    "host": "127.0.0.1",
+                    "ip": "127.0.0.1:9300",
+                    "roles": [
+                        "master",
+                        "data",
+                        "ingest"
+                    ],
+                    "indices": {
+                        "docs": {
+                            "count": 76892364,
+                            "deleted": 324530
+                        },
+                        "store": {
+                            "size_in_bytes": 983409834
+                        },
+                        "shards": {
+                            "geonames": [
+                                {
+                                    "0": {
+                                        "routing": {
+                                            "state": "STARTED",
+                                            "primary": True,
+                                            "node": "gHQpKO-IT5uo8WVTPmAZXw",
+                                            "relocating_node": None
+                                        },
+                                        "docs": {
+                                            "count": 1000,
+                                            "deleted": 0
+                                        },
+                                        "store": {
+                                            "size_in_bytes": 212027,
+                                            "reserved_in_bytes": 0
+                                        },
+                                        "segments": {
+                                            "count": 8,
+                                            "memory_in_bytes": 46872,
+                                            "terms_memory_in_bytes": 13056,
+                                            "stored_fields_memory_in_bytes": 3904,
+                                            "term_vectors_memory_in_bytes": 0,
+                                            "norms_memory_in_bytes": 0,
+                                            "points_memory_in_bytes": 0,
+                                            "doc_values_memory_in_bytes": 29912,
+                                            "index_writer_memory_in_bytes": 0,
+                                            "version_map_memory_in_bytes": 0,
+                                            "fixed_bit_set_memory_in_bytes": 0,
+                                            "max_unsafe_auto_id_timestamp": -1,
+                                            "file_sizes": {}
+                                        }
+                                    }
+                                }
+                            ]
+                        }
+                    }
+                }
+            }
+        }
+
+        client = Client(nodes=SubClient(stats=node_stats_response))
+        cfg = create_config()
+        metrics_store = metrics.EsMetricsStore(cfg)
+        recorder = telemetry.ShardStatsRecorder(cluster_name="remote",
+                                                client=client,
+                                                metrics_store=metrics_store,
+                                                sample_interval=53)
+        recorder.record()
+
+        shard_metadata = {
+            "cluster": "remote"
+        }
+
+        metrics_store_put_doc.assert_has_calls([
+            mock.call({
+                "name": "shard-stats",
+                "shard-id": "0",
+                "index": "geonames",
+                "primary": True,
+                "docs": 1000,
+                "store": 212027,
+                "segments-count": 8,
+                "node": "rally0"
+            }, level=MetaInfoScope.cluster, meta_data=shard_metadata)
+        ], any_order=True)
+
+    @mock.patch("esrally.metrics.EsMetricsStore.put_doc")
+    def test_missing_properties_shard_stats(self, metrics_store_put_doc):
+        node_stats_response = {
+            "cluster_name": "elasticsearch",
+            "nodes": {
+                "Zbl_e8EyRXmiR47gbHgPfg": {
+                    "timestamp": 1524379617017,
+                    "name": "rally0",
+                    "transport_address": "127.0.0.1:9300",
+                    "host": "127.0.0.1",
+                    "ip": "127.0.0.1:9300",
+                    "roles": [
+                        "master",
+                        "data",
+                        "ingest"
+                    ],
+                    "indices": {
+                        "docs": {
+                            "count": 76892364,
+                            "deleted": 324530
+                        },
+                        "store": {
+                            "size_in_bytes": 983409834
+                        },
+                        "shards": {
+                            "geonames": [
+                                {
+                                    "0": {
+                                        "routing": {
+                                            "state": "STARTED",
+                                            "primary": True,
+                                            "node": "gHQpKO-IT5uo8WVTPmAZXw",
+                                            "relocating_node": None
+                                        },
+                                        "store": {
+                                            "size_in_bytes": 212027,
+                                            "reserved_in_bytes": 0
+                                        }
+                                    }
+                                }
+                            ]
+                        }
+                    }
+                }
+            }
+        }
+
+        client = Client(nodes=SubClient(stats=node_stats_response))
+        cfg = create_config()
+        metrics_store = metrics.EsMetricsStore(cfg)
+        recorder = telemetry.ShardStatsRecorder(cluster_name="remote",
+                                                client=client,
+                                                metrics_store=metrics_store,
+                                                sample_interval=53)
+        recorder.record()
+
+        shard_metadata = {
+            "cluster": "remote"
+        }
+
+        metrics_store_put_doc.assert_has_calls([
+            mock.call({
+                "name": "shard-stats",
+                "shard-id": "0",
+                "index": "geonames",
+                "primary": True,
+                "docs": -1,
+                "store": 212027,
+                "segments-count": -1,
+                "node": "rally0"
+            }, level=MetaInfoScope.cluster, meta_data=shard_metadata)
+        ], any_order=True)
+
 class TestSearchableSnapshotsStats:
     response_fragment_total = [
         {