Skip to content

Commit

Permalink
Telemetry device to record shard allocation (#1258)
Browse files Browse the repository at this point in the history
Closes #1242
  • Loading branch information
ebadyano committed May 18, 2021
1 parent 0e86042 commit 1de6ff8
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 1 deletion.
23 changes: 23 additions & 0 deletions docs/telemetry.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ You probably want to gain additional insights from a race. Therefore, we have ad
segment-stats Segment Stats Determines segment stats at the end of the benchmark.
transform-stats Transform Stats Regularly samples transform stats
searchable-snapshots-stats Searchable Snapshots Stats Regularly samples searchable snapshots stats
shard-stats Shard Stats Regularly samples nodes stats at shard level

Keep in mind that each telemetry device may incur a runtime overhead which can skew results.

Expand Down Expand Up @@ -165,3 +166,25 @@ Supported telemetry parameters:

* ``searchable-snapshots-stats-indices`` (default: None): A string with the index/index pattern, or list of indices/index patterns that searchable snapshots stats should additionally be collected from. If unset, only cluster level stats will be collected.
* ``searchable-snapshots-stats-sample-interval`` (default 1): A positive number greater than zero denoting the sampling interval in seconds.

shard-stats
--------------

The shard-stats telemetry device regularly calls the `cluster nodes-stats API with level=shard parameter <https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-stats.html>`_ and records one metrics document per shard.

Example of a recorded document::

{
"name": "shard-stats",
"shard-id": "0",
"index": "geonames",
"primary": true,
"docs": 1000,
"store": 212027,
"segments-count": 8,
"node": "rally0"
}

Supported telemetry parameters:

* ``shard-stats-sample-interval`` (default 60): A positive number greater than zero denoting the sampling interval in seconds.
1 change: 1 addition & 0 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ def prepare_telemetry(self, es, enable):
telemetry.SegmentStats(log_root, es_default),
telemetry.CcrStats(telemetry_params, es, self.metrics_store),
telemetry.RecoveryStats(telemetry_params, es, self.metrics_store),
telemetry.ShardStats(telemetry_params, es, self.metrics_store),
telemetry.TransformStats(telemetry_params, es, self.metrics_store),
telemetry.SearchableSnapshotsStats(telemetry_params, es, self.metrics_store)
]
Expand Down
107 changes: 106 additions & 1 deletion esrally/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def list_telemetry():
devices = [[device.command, device.human_name, device.help] for device in [JitCompiler, Gc, FlightRecorder,
Heapdump, NodeStats, RecoveryStats,
CcrStats, SegmentStats, TransformStats,
SearchableSnapshotsStats]]
SearchableSnapshotsStats, ShardStats]]
console.println(tabulate.tabulate(devices, ["Command", "Name", "Description"]))
console.println("\nKeep in mind that each telemetry device may incur a runtime overhead which can skew results.")

Expand Down Expand Up @@ -568,6 +568,111 @@ def record(self):
self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=shard_metadata)


class ShardStats(TelemetryDevice):
    """
    Collects and pushes shard stats for the specified cluster to the metric store.

    One sampler thread is spawned per cluster; each regularly records
    shard-level node stats via a ``ShardStatsRecorder``.
    """
    internal = False
    command = "shard-stats"
    human_name = "Shard Stats"
    help = "Regularly samples nodes stats at shard level"

    def __init__(self, telemetry_params, clients, metrics_store):
        """
        :param telemetry_params: The configuration object for telemetry_params.
            May optionally specify:
            ``shard-stats-sample-interval``: positive integer controlling the sampling interval. Default: 60 seconds.
        :param clients: A dict of clients to all clusters.
        :param metrics_store: The configured metrics store we write to.
        :raises exceptions.SystemSetupError: If ``shard-stats-sample-interval`` is not a positive number.
        """
        super().__init__()

        self.telemetry_params = telemetry_params
        self.clients = clients
        self.sample_interval = telemetry_params.get("shard-stats-sample-interval", 60)
        if self.sample_interval <= 0:
            raise exceptions.SystemSetupError(
                f"The telemetry parameter 'shard-stats-sample-interval' must be greater than zero but was {self.sample_interval}.")

        self.metrics_store = metrics_store
        self.samplers = []

    def on_benchmark_start(self):
        for cluster_name in self.specified_cluster_names:
            recorder = ShardStatsRecorder(cluster_name, self.clients[cluster_name], self.metrics_store, self.sample_interval)
            sampler = SamplerThread(recorder)
            self.samplers.append(sampler)
            # daemon threads do not block interpreter shutdown if a benchmark aborts;
            # Thread.setDaemon() is deprecated since Python 3.10, assign the attribute instead.
            sampler.daemon = True
            # we don't require starting recorders precisely at the same time
            sampler.start()

    def on_benchmark_stop(self):
        if self.samplers:
            for sampler in self.samplers:
                sampler.finish()

class ShardStatsRecorder:
    """
    Collects and pushes shard stats for the specified cluster to the metric store.

    Each call to :meth:`record` emits one metrics document per shard reported by
    the nodes stats API.
    """

    def __init__(self, cluster_name, client, metrics_store, sample_interval):
        """
        :param cluster_name: The cluster_name that the client connects to, as specified in target.hosts.
        :param client: The Elasticsearch client for this cluster.
        :param metrics_store: The configured metrics store we write to.
        :param sample_interval: integer controlling the interval, in seconds, between collecting samples.
        """

        self.cluster_name = cluster_name
        self.client = client
        self.metrics_store = metrics_store
        self.sample_interval = sample_interval
        self.logger = logging.getLogger(__name__)

    def __str__(self):
        return "shard stats"

    def record(self):
        """
        Collect node-stats?level=shards and push to metrics store.

        :raises exceptions.RallyError: If the nodes stats API call fails with a transport error.
        """
        # pylint: disable=import-outside-toplevel
        import elasticsearch
        try:
            sample = self.client.nodes.stats(metric="_all", level="shards")
        except elasticsearch.TransportError as e:
            msg = f"A transport error occurred while collecting shard stats on cluster [{self.cluster_name}]"
            self.logger.exception(msg)
            # chain the original error so the root cause is preserved in the traceback
            raise exceptions.RallyError(msg) from e

        shard_metadata = {
            "cluster": self.cluster_name
        }

        for node_stats in sample["nodes"].values():
            node_name = node_stats["name"]
            # nodes without shard-level index stats (e.g. coordinating-only nodes)
            # yield no documents instead of raising on a None value
            shard_stats = node_stats.get("indices", {}).get("shards", {})

            for index_name, stats in shard_stats.items():
                for curr_shard in stats:
                    for shard_id, curr_stats in curr_shard.items():
                        doc = {
                            "name": "shard-stats",
                            "shard-id": shard_id,
                            "index": index_name,
                            "primary": curr_stats.get("routing", {}).get("primary"),
                            # -1 marks stats that were absent from the response
                            "docs": curr_stats.get("docs", {}).get("count", -1),
                            "store": curr_stats.get("store", {}).get("size_in_bytes", -1),
                            "segments-count": curr_stats.get("segments", {}).get("count", -1),
                            "node": node_name
                        }
                        self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=shard_metadata)


class NodeStats(TelemetryDevice):
"""
Gathers different node stats.
Expand Down
167 changes: 167 additions & 0 deletions tests/telemetry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,173 @@ def test_stores_multi_index_multi_shard_stats(self, metrics_store_put_doc):
], any_order=True)


class ShardStatsTests(TestCase):
    """Unit tests for ``telemetry.ShardStatsRecorder``.

    The recorder is exercised against canned nodes-stats API responses; the
    metrics store's ``put_doc`` is mocked so only the emitted documents are
    asserted.
    """

    @mock.patch("esrally.metrics.EsMetricsStore.put_doc")
    def test_stores_single_shard_stats(self, metrics_store_put_doc):
        """A fully-populated shard entry is recorded as one document per shard."""
        # Canned nodes-stats response (level=shards) with a single node holding
        # one primary shard of the "geonames" index.
        node_stats_response = {
            "cluster_name": "elasticsearch",
            "nodes": {
                "Zbl_e8EyRXmiR47gbHgPfg": {
                    "timestamp": 1524379617017,
                    "name": "rally0",
                    "transport_address": "127.0.0.1:9300",
                    "host": "127.0.0.1",
                    "ip": "127.0.0.1:9300",
                    "roles": [
                        "master",
                        "data",
                        "ingest"
                    ],
                    "indices": {
                        "docs": {
                            "count": 76892364,
                            "deleted": 324530
                        },
                        "store": {
                            "size_in_bytes": 983409834
                        },
                        "shards" : {
                            "geonames": [
                                {
                                    "0" : {
                                        "routing" : {
                                            "state" : "STARTED",
                                            "primary" : True,
                                            "node" : "gHQpKO-IT5uo8WVTPmAZXw",
                                            "relocating_node" : None
                                        },
                                        "docs" : {
                                            "count" : 1000,
                                            "deleted" : 0
                                        },
                                        "store" : {
                                            "size_in_bytes" : 212027,
                                            "reserved_in_bytes" : 0
                                        },
                                        "segments" : {
                                            "count" : 8,
                                            "memory_in_bytes" : 46872,
                                            "terms_memory_in_bytes" : 13056,
                                            "stored_fields_memory_in_bytes" : 3904,
                                            "term_vectors_memory_in_bytes" : 0,
                                            "norms_memory_in_bytes" : 0,
                                            "points_memory_in_bytes" : 0,
                                            "doc_values_memory_in_bytes" : 29912,
                                            "index_writer_memory_in_bytes" : 0,
                                            "version_map_memory_in_bytes" : 0,
                                            "fixed_bit_set_memory_in_bytes" : 0,
                                            "max_unsafe_auto_id_timestamp" : -1,
                                            "file_sizes" : { }
                                        }
                                    }
                                }
                            ]
                        }
                    }
                }
            }
        }

        client = Client(nodes=SubClient(stats=node_stats_response))
        cfg = create_config()
        metrics_store = metrics.EsMetricsStore(cfg)
        recorder = telemetry.ShardStatsRecorder(cluster_name="remote",
                                                client=client,
                                                metrics_store=metrics_store,
                                                sample_interval=53)
        recorder.record()

        shard_metadata = {
            "cluster": "remote"
        }

        # One document for shard 0 of "geonames", carrying the cluster name as metadata.
        metrics_store_put_doc.assert_has_calls([
            mock.call({
                "name": "shard-stats",
                "shard-id": "0",
                "index": "geonames",
                "primary": True,
                "docs": 1000,
                "store": 212027,
                "segments-count": 8,
                "node": "rally0"
            }, level=MetaInfoScope.cluster, meta_data=shard_metadata)
        ], any_order=True)

    @mock.patch("esrally.metrics.EsMetricsStore.put_doc")
    def test_missing_properties_shard_stats(self, metrics_store_put_doc):
        """Stats absent from the response ("docs", "segments") default to -1 in the document."""
        # Same shape as above but the shard entry deliberately omits the
        # "docs" and "segments" sections.
        node_stats_response = {
            "cluster_name": "elasticsearch",
            "nodes": {
                "Zbl_e8EyRXmiR47gbHgPfg": {
                    "timestamp": 1524379617017,
                    "name": "rally0",
                    "transport_address": "127.0.0.1:9300",
                    "host": "127.0.0.1",
                    "ip": "127.0.0.1:9300",
                    "roles": [
                        "master",
                        "data",
                        "ingest"
                    ],
                    "indices": {
                        "docs": {
                            "count": 76892364,
                            "deleted": 324530
                        },
                        "store": {
                            "size_in_bytes": 983409834
                        },
                        "shards" : {
                            "geonames": [
                                {
                                    "0" : {
                                        "routing" : {
                                            "state" : "STARTED",
                                            "primary" : True,
                                            "node" : "gHQpKO-IT5uo8WVTPmAZXw",
                                            "relocating_node" : None
                                        },
                                        "store" : {
                                            "size_in_bytes" : 212027,
                                            "reserved_in_bytes" : 0
                                        }
                                    }
                                }
                            ]
                        }
                    }
                }
            }
        }

        client = Client(nodes=SubClient(stats=node_stats_response))
        cfg = create_config()
        metrics_store = metrics.EsMetricsStore(cfg)
        recorder = telemetry.ShardStatsRecorder(cluster_name="remote",
                                                client=client,
                                                metrics_store=metrics_store,
                                                sample_interval=53)
        recorder.record()

        shard_metadata = {
            "cluster": "remote"
        }

        metrics_store_put_doc.assert_has_calls([
            mock.call({
                "name": "shard-stats",
                "shard-id": "0",
                "index": "geonames",
                "primary": True,
                "docs": -1,
                "store": 212027,
                "segments-count": -1,
                "node": "rally0"
            }, level=MetaInfoScope.cluster, meta_data=shard_metadata)
        ], any_order=True)

class TestSearchableSnapshotsStats:
response_fragment_total = [
{
Expand Down

0 comments on commit 1de6ff8

Please sign in to comment.