Allow to filter tasks by tag #1154

Merged 2 commits on Jan 18, 2021
8 changes: 7 additions & 1 deletion docs/command_line_reference.rst
@@ -327,10 +327,15 @@ You can use ``--include-tasks`` to specify a comma-separated list of tasks that

 Tasks will be executed in the order in which they are defined in the challenge, not in the order in which they are listed in the command.

+.. note::
+
+    Task filters are case-sensitive.
+
 **Examples**:

 * Execute only the tasks with the name ``index`` and ``term``: ``--include-tasks="index,term"``
 * Execute only tasks of type ``search``: ``--include-tasks="type:search"``
+* Execute only tasks that contain the tag ``read-op``: ``--include-tasks="tag:read-op"``
 * You can also mix and match: ``--include-tasks="index,type:search"``

``exclude-tasks``
@@ -344,7 +349,8 @@ You can use ``--exclude-tasks`` to specify a comma-separated list of tasks that

 * Skip any tasks with the name ``index`` and ``term``: ``--exclude-tasks="index,term"``
 * Skip any tasks of type ``search``: ``--exclude-tasks="type:search"``
-* You can also mix and match: ``--exclude-tasks="index,type:search"``
+* Skip any tasks that contain the tag ``setup``: ``--exclude-tasks="tag:setup"``
+* You can also mix and match: ``--exclude-tasks="index,type:search,tag:setup"``
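The include and exclude semantics described in these docs can be sketched as follows. This is a minimal illustration, not Rally's implementation: `DemoTask`, `spec_matches` and `apply_filters` are hypothetical names, and the sketch assumes that a task is selected when any one of the comma-separated filters matches it.

```python
from dataclasses import dataclass, field


@dataclass
class DemoTask:
    # Hypothetical stand-in for a task; not Rally's real Task class.
    name: str
    op_type: str
    tags: list = field(default_factory=list)


def spec_matches(spec, task):
    """Return True if one filter (e.g. "index", "type:search", "tag:setup") matches."""
    kind, sep, value = spec.partition(":")
    if not sep:
        return task.name == spec       # plain name filter
    if kind == "type":
        return task.op_type == value   # operation type filter
    if kind == "tag":
        return value in task.tags      # tag filter (exact, case-sensitive)
    raise ValueError(f"unsupported filter: {spec}")


def apply_filters(tasks, specs, exclude=False):
    # Iterate over the tasks, not the filters, so the challenge's order is kept.
    selected = []
    for task in tasks:
        matched = any(spec_matches(s, task) for s in specs)
        if matched != exclude:
            selected.append(task)
    return selected


challenge = [
    DemoTask("create", "create-index", ["setup"]),
    DemoTask("index", "bulk"),
    DemoTask("term", "search", ["read-op"]),
]

# The filter order ("term" first) does not change the execution order.
included = apply_filters(challenge, "term,index".split(","))
excluded = apply_filters(challenge, ["tag:setup"], exclude=True)
print([t.name for t in included])
print([t.name for t in excluded])
```

Because matching is an exact string comparison, a filter such as `tag:Setup` would not match a task tagged `setup`.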

``team-repository``
~~~~~~~~~~~~~~~~~~~
1 change: 1 addition & 0 deletions docs/track.rst
@@ -410,6 +410,7 @@ schedule
 The ``schedule`` element contains a list of tasks that are executed by Rally, i.e. it describes the workload. Each task consists of the following properties:

 * ``name`` (optional): This property defines an explicit name for the given task. By default, the operation's name is implicitly used as the task name, but if the same operation is run multiple times, a unique task name must be specified using this property.
+* ``tags`` (optional): This property defines one or more tags for the given task. This can be used for :ref:`task filtering <clr_include_tasks>`, e.g. with ``--exclude-tasks="tag:setup"`` all tasks except the ones that contain the tag ``setup`` are executed.
 * ``operation`` (mandatory): This property refers either to the name of an operation that has been defined in the ``operations`` section or directly defines an operation inline.
 * ``clients`` (optional, defaults to 1): The number of clients that should execute a task concurrently.
 * ``warmup-iterations`` (optional, defaults to 0): Number of iterations that each client should execute to warm up the benchmark candidate. Warmup iterations will not show up in the measurement results.
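As an illustration of the ``tags`` property, the following sketch builds a schedule fragment as a Python dictionary and prints it as JSON. The task and operation names are made up for this example; the only thing taken from this change is that ``tags`` may be a single string or a list of strings.

```python
import json

# Hypothetical schedule fragment; all task and operation names are illustrative.
schedule = [
    # A single tag given as a plain string.
    {"name": "create-index", "operation": "create-index", "tags": "setup"},
    # Several tags given as a list.
    {"name": "index-docs", "operation": "bulk-index", "tags": ["write-op", "setup"]},
    # "tags" is optional and can be omitted entirely.
    {"name": "match-all", "operation": "match-all", "clients": 4},
]

track_fragment = json.dumps({"schedule": schedule}, indent=2)
print(track_fragment)
```

With a schedule like this, ``--exclude-tasks="tag:setup"`` would, per the description above, leave only the ``match-all`` task.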
3 changes: 3 additions & 0 deletions esrally/track/loader.py
@@ -810,6 +810,8 @@ def _filters_from_filtered_tasks(self, filtered_tasks):
                 elif len(spec) == 2:
                     if spec[0] == "type":
                         filters.append(track.TaskOpTypeFilter(spec[1]))
+                    elif spec[0] == "tag":
+                        filters.append(track.TaskTagFilter(spec[1]))
                     else:
                         raise exceptions.SystemSetupError(f"Invalid format for filtered tasks: [{t}]. "
                                                           f"Expected [type] but got [{spec[0]}].")
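The dispatch in this hunk can be tried in isolation with the sketch below. It is a simplified stand-in under stated assumptions: the three filter classes are hypothetical dataclasses rather than the real `track.*Filter` classes, `ValueError` replaces `exceptions.SystemSetupError`, and the single-element branch (a plain task name) is assumed because it lies outside the lines shown in the diff.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class NameFilter:  # stand-in for track.TaskNameFilter
    name: str


@dataclass(frozen=True)
class OpTypeFilter:  # stand-in for track.TaskOpTypeFilter
    op_type: str


@dataclass(frozen=True)
class TagFilter:  # stand-in for track.TaskTagFilter
    tag_name: str


def filters_from_specs(specs):
    """Turn strings such as "index", "type:search" or "tag:setup" into filter objects."""
    filters = []
    for t in specs:
        spec = t.split(":")
        if len(spec) == 1:
            # Assumption: a value without a prefix is a task name.
            filters.append(NameFilter(spec[0]))
        elif len(spec) == 2:
            if spec[0] == "type":
                filters.append(OpTypeFilter(spec[1]))
            elif spec[0] == "tag":
                filters.append(TagFilter(spec[1]))
            else:
                raise ValueError(f"Invalid format for filtered tasks: [{t}]. "
                                 f"Expected [type] or [tag] but got [{spec[0]}].")
        else:
            raise ValueError(f"Invalid format for filtered tasks: [{t}].")
    return filters


print(filters_from_specs("index,type:search,tag:setup".split(",")))
```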
@@ -1458,6 +1460,7 @@ def parse_task(self, task_spec, ops, challenge_name, default_warmup_iterations=N
         task_name = self._r(task_spec, "name", error_ctx=op.name, mandatory=False, default_value=op.name)
         task = track.Task(name=task_name,
                           operation=op,
+                          tags=self._r(task_spec, "tags", error_ctx=op.name, mandatory=False),
                           meta_data=self._r(task_spec, "meta", error_ctx=op.name, mandatory=False),
                           warmup_iterations=self._r(task_spec, "warmup-iterations", error_ctx=op.name, mandatory=False,
                                                     default_value=default_warmup_iterations),
31 changes: 27 additions & 4 deletions esrally/track/track.py
@@ -704,7 +704,7 @@ def __eq__(self, other):
         return isinstance(other, type(self)) and self.name == other.name

     def __str__(self, *args, **kwargs):
-        return "filter for task name [%s]" % self.name
+        return f"filter for task name [{self.name}]"


 class TaskOpTypeFilter:
@@ -721,7 +721,24 @@ def __eq__(self, other):
         return isinstance(other, type(self)) and self.op_type == other.op_type

     def __str__(self, *args, **kwargs):
-        return "filter for operation type [%s]" % self.op_type
+        return f"filter for operation type [{self.op_type}]"
+
+
+class TaskTagFilter:
+    def __init__(self, tag_name):
+        self.tag_name = tag_name
+
+    def matches(self, task):
+        return self.tag_name in task.tags

Contributor:

This appears to mean case-sensitive matching. Is this what we want? I assume include-tasks is currently case-sensitive?

Member Author:

Correct, this is consistent with the current behavior, i.e. all other task filters are also case-sensitive.

Member Author:

I pushed an additional note in the docs about the case sensitivity of task filters in 6cd13b2.


+    def __hash__(self):
+        return hash(self.tag_name)
+
+    def __eq__(self, other):
+        return isinstance(other, type(self)) and self.tag_name == other.tag_name
+
+    def __str__(self, *args, **kwargs):
+        return f"filter for tasks tagged [{self.tag_name}]"


 class Singleton(type):
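To see the behavior discussed in the review comments, the class from this hunk can be exercised on its own. The sketch below copies `TaskTagFilter` as shown in the diff; the task object is a hypothetical `SimpleNamespace` stand-in that only provides the `tags` attribute the filter reads.

```python
from types import SimpleNamespace


class TaskTagFilter:
    # Copied from the diff above so that the example is self-contained.
    def __init__(self, tag_name):
        self.tag_name = tag_name

    def matches(self, task):
        return self.tag_name in task.tags

    def __hash__(self):
        return hash(self.tag_name)

    def __eq__(self, other):
        return isinstance(other, type(self)) and self.tag_name == other.tag_name

    def __str__(self, *args, **kwargs):
        return f"filter for tasks tagged [{self.tag_name}]"


# Hypothetical stand-in for a task; the filter only needs a `tags` list.
task = SimpleNamespace(name="search-task", tags=["read-op"])

print(TaskTagFilter("read-op").matches(task))  # exact match
print(TaskTagFilter("Read-Op").matches(task))  # differs only in case
# Equal filters hash alike, so duplicates collapse in a set.
print(len({TaskTagFilter("read-op"), TaskTagFilter("read-op")}))
print(TaskTagFilter("read-op"))
```

The second call returns `False` because list membership compares strings exactly, which is the case-sensitive matching confirmed in the review thread.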
@@ -785,10 +802,16 @@ def __eq__(self, other):
 class Task:
     THROUGHPUT_PATTERN = re.compile(r"(?P<value>(\d*\.)?\d+)\s(?P<unit>\w+/s)")

-    def __init__(self, name, operation, meta_data=None, warmup_iterations=None, iterations=None, warmup_time_period=None,
-                 time_period=None, clients=1, completes_parent=False, schedule=None, params=None):
+    def __init__(self, name, operation, tags=None, meta_data=None, warmup_iterations=None, iterations=None,
+                 warmup_time_period=None, time_period=None, clients=1, completes_parent=False, schedule=None, params=None):
         self.name = name
         self.operation = operation
+        if isinstance(tags, str):
+            self.tags = [tags]
+        elif tags:
+            self.tags = tags
+        else:
+            self.tags = []
         self.meta_data = meta_data if meta_data else {}
         self.warmup_iterations = warmup_iterations
         self.iterations = iterations
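The tag handling added to `Task.__init__` can be looked at as a small standalone function. `normalize_tags` is a hypothetical helper name used only for this sketch; its three branches mirror the ones in the constructor above.

```python
def normalize_tags(tags):
    """Return tags as a list, mirroring the three branches in Task.__init__."""
    if isinstance(tags, str):
        return [tags]   # a single tag given as a string
    elif tags:
        return tags     # already a non-empty collection of tags
    else:
        return []       # None or empty: the task has no tags


print(normalize_tags("setup"))
print(normalize_tags(["write-op", "admin-op"]))
print(normalize_tags(None))

# Why the string case is wrapped in a list: `in` on a string is a substring
# test, whereas `in` on a list compares whole elements.
print("set" in "setup", "set" in normalize_tags("setup"))
```

Wrapping a lone string in a list means a filter such as `tag:set` cannot accidentally match a task tagged `setup`.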
34 changes: 31 additions & 3 deletions tests/track/loader_test.py
@@ -1409,25 +1409,53 @@ def test_filters_tasks(self):
                         },
                         {
                             "operation": "cluster-stats"
+                        },
+                        {
+                            "parallel": {
+                                "tasks": [
+                                    {
+                                        "name": "query-filtered",
+                                        "tags": "include-me",
+                                        "operation": "match-all",
+                                    },
+                                    {
+                                        "name": "index-4",
+                                        "tags": ["include-me", "bulk-task"],
+                                        "operation": "bulk-index",
+                                    },
+                                    {
+                                        "name": "index-5",
+                                        "operation": "bulk-index",
+                                    }
+                                ]
+                            }
+                        },
+                        {
+                            "name": "final-cluster-stats",
+                            "operation": "cluster-stats",
+                            "tags": "include-me"
                         }
                     ]
                 }
             ]
         }
         reader = loader.TrackSpecificationReader()
         full_track = reader("unittest", track_specification, "/mappings")
-        self.assertEqual(5, len(full_track.challenges[0].schedule))
+        self.assertEqual(7, len(full_track.challenges[0].schedule))

         filtered = self.filter(full_track, include_tasks=["index-3",
                                                           "type:search",
                                                           # Filtering should also work for non-core operation types.
-                                                          "type:custom-operation-type"])
+                                                          "type:custom-operation-type",
+                                                          "tag:include-me"])

         schedule = filtered.challenges[0].schedule
-        self.assertEqual(3, len(schedule))
+        self.assertEqual(5, len(schedule))
         self.assertEqual(["index-3", "match-all-parallel"], [t.name for t in schedule[0].tasks])
         self.assertEqual("match-all-serial", schedule[1].name)
         self.assertEqual("cluster-stats", schedule[2].name)
+        self.assertEqual(["query-filtered", "index-4"], [t.name for t in schedule[3].tasks])
+        self.assertEqual("final-cluster-stats", schedule[4].name)
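The assertions above show that filtering also reaches into ``parallel`` elements: ``index-5`` has no matching tag and is removed, while its two siblings stay. The sketch below reproduces that effect on plain dictionaries. It is not Rally's filtering code: `tags_of` and `filter_schedule` are hypothetical helpers, only a tag filter is applied, and dropping a parallel element that ends up empty is an assumption.

```python
def tags_of(task_spec):
    # Accept a single string or a list, as the track format allows.
    tags = task_spec.get("tags", [])
    return [tags] if isinstance(tags, str) else tags


def filter_schedule(schedule, keep):
    """Keep matching tasks, descending into "parallel" elements."""
    filtered = []
    for element in schedule:
        if "parallel" in element:
            kept = [t for t in element["parallel"]["tasks"] if keep(t)]
            if kept:  # assumption: an emptied parallel element is dropped
                filtered.append({"parallel": {"tasks": kept}})
        elif keep(element):
            filtered.append(element)
    return filtered


schedule = [
    {"operation": "cluster-stats"},
    {"parallel": {"tasks": [
        {"name": "query-filtered", "tags": "include-me", "operation": "match-all"},
        {"name": "index-4", "tags": ["include-me", "bulk-task"], "operation": "bulk-index"},
        {"name": "index-5", "operation": "bulk-index"},
    ]}},
    {"name": "final-cluster-stats", "operation": "cluster-stats", "tags": "include-me"},
]

result = filter_schedule(schedule, lambda t: "include-me" in tags_of(t))
print([t["name"] for t in result[0]["parallel"]["tasks"]])
print(result[1]["name"])
```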

     def test_filters_exclude_tasks(self):
         track_specification = {
11 changes: 9 additions & 2 deletions tests/track/track_test.py
@@ -235,12 +235,14 @@ class TaskFilterTests(TestCase):
     def create_index_task(self):
         return track.Task("create-index-task",
                           track.Operation("create-index-op",
-                                          operation_type=track.OperationType.CreateIndex.to_hyphenated_string()))
+                                          operation_type=track.OperationType.CreateIndex.to_hyphenated_string()),
+                          tags=["write-op", "admin-op"])

     def search_task(self):
         return track.Task("search-task",
                           track.Operation("search-op",
-                                          operation_type=track.OperationType.Search.to_hyphenated_string()))
+                                          operation_type=track.OperationType.Search.to_hyphenated_string()),
+                          tags="read-op")

     def test_task_name_filter(self):
         f = track.TaskNameFilter("create-index-task")
@@ -252,6 +254,11 @@ def test_task_op_type_filter(self):
         self.assertTrue(f.matches(self.create_index_task()))
         self.assertFalse(f.matches(self.search_task()))

+    def test_task_tag_filter(self):
+        f = track.TaskTagFilter(tag_name="write-op")
+        self.assertTrue(f.matches(self.create_index_task()))
+        self.assertFalse(f.matches(self.search_task()))
+

 class TaskTests(TestCase):
     def task(self, schedule=None, target_throughput=None, target_interval=None):