[Feature Request]: Add with_exception_handling() for PTransforms in Python #31193

Open
2 of 16 tasks
jd185367 opened this issue May 6, 2024 · 3 comments

jd185367 commented May 6, 2024

What would you like to happen?

Add a way to the Python SDK to handle uncaught runtime exceptions raised within a transform, e.g. something like this with_exception_handling() method:

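import logging
import apache_beam as beam

def log_errors(error_item: tuple):
    item, error_info = error_item
    logging.error(error_info)
    logging.error(f"Failed to save item: {item}")

# Proposed API: with_exception_handling() does not exist on PTransform yet.
_, errors = items | MyTransform().with_exception_handling()
errors | "Log errors" >> beam.Map(log_errors)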

This already exists for DoFn-based steps via ParDo.with_exception_handling() (also usable on the ParDo returned by Map and FlatMap), and the Java SDK appears to offer something similar for PTransforms: https://beam.apache.org/releases/javadoc/2.15.0/index.html?org/apache/beam/sdk/transforms/WithFailures.html

Motivation

Google Cloud Dataflow automatically retries failed messages in streaming jobs. However, when a message causes a runtime error (for example, because of bad data), it can be retried indefinitely and block other messages from being processed. The only fix we've found is to drain and restart the pipeline to flush the bad messages, which is manual and risks losing data. There's no way to set a maximum number of retries per message. We try to parse and validate messages up front as much as possible, but bugs have still slipped through to production and caused runtime errors (and, obviously, we can't prevent 100% of bugs).

Being able to add a top-level error handler to the pipeline (or a root transform) would solve this, since in a worst-case scenario we could catch any failed messages/collections, log them, and not block the rest of the pipeline.

Right now, though, adding a top-level exception handler isn't possible. For instance, this example will not catch the raised error in Apache Beam 2.56.0, which is very unintuitive:

import logging
import apache_beam as beam

class BuggedTransform(beam.PTransform):
    def expand(self, messages: beam.PCollection) -> beam.PCollection:
        return messages | "Call function w/ bug" >> beam.Map(self.raise_error)
    
    def raise_error(self, m: str):
        if m == "bar":
            raise ValueError("This should be caught")
        print(m)
        return m

class MyTransform(beam.PTransform):
    def expand(self, messages: beam.PCollection) -> beam.PCollection:
        try:
            return messages | "Run transform w/ bug" >> BuggedTransform()
        except Exception as e:      # This should catch the error, but doesn't!
            logging.error(f"Error happened {e}")
            return messages

with beam.Pipeline() as pipeline:
    result = (
        pipeline
        | "Create example data" >> beam.Create(["foo", "bar", "baz"])
        | "Apply my transform" >> MyTransform()
    )

Output:

foo
ERROR:apache_beam.runners.common:This should be caught [while running 'Apply my transform/Run transform w/ bug/Call function w/ bug']
Traceback (most recent call last):
  File "apache_beam\runners\common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 640, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\transforms\core.py", line 1969, in <lambda>
    wrapper = lambda x: [fn(x)]
  File ".\beam_exception_example.py", line 12, in raise_error
    raise ValueError("This should be caught")
ValueError: This should be caught
Traceback (most recent call last):
  File "apache_beam\runners\common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 640, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\transforms\core.py", line 1969, in <lambda>
    wrapper = lambda x: [fn(x)]
  File ".\beam_exception_example.py", line 12, in raise_error
    raise ValueError("This should be caught")
ValueError: This should be caught

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File ".\beam_exception_example.py", line 27, in <module>
    result = (
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\pipeline.py", line 613, in __exit__
    self.result = self.run()
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\pipeline.py", line 587, in run
    return self.runner.run_pipeline(self, self._options)
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 128, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 204, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 228, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 483, in run_stages
    bundle_results = self._execute_bundle(
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 811, in _execute_bundle
    self._run_bundle(
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 1048, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 1384, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 384, in push
    response = self.worker.do_instruction(request)
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 656, in do_instruction
    return getattr(self, request_type)(
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 694, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1113, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 237, in process_encoded
    self.output(decoded_value)
  File "apache_beam\runners\worker\operations.py", line 569, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam\runners\worker\operations.py", line 571, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam\runners\worker\operations.py", line 262, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 265, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 639, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam\runners\common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam\runners\worker\operations.py", line 265, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 639, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam\runners\common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam\runners\worker\operations.py", line 265, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 639, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam\runners\common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam\runners\worker\operations.py", line 265, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1547, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 640, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\transforms\core.py", line 1969, in <lambda>
    wrapper = lambda x: [fn(x)]
  File ".\beam_exception_example.py", line 12, in raise_error
    raise ValueError("This should be caught")
ValueError: This should be caught [while running 'Apply my transform/Run transform w/ bug/Call function w/ bug']
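
A likely explanation for why the try/except in MyTransform.expand() never fires: expand() runs while the pipeline graph is being built, whereas raise_error only runs once the pipeline executes (the traceback above surfaces from Pipeline.__exit__ and run). The plain-Python analogy below involves no Beam at all. Its expand and run functions are made-up stand-ins for graph construction and execution.

```python
def raise_error(m):
    if m == "bar":
        raise ValueError("This should be caught")
    return m


def expand(steps):
    """Stand-in for PTransform.expand(): records a step, executes nothing."""
    try:
        steps.append(raise_error)  # raise_error is not called here
    except Exception:
        # Never reached: registering a step cannot raise the step's error.
        steps.clear()
    return steps


def run(steps, items):
    """Stand-in for pipeline execution: the recorded steps are finally called."""
    results = []
    for item in items:
        for step in steps:
            item = step(item)
        results.append(item)
    return results


steps = expand([])
try:
    run(steps, ["foo", "bar", "baz"])
    outcome = "no error"
except ValueError as e:
    # The error appears here, long after expand()'s try block has exited.
    outcome = f"raised at run time: {e}"
```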

The only solution we've found is to add this sort of error handling separately to every pipeline step, which isn't maintainable (e.g. if we have hundreds of DoFns, adding try-except blocks to all of them individually is labor-intensive).
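
To show what that per-step boilerplate looks like when factored out, here is a small decorator sketch. It is plain Python, not a Beam API, and the names dead_letter and parse are made up. It still has to be applied to every function, so it reduces the repetition without replacing a top-level handler.

```python
import functools
import logging


def dead_letter(fn):
    """Wrap fn so a failure is returned as a tagged value instead of raised."""
    @functools.wraps(fn)
    def wrapper(item):
        try:
            return ("ok", fn(item))
        except Exception as exc:
            logging.error("Failed to process %r: %r", item, exc)
            return ("error", (item, repr(exc)))
    return wrapper


@dead_letter
def parse(m):
    # Hypothetical step that fails on bad data.
    if m == "bar":
        raise ValueError("bad data")
    return m.upper()


results = [parse(m) for m in ["foo", "bar", "baz"]]
```

Downstream code can then branch on the first field of each result and route the "error" entries to logging or a dead-letter sink.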

Related Issues

Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

liferoad (Collaborator) commented May 6, 2024

#24209 solved this for RunInference.

jd185367 (Author) commented May 6, 2024

@liferoad while that's helpful as a pattern to look at, I don't think it solves the general issue of catching exceptions in transforms, for two reasons:

  1. That error-wrapping only applies to the RunInference transform, specifically.
  2. The error wrapping was possible for RunInference because most of its work is a single DoFn call, so it could use the existing with_exception_handling() method on ParDo. For transforms that call other transforms, this pattern isn't possible unless every called transform implements it too, which basically boils down to forcing every transform to implement its own exception handling. That doesn't give the option of just adding a top-level error handler (like my suggestion), which would be more maintainable IMO.

liferoad (Collaborator) commented May 6, 2024

I agree with what you said. I just wanted to list the existing approaches to error handling. Also, #29164 introduces withBadRecordHandler for Java to handle IO transforms.
