Preserve input actions when yielding in bulk helpers #980

Open
wants to merge 3 commits into base: main
1 change: 1 addition & 0 deletions elasticsearch/helpers/__init__.py
@@ -1,4 +1,5 @@
from .errors import BulkIndexError, ScanError
from .actions import expand_action, streaming_bulk, bulk, parallel_bulk
from .actions import streaming_chunks, parallel_chunks
from .actions import scan, reindex
from .actions import _chunk_actions, _process_bulk_chunk
143 changes: 124 additions & 19 deletions elasticsearch/helpers/actions.py
@@ -1,3 +1,4 @@
from functools import partial
from operator import methodcaller
import time

@@ -53,14 +54,21 @@ def expand_action(data):
return action, data.get("_source", data)


def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
def _chunk_actions(
actions,
chunk_size,
max_chunk_bytes,
serializer,
expand_action_callback=expand_action
):
"""
Split actions into chunks by number or size, serialize them into strings in
the process.
"""
bulk_actions, bulk_data = [], []
size, action_count = 0, 0
for action, data in actions:
for input_action in actions:
action, data = expand_action_callback(input_action)
raw_data, raw_action = data, action
action = serializer.dumps(action)
# +1 to account for the trailing new line character
@@ -81,9 +89,9 @@ def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer):
bulk_actions.append(action)
if data is not None:
bulk_actions.append(data)
bulk_data.append((raw_action, raw_data))
bulk_data.append((input_action, raw_action, raw_data))
else:
bulk_data.append((raw_action,))
bulk_data.append((input_action, raw_action,))

size += cur_size
action_count += 1
@@ -121,10 +129,12 @@ def _process_bulk_chunk(

for data in bulk_data:
# collect all the information about failed actions
op_type, action = data[0].copy().popitem()
op_type, action = data[1].copy().popitem()
info = {"error": err_message, "status": e.status_code, "exception": e}
# include original input action
info["action"] = data[0]
if op_type != "delete":
info["data"] = data[1]
info["data"] = data[2]
info.update(action)
exc_errors.append({op_type: info})

@@ -142,11 +152,13 @@ def streaming_bulk(
for data, (op_type, item) in zip(
bulk_data, map(methodcaller("popitem"), resp["items"])
):
# include original input action
item["action"] = data[0]
ok = 200 <= item.get("status", 500) < 300
if not ok and raise_on_error:
# include original document source
if len(data) > 1:
item["data"] = data[1]
if len(data) > 2:
item["data"] = data[2]
errors.append({op_type: item})

if ok or not errors:
@@ -173,7 +185,6 @@ def streaming_bulk(
*args,
**kwargs
):

"""
Streaming bulk consumes actions from the iterable passed in and yields
results per action. For non-streaming usecases use
@@ -206,11 +217,64 @@ def streaming_bulk(
:arg max_backoff: maximum number of seconds a retry will wait
:arg yield_ok: if set to False will skip successful documents in the output
"""
actions = map(expand_action_callback, actions)
chunker = partial(
_chunk_actions,
chunk_size=chunk_size,
max_chunk_bytes=max_chunk_bytes,
serializer=client.transport.serializer,
expand_action_callback=expand_action_callback
)

for bulk_data, bulk_actions in _chunk_actions(
actions, chunk_size, max_chunk_bytes, client.transport.serializer
for item in streaming_chunks(
client,
actions,
chunker,
raise_on_error=raise_on_error,
raise_on_exception=raise_on_exception,
max_retries=max_retries,
initial_backoff=initial_backoff,
max_backoff=max_backoff,
yield_ok=yield_ok,
*args,
**kwargs
):
yield item


def streaming_chunks(
client,
actions,
chunker,
raise_on_error=True,
raise_on_exception=True,
max_retries=0,
initial_backoff=2,
max_backoff=600,
yield_ok=True,
*args,
**kwargs
):
"""
Implementation of the ``streaming_bulk`` helper, chunking actions with the
given chunker function.

:arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
:arg actions: iterable containing the actions to be executed
:arg chunker: function to chunk actions into separate ``bulk`` calls; it
should yield ``(bulk_data, bulk_actions)`` tuples of raw data and
serialized action lines.
:arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`)
from the execution of the last chunk when some occur. By default we raise.
:arg raise_on_exception: if ``False`` then don't propagate exceptions from
call to ``bulk`` and just report the items that failed as failed.
:arg max_retries: maximum number of times a document will be retried when
``429`` is received, set to 0 (default) for no retries on ``429``
:arg initial_backoff: number of seconds we should wait before the first
retry. Any subsequent retries will be powers of ``initial_backoff *
2**retry_number``
:arg max_backoff: maximum number of seconds a retry will wait
:arg yield_ok: if set to False will skip successful documents in the output
"""
for bulk_data, bulk_actions in chunker(actions):

for attempt in range(max_retries + 1):
to_retry, to_retry_data = [], []
@@ -241,9 +305,9 @@ def streaming_bulk(
and (attempt + 1) <= max_retries
):
# _process_bulk_chunk expects strings so we need to
# re-serialize the data
# re-serialize the expanded action and data
to_retry.extend(
map(client.transport.serializer.dumps, data)
map(client.transport.serializer.dumps, data[1:])
)
to_retry_data.append(data)
else:
@@ -338,12 +402,55 @@ def parallel_bulk(
:arg queue_size: size of the task queue between the main thread (producing
chunks to send) and the processing threads.
"""
chunker = partial(
_chunk_actions,
chunk_size=chunk_size,
max_chunk_bytes=max_chunk_bytes,
serializer=client.transport.serializer,
expand_action_callback=expand_action_callback
)

for item in parallel_chunks(
client,
actions,
chunker,
thread_count=thread_count,
queue_size=queue_size,
*args,
**kwargs
):
yield item


def parallel_chunks(
client,
actions,
chunker,
thread_count=4,
queue_size=4,
*args,
**kwargs
):
"""
Implementation of the ``parallel_bulk`` helper, chunking actions with the
given chunker function.

:arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
:arg actions: iterator containing the actions
:arg chunker: function to chunk actions into separate ``bulk`` calls; it
should yield ``(bulk_data, bulk_actions)`` tuples of raw data and
serialized action lines.
:arg thread_count: size of the threadpool to use for the bulk requests
:arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`)
from the execution of the last chunk when some occur. By default we raise.
:arg raise_on_exception: if ``False`` then don't propagate exceptions from
call to ``bulk`` and just report the items that failed as failed.
:arg queue_size: size of the task queue between the main thread (producing
chunks to send) and the processing threads.
"""
# Avoid importing multiprocessing unless parallel_bulk is used
# to avoid exceptions on restricted environments like App Engine
from multiprocessing.pool import ThreadPool

actions = map(expand_action_callback, actions)

class BlockingPool(ThreadPool):
def _setup_queues(self):
super(BlockingPool, self)._setup_queues()
@@ -361,9 +468,7 @@ def _setup_queues(self):
client, bulk_chunk[1], bulk_chunk[0], *args, **kwargs
)
),
_chunk_actions(
actions, chunk_size, max_chunk_bytes, client.transport.serializer
),
chunker(actions)
):
for item in result:
yield item
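Usage sketch of the new extension point (not part of this diff): a custom chunker passed to ``streaming_chunks`` must yield ``(bulk_data, bulk_actions)`` pairs, where each ``bulk_data`` entry is an ``(input_action, raw_action, data)`` tuple and ``bulk_actions`` holds the serialized action lines, mirroring ``_chunk_actions`` above. The index name, document shape, and chunker name below are illustrative assumptions; ``parallel_chunks`` accepts the same chunker signature.

from elasticsearch import Elasticsearch, helpers

client = Elasticsearch()

def one_doc_per_chunk(actions):
    # Illustrative chunker: emit a separate bulk request per input action.
    # Each yielded pair is (bulk_data, bulk_actions); bulk_data keeps the
    # original input action alongside the expanded action and source,
    # bulk_actions carries the serialized lines sent on the wire.
    for input_action in actions:
        raw_action = {"index": {"_id": input_action["id"]}}
        data = {"value": input_action["value"]}
        yield (
            [(input_action, raw_action, data)],
            [
                client.transport.serializer.dumps(raw_action),
                client.transport.serializer.dumps(data),
            ],
        )

docs = [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]

for ok, item in helpers.streaming_chunks(
    client, docs, one_doc_per_chunk, index="my-index"
):
    op_type, info = item.popitem()
    # The original input action is now preserved on each result item.
    print(ok, info["_id"], info["action"])
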
2 changes: 1 addition & 1 deletion test_elasticsearch/test_helpers.py
@@ -58,7 +58,7 @@ def test_chunk_sent_from_different_threads(self, _process_bulk_chunk):
class TestChunkActions(TestCase):
def setUp(self):
super(TestChunkActions, self).setUp()
self.actions = [({"index": {}}, {"some": u"datá", "i": i}) for i in range(100)]
self.actions = [{"_op_type": "index", "some": u"datá", "i": i} for i in range(100)]

def test_chunks_are_chopped_by_byte_size(self):
self.assertEquals(
51 changes: 46 additions & 5 deletions test_elasticsearch/test_server/test_helpers.py
@@ -1,4 +1,4 @@
from mock import patch
from mock import patch, MagicMock

from elasticsearch import helpers, TransportError
from elasticsearch.helpers import ScanError
@@ -56,10 +56,13 @@ def test_all_errors_from_chunk_are_raised_on_failure(self):
self.client.cluster.health(wait_for_status="yellow")

try:
for ok, item in helpers.streaming_bulk(
self.client, [{"a": "b"}, {"a": "c"}], index="i", raise_on_error=True
):
docs = [{"a": "b"}, {"a": "c"}]
for i, (ok, item) in enumerate(helpers.streaming_bulk(
self.client, docs, index="i", raise_on_error=True
)):
self.assertTrue(ok)
op_type, info = item.popitem()
self.assertEquals(info["action"], docs[i])
except helpers.BulkIndexError as e:
self.assertEquals(2, len(e.errors))
else:
@@ -81,8 +84,10 @@ def test_different_op_types(self):
"doc": {"answer": 42},
},
]
for ok, item in helpers.streaming_bulk(self.client, docs):
for i, (ok, item) in enumerate(helpers.streaming_bulk(self.client, docs)):
self.assertTrue(ok)
op_type, info = item.popitem()
self.assertEquals(info["action"], docs[i])

self.assertFalse(self.client.exists(index="i", id=45))
self.assertEquals({"answer": 42}, self.client.get(index="i", id=42)["_source"])
@@ -117,6 +122,7 @@ def test_transport_error_can_becaught(self):
"_index": "i",
"_type": "_doc",
"_id": 45,
"action": docs[1],
"data": {"f": "v"},
"error": "TransportError(599, 'Error!')",
"status": 599,
@@ -147,6 +153,7 @@ def test_rejected_documents_are_retried(self):
)
self.assertEquals(3, len(results))
self.assertEquals([True, True, True], [r[0] for r in results])
self.assertEquals(results[1][1]["index"]["action"], docs[1])
self.client.indices.refresh(index="i")
res = self.client.search(index="i")
self.assertEquals({"value": 3, "relation": "eq"}, res["hits"]["total"])
@@ -175,6 +182,7 @@ def test_rejected_documents_are_retried_at_most_max_retries_times(self):
)
self.assertEquals(3, len(results))
self.assertEquals([False, True, True], [r[0] for r in results])
self.assertEquals(results[0][1]["index"]["action"], docs[0])
self.client.indices.refresh(index="i")
res = self.client.search(index="i")
self.assertEquals({"value": 2, "relation": "eq"}, res["hits"]["total"])
@@ -203,6 +211,39 @@ def streaming_bulk():
self.assertEquals(4, failing_client._called)


class TestStreamingChunks(ElasticsearchTestCase):
def simple_chunker(self, actions):
for item in actions:
raw_action = {
"index": {
"_id": item["id"]
}
}
data = {
"x": item["x"]
}
action_lines = list(map(
self.client.transport.serializer.dumps, (raw_action, data)
))
yield [(item, raw_action, data)], action_lines

def test_actions_chunker(self):
actions = [{"id": 1, "x": "A"}, {"id": 2, "x": "B"}]
actions_gen = (action for action in actions)

mock_chunker = MagicMock()
mock_chunker.side_effect = self.simple_chunker

for i, (ok, item) in enumerate(helpers.streaming_chunks(
self.client, actions_gen, mock_chunker, index="test-index"
)):
self.assertTrue(ok)
self.assertEquals(item["index"]["_id"], str(actions[i]["id"]))
self.assertEquals(item["index"]["action"], actions[i])

mock_chunker.assert_called_once_with(actions_gen)


class TestBulk(ElasticsearchTestCase):
def test_bulk_works_with_single_item(self):
docs = [{"answer": 42, "_id": 1}]
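A sketch of how the preserved input action could be consumed downstream (index name and documents below are illustrative): with this change a result item carries the untouched input dict under ``"action"``, so failures can be collected and replayed without re-deriving them from the expanded bulk metadata.

from elasticsearch import Elasticsearch, helpers

client = Elasticsearch()

docs = [
    {"_op_type": "index", "_id": 1, "field": "ok"},
    {"_op_type": "index", "_id": 2, "field": "maybe rejected"},
]

failed_inputs = []
for ok, item in helpers.streaming_bulk(
    client, docs, index="my-index", raise_on_error=False
):
    if not ok:
        op_type, info = item.popitem()
        # "action" holds the original input dict exactly as passed in.
        failed_inputs.append(info["action"])

# The collected inputs can be fed straight back into another bulk call.
if failed_inputs:
    helpers.bulk(client, failed_inputs, index="my-index", raise_on_error=False)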