Allow to measure create/restore snapshot speed (#1013)
With this commit we add the ability to measure the throughput (in byte/s) of the
create snapshot and restore snapshot operations (the latter is piggybacked on
shard recovery in Elasticsearch). To have Rally report the actual throughput of
the server-side operation, the respective runners do not use the response time of
Elasticsearch but rather how long the operation actually took on the server. We
also provide additional meta-data about how many bytes have been snapshotted or
recovered. All this lets us calculate the throughput in byte/s, which is more
meaningful than the response time of the request.
danielmitterdorfer committed Jun 15, 2020
1 parent 6961f87 commit 9f73d5d
Showing 7 changed files with 364 additions and 118 deletions.
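
To make the mechanism described in the commit message concrete, here is a minimal sketch in Python (not actual Rally code; the dict keys follow the runners and the driver change in the diff below) of the convention this commit introduces: a runner may report the size and the server-side duration of an operation, and the driver prefers those values over its own wall-clock measurements when it calculates throughput.

def runner_meta_data(size_in_bytes, server_side_millis):
    # shape of the dict returned e.g. by the CreateSnapshot runner below when
    # wait-for-completion is true
    seconds = server_side_millis / 1000
    return {
        "weight": size_in_bytes,
        "unit": "byte",
        "success": True,
        "service_time": seconds,
        "time_period": seconds
    }


def effective_times(request_meta_data, measured_service_time, measured_time_period):
    # what the driver does with the returned meta-data (see esrally/driver/driver.py below):
    # pop the overrides if present, otherwise fall back to the measured times
    service_time = request_meta_data.pop("service_time", measured_service_time)
    time_period = request_meta_data.pop("time_period", measured_time_period)
    return service_time, time_period


# a 2 GB snapshot that took 20 s on the server is reported as 100,000,000 byte/s
meta = runner_meta_data(size_in_bytes=2_000_000_000, server_side_millis=20_000)
service_time, time_period = effective_times(meta, measured_service_time=25.0, measured_time_period=25.0)
print(f"throughput: {meta['weight'] / time_period:,.0f} byte/s")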
4 changes: 4 additions & 0 deletions docs/migrate.rst
@@ -9,6 +9,10 @@ Pipelines from-sources-complete and from-sources-skip-build are deprecated

Rally 2.0.1 caches source artifacts automatically in ``~/.rally/benchmarks/distributions/src``. Therefore, it is no longer necessary to explicitly skip the build with ``--pipeline=from-sources-skip-build``. Specify ``--pipeline=from-sources`` instead. See the :doc:`pipeline reference documentation </pipelines>` for more details.

wait-for-recovery requires an ``index`` parameter
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Previously, the ``wait-for-recovery`` operation checked all indices. With Rally 2.0.1, an ``index`` parameter is required and only that index (or index pattern) is checked.
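
For example, a minimal ``wait-for-recovery`` operation now looks as follows (shown as a Python dict for brevity; in a track this is the equivalent JSON, and the index pattern is only illustrative)::

    operation = {
        "operation-type": "wait-for-recovery",
        "index": "restored_logs-*",             # illustrative index pattern
        "completion-recheck-wait-period": 2
    }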

Migrating to Rally 2.0.0
------------------------
39 changes: 17 additions & 22 deletions docs/track.rst
@@ -1107,6 +1107,20 @@ This is an administrative operation. Metrics are not reported by default. Report

This operation is :ref:`retryable <track_operations>`.

create-snapshot
~~~~~~~~~~~~~~~

With the operation ``create-snapshot`` you can `create a snapshot <https://www.elastic.co/guide/en/elasticsearch/reference/current/snapshots-take-snapshot.html>`_. The ``create-snapshot`` operation supports the following parameters:

* ``repository`` (mandatory): The name of the snapshot repository to use.
* ``snapshot`` (mandatory): The name of the snapshot to create.
* ``body`` (mandatory): The body of the create snapshot request.
* ``wait-for-completion`` (optional, defaults to ``false``): Whether this call should return immediately or block until the snapshot is created.
* ``request-params`` (optional): A structure containing HTTP request parameters.

.. note::
When ``wait-for-completion`` is set to ``true``, Rally will report the achieved throughput in byte/s.
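
For orientation, the following sketch shows roughly what the ``create-snapshot`` runner (added in ``esrally/driver/runner.py`` in this commit) does with these parameters; ``es`` is assumed to be Rally's async Elasticsearch client, and the response fields are the ones the runner reads::

    async def create_snapshot_and_measure(es, repository, snapshot, body, request_params=None):
        # issue the create snapshot request and block until the snapshot has completed
        response = await es.snapshot.create(repository=repository, snapshot=snapshot, body=body,
                                            params=request_params or {}, wait_for_completion=True)
        if response.get("snapshot", {}).get("state") == "SUCCESS":
            stats = await es.snapshot.status(repository=repository, snapshot=snapshot)
            size_in_bytes = stats["snapshots"][0]["stats"]["total"]["size_in_bytes"]
            took_seconds = stats["snapshots"][0]["stats"]["time_in_millis"] / 1000
            # the throughput in byte/s that Rally reports for this operation
            return size_in_bytes / took_seconds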

restore-snapshot
~~~~~~~~~~~~~~~~

@@ -1125,34 +1139,15 @@ With the operation ``restore-snapshot`` you can restore a snapshot from an alrea
"request_timeout": 7200
}

However, this might not work if a proxy is in between the client and Elasticsearch and the proxy has a shorter request timeout configured than the client. In this case, keep the default value for ``wait-for-completion`` and instead add a ``wait-for-recovery`` runner in the next step. This has the additional advantage that you'll get a progress report while the snapshot is being restored.

This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.
However, this might not work if a proxy is in between the client and Elasticsearch and the proxy has a shorter request timeout configured than the client. In this case, keep the default value for ``wait-for-completion`` and instead add a ``wait-for-recovery`` runner in the next step.

wait-for-recovery
~~~~~~~~~~~~~~~~~

With the operation ``wait-for-recovery`` you can wait until an ongoing shard recovery finishes. The ``wait-for-recovery`` operation supports the following parameters:

* ``completion-recheck-attempts`` (optional, defaults to 3): It might be possible that the `index recovery API <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-recovery.html>`_ reports that there are no active shard recoveries when a new one might be scheduled shortly afterwards. Therefore, this operation will check several times whether there are still no active recoveries. In between those attempts, it will wait for a time period specified by ``completion-recheck-wait-period``.
* ``completion-recheck-wait-period`` (optional, defaults to 2 seconds): Time in seconds to wait in between consecutive attempts.

.. warning::

By default this operation will run unthrottled (like any other) but you should limit the number of calls by specifying one of the ``target-throughput`` or ``target-interval`` properties on the corresponding task::

{
"operation": {
"operation-type": "wait-for-recovery",
"completion-recheck-attempts": 2,
"completion-recheck-wait-period": 5
},
"target-interval": 10
}

In this example, Rally will check the progress of shard recovery every ten seconds (as specified by ``target-throughput``). When the index recovery API reports that there are no active recoveries, it will still check this twice (``completion-recheck-attempts``), waiting for five seconds in between those calls (``completion-recheck-wait-period``).

This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.
* ``index`` (mandatory): The name of the index or an index pattern that is being recovered.
* ``completion-recheck-wait-period`` (optional, defaults to 1 second): Time in seconds to wait between consecutive attempts.

This operation is :ref:`retryable <track_operations>`.
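
As a rough sketch of how the reported throughput is derived (mirroring the ``wait-for-recovery`` runner added in ``esrally/driver/runner.py`` in this commit; the sample response below is made up but only uses fields the runner reads)::

    import sys

    def recovery_stats(response):
        # walk the index recovery API response and accumulate the recovered bytes as well
        # as the earliest shard start time and the latest shard stop time
        all_shards_done = True
        total_recovered = 0
        start_millis, end_millis = sys.maxsize, 0
        for _, idx_data in response.items():
            for _, shard_data in idx_data.items():
                for shard in shard_data:
                    all_shards_done = all_shards_done and shard["stage"] == "DONE"
                    start_millis = min(start_millis, shard["start_time_in_millis"])
                    end_millis = max(end_millis, shard["stop_time_in_millis"])
                    total_recovered += shard["index"]["size"]["recovered_in_bytes"]
        return all_shards_done, total_recovered, (end_millis - start_millis) / 1000

    # made-up response: a single shard that recovered 2 GB in 20 seconds
    sample = {"logs": {"shards": [{"stage": "DONE", "start_time_in_millis": 0, "stop_time_in_millis": 20000,
                                   "index": {"size": {"recovered_in_bytes": 2000000000}}}]}}
    done, recovered, seconds = recovery_stats(sample)
    print(done, recovered / seconds)    # True 100000000.0, i.e. about 100 MB/s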

14 changes: 11 additions & 3 deletions esrally/driver/driver.py
@@ -1304,9 +1304,17 @@ async def __call__(self, *args, **kwargs):
processing_start = time.perf_counter()
total_ops, total_ops_unit, request_meta_data = await execute_single(runner, self.es, params, self.on_error)
processing_end = time.perf_counter()
stop = request_context["request_end"]
service_time = request_context["request_end"] - request_context["request_start"]
processing_time = processing_end - processing_start
stop = request_context["request_end"]
# allow runners to override service time with request metadata in *very* specific cases. By default we
# will determine this by measuring actual times.
service_time = request_meta_data.pop("service_time",
request_context["request_end"] - request_context["request_start"])
# also allow runners to override the time period. This is relevant for throughput calculation and we
# assume *if* this is overridden via request meta data, the corresponding operation is executed only
# once but not for several iterations. Otherwise, the time period would need to monotonically increase
# as is evident by the regular calculation (stop - total_start).
time_period = request_meta_data.pop("time_period", stop - total_start)
# Do not calculate latency separately when we don't throttle throughput. This metric is just confusing then.
latency = stop - absolute_expected_schedule_time if throughput_throttled else service_time
# If this task completes the parent task we should *not* check for completion by another client but
@@ -1328,7 +1336,7 @@ async def __call__(self, *args, **kwargs):
self.sampler.add(self.task, self.client_id, sample_type, request_meta_data,
convert.seconds_to_ms(latency), convert.seconds_to_ms(service_time),
convert.seconds_to_ms(processing_time), total_ops, total_ops_unit,
(stop - total_start), progress)
time_period, progress)

if completed:
self.logger.info("Task is considered completed due to external event.")
124 changes: 76 additions & 48 deletions esrally/driver/runner.py
@@ -42,6 +42,7 @@ def register_default_runners():
# This is an administrative operation but there is no need for a retry here as we don't issue a request
register_runner(track.OperationType.Sleep.name, Sleep(), async_runner=True)
# these requests should not be retried as they are not idempotent
register_runner(track.OperationType.CreateSnapshot.name, CreateSnapshot(), async_runner=True)
register_runner(track.OperationType.RestoreSnapshot.name, RestoreSnapshot(), async_runner=True)
# We treat the following as administrative commands and thus already start to wrap them in a retry.
register_runner(track.OperationType.ClusterHealth.name, Retry(ClusterHealth()), async_runner=True)
@@ -1449,6 +1450,43 @@ def __repr__(self, *args, **kwargs):
return "create-snapshot-repository"


class CreateSnapshot(Runner):
"""
Creates a new snapshot
"""
async def __call__(self, es, params):
request_params = params.get("request-params", {})
wait_for_completion = params.get("wait-for-completion", False)
repository = mandatory(params, "repository", repr(self))
snapshot = mandatory(params, "snapshot", repr(self))
body = mandatory(params, "body", repr(self))
response = await es.snapshot.create(repository=repository,
snapshot=snapshot,
body=body,
params=request_params,
wait_for_completion=wait_for_completion)

# We can derive a more useful throughput metric if the snapshot has successfully completed
if wait_for_completion and response.get("snapshot", {}).get("state") == "SUCCESS":
stats_response = await es.snapshot.status(repository=repository,
snapshot=snapshot)
size = stats_response["snapshots"][0]["stats"]["total"]["size_in_bytes"]
# while the service time as determined by Rally should be pretty accurate, the time the snapshot
# actually took on the server allows for a correct calculation of the achieved throughput.
time_in_millis = stats_response["snapshots"][0]["stats"]["time_in_millis"]
time_in_seconds = time_in_millis / 1000
return {
"weight": size,
"unit": "byte",
"success": True,
"service_time": time_in_seconds,
"time_period": time_in_seconds
}

def __repr__(self, *args, **kwargs):
return "create-snapshot"


class RestoreSnapshot(Runner):
"""
Restores a snapshot from an already registered repository
@@ -1466,58 +1504,48 @@ def __repr__(self, *args, **kwargs):


class IndicesRecovery(Runner):
def __init__(self):
super().__init__()
self._completed = False
self._percent_completed = 0.0
self._last_recovered = None

@property
def completed(self):
return self._completed

@property
def percent_completed(self):
return self._percent_completed

async def __call__(self, es, params):
remaining_attempts = params.get("completion-recheck-attempts", 3)
wait_period = params.get("completion-recheck-wait-period", 2)
response = None
while not response and remaining_attempts > 0:
response = await es.indices.recovery(active_only=True)
remaining_attempts -= 1
# This might also happen if all recoveries have just finished and we happen to call the API
# before the next recovery is scheduled.
index = mandatory(params, "index", repr(self))
wait_period = params.get("completion-recheck-wait-period", 1)

all_shards_done = False
total_recovered = 0
total_start_millis = sys.maxsize
total_end_millis = 0

# wait until recovery is done
while not all_shards_done:
response = await es.indices.recovery(index=index)
# This might happen if we happen to call the API before the next recovery is scheduled.
if not response:
self.logger.debug("Empty index recovery response for [%s].", index)
else:
# check whether all shards are done
all_shards_done = True
total_recovered = 0
total_start_millis = sys.maxsize
total_end_millis = 0
for _, idx_data in response.items():
for _, shard_data in idx_data.items():
for shard in shard_data:
all_shards_done = all_shards_done and (shard["stage"] == "DONE")
total_start_millis = min(total_start_millis, shard["start_time_in_millis"])
total_end_millis = max(total_end_millis, shard["stop_time_in_millis"])
idx_size = shard["index"]["size"]
total_recovered += idx_size["recovered_in_bytes"]
self.logger.debug("All shards done for [%s]: [%s].", index, all_shards_done)

if not all_shards_done:
await asyncio.sleep(wait_period)

if not response:
self._completed = True
self._percent_completed = 1.0
self._last_recovered = None
return 0, "bytes"
else:
recovered = 0
total_size = 0
for _, idx_data in response.items():
for _, shard_data in idx_data.items():
for shard in shard_data:
idx_size = shard["index"]["size"]
recovered += idx_size["recovered_in_bytes"]
total_size += idx_size["total_in_bytes"]
# translog is not in size but rather in absolute numbers. Ignore it for progress reporting.
# translog = shard_data["translog"]
# we only consider it completed if we get an empty response
self._completed = False
self._percent_completed = recovered / total_size
# this is cumulative so we need to consider the data from last time
if self._last_recovered:
newly_recovered = max(recovered - self._last_recovered, 0)
else:
newly_recovered = recovered
self._last_recovered = recovered
return newly_recovered, "bytes"
response_time_in_seconds = (total_end_millis - total_start_millis) / 1000
return {
"weight": total_recovered,
"unit": "byte",
"success": True,
"service_time": response_time_in_seconds,
"time_period": response_time_in_seconds
}

def __repr__(self, *args, **kwargs):
return "wait-for-recovery"
8 changes: 6 additions & 2 deletions esrally/track/track.py
@@ -418,6 +418,9 @@ class OperationType(Enum):
Search = 3
Bulk = 4
RawRequest = 5
WaitForRecovery = 6
CreateSnapshot = 7

# administrative actions
ForceMerge = 1001
ClusterHealth = 1002
Expand All @@ -440,8 +443,7 @@ class OperationType(Enum):
DeleteSnapshotRepository = 1019
CreateSnapshotRepository = 1020
RestoreSnapshot = 1021
WaitForRecovery = 1022
PutSettings = 1023
PutSettings = 1022

@property
def admin_op(self):
@@ -501,6 +503,8 @@ def from_hyphenated_string(cls, v):
return OperationType.DeleteSnapshotRepository
elif v == "create-snapshot-repository":
return OperationType.CreateSnapshotRepository
elif v == "create-snapshot":
return OperationType.CreateSnapshot
elif v == "restore-snapshot":
return OperationType.RestoreSnapshot
elif v == "wait-for-recovery":
67 changes: 66 additions & 1 deletion tests/driver/driver_test.py
@@ -981,6 +981,15 @@ def percent_completed(self):
async def __call__(self, es, params):
self.iterations_left -= 1

class RunnerOverridingTimes:
async def __call__(self, es, params):
return {
"weight": 1,
"unit": "ops",
"service_time": 1.23,
"time_period": 1.23
}

def __init__(self, methodName):
super().__init__(methodName)
self.runner_with_progress = None
@@ -991,7 +1000,9 @@ def context_managed(self, mock):
def setUp(self):
runner.register_default_runners()
self.runner_with_progress = AsyncExecutorTests.RunnerWithProgress()
self.runner_overriding_times = AsyncExecutorTests.RunnerOverridingTimes()
runner.register_runner("unit-test-recovery", self.runner_with_progress, async_runner=True)
runner.register_runner("override-times", self.runner_overriding_times, async_runner=True)

@mock.patch("elasticsearch.Elasticsearch")
@run_async
@@ -1063,7 +1074,6 @@ async def test_execute_schedule_with_progress_determined_by_runner(self, es):
"request_start": 0,
"request_end": 10
}
es.bulk.return_value = as_future(io.StringIO('{"errors": false, "took": 8}'))

params.register_param_source_for_name("driver-test-param-source", DriverTestParamSource)
test_track = track.Track(name="unittest", description="unittest track",
@@ -1116,6 +1126,61 @@ async def test_execute_schedule_with_progress_determined_by_runner(self, es):
self.assertEqual(1, sample.total_ops)
self.assertEqual("ops", sample.total_ops_unit)

@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_execute_schedule_runner_overrides_times(self, es):
es.init_request_context.return_value = {
"request_start": 0,
"request_end": 10
}

params.register_param_source_for_name("driver-test-param-source", DriverTestParamSource)
test_track = track.Track(name="unittest", description="unittest track",
indices=None,
challenges=None)

task = track.Task("override-times", track.Operation("override-times", operation_type="override-times", params={
# we need this because DriverTestParamSource does not know that we only have one iteration and hence
# size() returns incorrect results
"size": 1
},
param_source="driver-test-param-source"),
warmup_iterations=0, iterations=1, clients=1)
param_source = track.operation_parameters(test_track, task)
schedule = driver.schedule_for(task, 0, param_source)

sampler = driver.Sampler(start_timestamp=time.perf_counter())
cancel = threading.Event()
complete = threading.Event()

execute_schedule = driver.AsyncExecutor(client_id=0,
task=task,
schedule=schedule,
es={
"default": es
},
sampler=sampler,
cancel=cancel,
complete=complete,
on_error="continue")
await execute_schedule()

samples = sampler.samples

self.assertFalse(complete.is_set(), "Executor should not auto-complete a normal task")
self.assertEqual(1, len(samples))
sample = samples[0]
self.assertEqual(0, sample.client_id)
self.assertEqual(task, sample.task)
# we don't have any warmup samples
self.assertEqual(metrics.SampleType.Normal, sample.sample_type)
self.assertEqual(sample.latency_ms, sample.service_time_ms)
self.assertEqual(1, sample.total_ops)
self.assertEqual("ops", sample.total_ops_unit)
self.assertEqual(1230, sample.service_time_ms)
self.assertEqual(1.23, sample.time_period)


@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_execute_schedule_throughput_throttled(self, es):
