Preserve input actions when yielding in bulk helpers #980

Open
wants to merge 3 commits into base: main
Conversation

@lundberg lundberg commented Jul 13, 2019

This PR solves the issue of the streaming_bulk and parallel_bulk helpers discarding the original input actions while exhausting the generator. It is somewhat related to #940 and probably solves that issue as well.

Use case
Let's say you have a lot of items, too many to hold in memory, that you want to index and report on: for example, an XML stream-parsing generator or an unevaluated Django queryset.
To index those items with the current API, you pass that generator as the actions argument along with an expand_action_callback function that constructs the actual documents.

The problem is that if you want to take further action on the items, like reporting, after they have been indexed (or have failed), the original input item (the XML element or Django model in this example) is not yielded back by the bulk helpers. The generator ends up exhausted with the items lost in translation, leaving nothing to report on.

The bulk helpers therefore need to yield each input item along with the corresponding ES action result.

Example

import collections

from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

client = Elasticsearch()  # assumes a reachable cluster

def parse(filename):
    # open file, iter-parse the XML and yield elements
    ...

def make_document(element):
    return {
        "_index": "articles",
        "_type": "article",
        "_id": element["id"],
        "_source": {
            "title": element["title"],
            # ...
        },
    }

def index(elements):
    for ok, result in parallel_bulk(client, elements, expand_action_callback=make_document):
        yield ok, result["index"]["action"]  # yield the original input element for further processing

def report(elements):
    for ok, element in elements:
        # report on parsed and indexed element
        yield element

elements = parse("data.xml")
elements = index(elements)
elements = report(elements)
# elements = something_more ...

collections.deque(elements, maxlen=0)  # exhaust the pipeline without keeping results in memory

Additionally, this PR adds two new low-level bulk helpers, streaming_chunks and parallel_chunks, which allow the chunking function to be customised (see the sketch below).
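The PR text here does not spell out the new helpers' signatures, so the snippet below is only a hypothetical sketch: it assumes streaming_chunks accepts a chunker callable (the parameter name and call shape are assumptions, not the actual API) that receives the expanded actions and yields them in groups, reusing client, parse and make_document from the example above.

def chunk_by_count(actions, chunk_size=500):
    # Hypothetical chunking function: group already-expanded actions
    # into fixed-size lists, ignoring payload byte size.
    chunk = []
    for action in actions:
        chunk.append(action)
        if len(chunk) >= chunk_size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk

# Assumed call shape -- the chunker keyword is an illustration only.
for ok, result in streaming_chunks(
    client,
    parse("data.xml"),
    expand_action_callback=make_document,
    chunker=chunk_by_count,
):
    ...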

Note: The current versions of streaming_bulk and parallel_bulk use map() to expand actions, which in Python 2 exhausts the generator at once and loads all of its items into memory. This PR also fixes that issue as an implementation bonus.
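To illustrate that note with plain Python (not the helpers' actual code): Python 2's map() builds a full list, so wrapping a large actions generator in it materialises every item up front, whereas a generator expression stays lazy. The Python 3 sketch below reproduces the eager behaviour with list(map(...)) for comparison.

def numbers():
    # Small stand-in for a large actions generator.
    for i in range(3):
        print("producing", i)
        yield i

# Eager: everything is produced before any downstream code runs
# (this is what a bare map() call does to a generator in Python 2).
eager = list(map(lambda i: i * 2, numbers()))

# Lazy: items are produced one at a time, only when requested.
lazy = (i * 2 for i in numbers())
print(next(lazy))  # only "producing 0" has run at this point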

@elasticmachine
Collaborator

Since this is a community-submitted pull request, a Jenkins build has not been kicked off automatically. Can an Elastic organization member please verify the contents of this patch and then kick off a build manually?

@lundberg changed the title from "Include input actions when yielding in bulk helpers" to "Preserve input actions when yielding in bulk helpers" on Jul 13, 2019