Skip to content

Commit

Permalink
fix(ingest/pipeline): catch pipeline exceptions (datahub-project#10753)
Browse files Browse the repository at this point in the history
Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
  • Loading branch information
2 people authored and aviv-julienjehannet committed Jul 17, 2024
1 parent af89594 commit 4fc06ff
Showing 1 changed file with 48 additions and 21 deletions.
69 changes: 48 additions & 21 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import contextlib
import enum
import itertools
import logging
import os
Expand Down Expand Up @@ -117,6 +118,13 @@ class PipelineInitError(Exception):
pass


class PipelineStatus(enum.Enum):
UNKNOWN = enum.auto()
COMPLETED = enum.auto()
PIPELINE_ERROR = enum.auto()
CANCELLED = enum.auto()


@contextlib.contextmanager
def _add_init_error_context(step: str) -> Iterator[None]:
"""Enriches any exceptions raised with information about the step that failed."""
Expand Down Expand Up @@ -366,11 +374,11 @@ def _notify_reporters_on_ingestion_completion(self) -> None:
try:
reporter.on_completion(
status="CANCELLED"
if self.final_status == "cancelled"
if self.final_status == PipelineStatus.CANCELLED
else "FAILURE"
if self.has_failures()
else "SUCCESS"
if self.final_status == "completed"
if self.final_status == PipelineStatus.COMPLETED
else "UNKNOWN",
report=self._get_structured_report(),
ctx=self.ctx,
Expand Down Expand Up @@ -422,7 +430,7 @@ def run(self) -> None:
)
)

self.final_status = "unknown"
self.final_status = PipelineStatus.UNKNOWN
self._notify_reporters_on_ingestion_start()
callback = None
try:
Expand Down Expand Up @@ -459,9 +467,7 @@ def run(self) -> None:
f"Failed to write record: {e}"
)

except RuntimeError:
raise
except SystemExit:
except (RuntimeError, SystemExit):
raise
except Exception as e:
logger.error(
Expand Down Expand Up @@ -490,11 +496,22 @@ def run(self) -> None:
self.sink.write_record_async(record_envelope, callback)

self.process_commits()
self.final_status = "completed"
except (SystemExit, RuntimeError, KeyboardInterrupt) as e:
self.final_status = "cancelled"
self.final_status = PipelineStatus.COMPLETED
except (SystemExit, KeyboardInterrupt) as e:
self.final_status = PipelineStatus.CANCELLED
logger.error("Caught error", exc_info=e)
raise
except Exception as exc:
self.final_status = PipelineStatus.PIPELINE_ERROR
logger.exception("Ingestion pipeline threw an uncaught exception")

# HACK: We'll report this as a source error, since we don't have a great place to put it.
# It theoretically could've come from any part of the pipeline, but usually it's from the source.
# This ensures that it is included in the report, and that the run is marked as failed.
self.source.get_report().report_failure(
"pipeline_error",
f"Ingestion pipeline threw an uncaught exception: {exc}",
)
finally:
clear_global_warnings()

Expand Down Expand Up @@ -629,20 +646,30 @@ def has_failures(self) -> bool:
def pretty_print_summary(
self, warnings_as_failure: bool = False, currently_running: bool = False
) -> int:
click.echo()
click.secho("Cli report:", bold=True)
click.secho(self.cli_report.as_string())
click.secho(f"Source ({self.source_type}) report:", bold=True)
click.echo(self.source.get_report().as_string())
click.secho(f"Sink ({self.sink_type}) report:", bold=True)
click.echo(self.sink.get_report().as_string())
global_warnings = get_global_warnings()
if len(global_warnings) > 0:
click.secho("Global Warnings:", bold=True)
click.echo(global_warnings)
click.echo()
workunits_produced = self.sink.get_report().total_records_written

if (
not workunits_produced
and not currently_running
and self.final_status == PipelineStatus.PIPELINE_ERROR
):
# If the pipeline threw an uncaught exception before doing anything, printing
# out the report would just be annoying.
pass
else:
click.echo()
click.secho("Cli report:", bold=True)
click.secho(self.cli_report.as_string())
click.secho(f"Source ({self.source_type}) report:", bold=True)
click.echo(self.source.get_report().as_string())
click.secho(f"Sink ({self.sink_type}) report:", bold=True)
click.echo(self.sink.get_report().as_string())
global_warnings = get_global_warnings()
if len(global_warnings) > 0:
click.secho("Global Warnings:", bold=True)
click.echo(global_warnings)
click.echo()

duration_message = f"in {humanfriendly.format_timespan(self.source.get_report().running_time)}."
if currently_running:
message_template = f"⏳ Pipeline running {{status}} so far; produced {workunits_produced} events {duration_message}"
Expand Down

0 comments on commit 4fc06ff

Please sign in to comment.