From 5324e0989d375e3f24c3a5d1bac1e0f23f7b7206 Mon Sep 17 00:00:00 2001
From: Daniel Mitterdorfer
Date: Tue, 29 May 2018 13:05:01 +0200
Subject: [PATCH] Allow to store custom metric document format

With this commit we add a new method `put_doc` to the metrics store which
allows users to store custom documents instead of the pre-defined format.

Closes #506
---
 esrally/metrics.py    | 72 ++++++++++++++++++++++++++++++-----
 tests/metrics_test.py | 88 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 150 insertions(+), 10 deletions(-)

diff --git a/esrally/metrics.py b/esrally/metrics.py
index 937f54e24..6017c4b51 100644
--- a/esrally/metrics.py
+++ b/esrally/metrics.py
@@ -445,8 +445,8 @@ def put_count_cluster_level(self, name, count, unit=None, task=None, operation=N
                Defaults to None. The metrics store will derive the timestamp automatically.
         :param meta_data: A dict, containing additional key-value pairs. Defaults to None.
         """
-        self._put(MetaInfoScope.cluster, None, name, count, unit, task, operation, operation_type, sample_type, absolute_time,
-                  relative_time, meta_data)
+        self._put_metric(MetaInfoScope.cluster, None, name, count, unit, task, operation, operation_type, sample_type, absolute_time,
+                         relative_time, meta_data)
 
     def put_count_node_level(self, node_name, name, count, unit=None, task=None, operation=None, operation_type=None,
                              sample_type=SampleType.Normal, absolute_time=None, relative_time=None, meta_data=None):
@@ -467,8 +467,8 @@ def put_count_node_level(self, node_name, name, count, unit=None, task=None, ope
                Defaults to None. The metrics store will derive the timestamp automatically.
         :param meta_data: A dict, containing additional key-value pairs. Defaults to None.
         """
-        self._put(MetaInfoScope.node, node_name, name, count, unit, task, operation, operation_type, sample_type, absolute_time,
-                  relative_time, meta_data)
+        self._put_metric(MetaInfoScope.node, node_name, name, count, unit, task, operation, operation_type, sample_type, absolute_time,
+                         relative_time, meta_data)
 
     # should be a float
     def put_value_cluster_level(self, name, value, unit, task=None, operation=None, operation_type=None, sample_type=SampleType.Normal,
@@ -489,8 +489,8 @@ def put_value_cluster_level(self, name, value, unit, task=None, operation=None,
                Defaults to None. The metrics store will derive the timestamp automatically.
         :param meta_data: A dict, containing additional key-value pairs. Defaults to None.
         """
-        self._put(MetaInfoScope.cluster, None, name, value, unit, task, operation, operation_type, sample_type, absolute_time,
-                  relative_time, meta_data)
+        self._put_metric(MetaInfoScope.cluster, None, name, value, unit, task, operation, operation_type, sample_type, absolute_time,
+                         relative_time, meta_data)
 
     def put_value_node_level(self, node_name, name, value, unit, task=None, operation=None, operation_type=None,
                              sample_type=SampleType.Normal, absolute_time=None, relative_time=None, meta_data=None):
@@ -511,11 +511,11 @@ def put_value_node_level(self, node_name, name, value, unit, task=None, operatio
                Defaults to None. The metrics store will derive the timestamp automatically.
         :param meta_data: A dict, containing additional key-value pairs. Defaults to None.
         """
-        self._put(MetaInfoScope.node, node_name, name, value, unit, task, operation, operation_type, sample_type, absolute_time,
-                  relative_time, meta_data)
+        self._put_metric(MetaInfoScope.node, node_name, name, value, unit, task, operation, operation_type, sample_type, absolute_time,
+                         relative_time, meta_data)
 
-    def _put(self, level, level_key, name, value, unit, task, operation, operation_type, sample_type, absolute_time=None,
-             relative_time=None, meta_data=None):
+    def _put_metric(self, level, level_key, name, value, unit, task, operation, operation_type, sample_type, absolute_time=None,
+                    relative_time=None, meta_data=None):
         if level == MetaInfoScope.cluster:
             meta = self._meta_info[MetaInfoScope.cluster].copy()
         elif level == MetaInfoScope.node:
@@ -560,6 +560,58 @@ def _put(self, level, level_key, name, value, unit, task, operation, operation_t
         assert self.lap is not None, "Attempting to store [%s] without a lap." % doc
         self._add(doc)
 
+    def put_doc(self, doc, level=None, node_name=None, meta_data=None, absolute_time=None, relative_time=None):
+        """
+        Adds a new document to the metrics store. It will merge additional properties into the doc such as timestamps or track info.
+
+        :param doc: The raw document as a ``dict``. Ownership is transferred to the metrics store (i.e. don't reuse that object).
+        :param level: Whether these are cluster or node-level metrics. May be ``None`` if not applicable.
+        :param node_name: The name of the node in case metrics are on node level.
+        :param meta_data: A dict, containing additional key-value pairs. Defaults to None.
+        :param absolute_time: The absolute timestamp in seconds since epoch when this metric record is stored. Defaults to None. The metrics
+               store will derive the timestamp automatically.
+        :param relative_time: The relative timestamp in seconds since the start of the benchmark when this metric record is stored.
+               Defaults to None. The metrics store will derive the timestamp automatically.
+        """
+        if level == MetaInfoScope.cluster:
+            meta = self._meta_info[MetaInfoScope.cluster].copy()
+        elif level == MetaInfoScope.node:
+            meta = self._meta_info[MetaInfoScope.cluster].copy()
+            if node_name in self._meta_info[MetaInfoScope.node]:
+                meta.update(self._meta_info[MetaInfoScope.node][node_name])
+        elif level is None:
+            meta = None
+        else:
+            raise exceptions.SystemSetupError("Unknown meta info level [{}]".format(level))
+
+        if meta and meta_data:
+            meta.update(meta_data)
+
+        if absolute_time is None:
+            absolute_time = self._clock.now()
+        if relative_time is None:
+            relative_time = self._stop_watch.split_time()
+
+        doc.update({
+            "@timestamp": time.to_epoch_millis(absolute_time),
+            "relative-time": int(relative_time * 1000 * 1000),
+            "trial-id": self._trial_id,
+            "trial-timestamp": self._trial_timestamp,
+            "environment": self._environment_name,
+            "track": self._track,
+            "lap": self._lap,
+            "challenge": self._challenge,
+            "car": self._car_name,
+
+        })
+        if meta:
+            doc["meta"] = meta
+        if self._track_params:
+            doc["track-params"] = self._track_params
+
+        assert self.lap is not None, "Attempting to store [%s] without a lap." % doc
+        self._add(doc)
+
     def bulk_add(self, memento):
         """
         Adds raw metrics store documents previously created with #to_externalizable()
diff --git a/tests/metrics_test.py b/tests/metrics_test.py
index ea1224ca4..efca704e7 100644
--- a/tests/metrics_test.py
+++ b/tests/metrics_test.py
@@ -288,6 +288,94 @@ def test_put_value_with_meta_info(self):
         self.es_mock.create_index.assert_called_with(index="rally-metrics-2016-01")
         self.es_mock.bulk_index.assert_called_with(index="rally-metrics-2016-01", doc_type="metrics", items=[expected_doc])
 
+    def test_put_doc_no_meta_data(self):
+        self.metrics_store.open(EsMetricsTests.TRIAL_ID, EsMetricsTests.TRIAL_TIMESTAMP, "test", "append", "defaults", create=True)
+        self.metrics_store.lap = 1
+
+        self.metrics_store.put_doc(doc={
+            "name": "custom_metric",
+            "total": 1234567,
+            "per-shard": [17, 18, 1289, 273, 222],
+            "unit": "byte"
+        })
+        expected_doc = {
+            "@timestamp": StaticClock.NOW * 1000,
+            "trial-id": EsMetricsTests.TRIAL_ID,
+            "trial-timestamp": "20160131T000000Z",
+            "relative-time": 0,
+            "environment": "unittest",
+            "track": "test",
+            "track-params": {
+                "shard-count": 3
+            },
+            "lap": 1,
+            "challenge": "append",
+            "car": "defaults",
+            "name": "custom_metric",
+            "total": 1234567,
+            "per-shard": [17, 18, 1289, 273, 222],
+            "unit": "byte"
+        }
+        self.metrics_store.close()
+        self.es_mock.exists.assert_called_with(index="rally-metrics-2016-01")
+        self.es_mock.create_index.assert_called_with(index="rally-metrics-2016-01")
+        self.es_mock.bulk_index.assert_called_with(index="rally-metrics-2016-01", doc_type="metrics", items=[expected_doc])
+
+    def test_put_doc_with_metadata(self):
+        # add a user-defined tag
+        self.cfg.add(config.Scope.application, "race", "user.tag", "intention:testing,disk_type:hdd")
+        self.metrics_store.open(EsMetricsTests.TRIAL_ID, EsMetricsTests.TRIAL_TIMESTAMP, "test", "append", "defaults", create=True)
+        self.metrics_store.lap = 1
+
+        # Ensure we also merge in cluster level meta info
+        self.metrics_store.add_meta_info(metrics.MetaInfoScope.cluster, None, "source_revision", "abc123")
+        self.metrics_store.add_meta_info(metrics.MetaInfoScope.node, "node0", "os_name", "Darwin")
+        self.metrics_store.add_meta_info(metrics.MetaInfoScope.node, "node0", "os_version", "15.4.0")
+        # Ensure we separate node level info by node
+        self.metrics_store.add_meta_info(metrics.MetaInfoScope.node, "node1", "os_name", "Linux")
+        self.metrics_store.add_meta_info(metrics.MetaInfoScope.node, "node1", "os_version", "4.2.0-18-generic")
+
+        self.metrics_store.put_doc(doc={
+            "name": "custom_metric",
+            "total": 1234567,
+            "per-shard": [17, 18, 1289, 273, 222],
+            "unit": "byte"
+        }, level=metrics.MetaInfoScope.node,
+            node_name="node0",
+            meta_data={
+                "node_type": "hot"
+            })
+        expected_doc = {
+            "@timestamp": StaticClock.NOW * 1000,
+            "trial-id": EsMetricsTests.TRIAL_ID,
+            "trial-timestamp": "20160131T000000Z",
+            "relative-time": 0,
+            "environment": "unittest",
+            "track": "test",
+            "track-params": {
+                "shard-count": 3
+            },
+            "lap": 1,
+            "challenge": "append",
+            "car": "defaults",
+            "name": "custom_metric",
+            "total": 1234567,
+            "per-shard": [17, 18, 1289, 273, 222],
+            "unit": "byte",
+            "meta": {
+                "tag_intention": "testing",
+                "tag_disk_type": "hdd",
+                "source_revision": "abc123",
+                "os_name": "Darwin",
+                "os_version": "15.4.0",
+                "node_type": "hot"
+            }
+        }
+        self.metrics_store.close()
+        self.es_mock.exists.assert_called_with(index="rally-metrics-2016-01")
+        self.es_mock.create_index.assert_called_with(index="rally-metrics-2016-01")
+        self.es_mock.bulk_index.assert_called_with(index="rally-metrics-2016-01", doc_type="metrics", items=[expected_doc])
+
     def test_get_value(self):
         throughput = 5000
         search_result = {
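
Illustrative usage sketch (not part of the patch): how the new `put_doc` API can be called once a metrics store has been opened. The `store` object, the node name and the meta-data values below are assumptions lifted from the unit tests above; `put_doc` itself merges the timestamp, trial, track, lap, challenge and car properties into the raw document before it is indexed.

    from esrally import metrics

    def store_shard_stats(store):
        # "store" is assumed to be an already opened metrics store (as exercised by EsMetricsTests).
        # Node-level meta info is merged into the document's "meta" block together with meta_data.
        store.add_meta_info(metrics.MetaInfoScope.node, "node0", "os_name", "Linux")

        # The raw document keeps its custom fields; @timestamp, trial-id, track, lap, challenge
        # and car are added by put_doc before the document is sent to the metrics index.
        store.put_doc(doc={
            "name": "custom_metric",
            "total": 1234567,
            "per-shard": [17, 18, 1289, 273, 222],
            "unit": "byte"
        }, level=metrics.MetaInfoScope.node, node_name="node0", meta_data={"node_type": "hot"})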