From f4d3c1a6b557ea692d281f1e27a55103423b7038 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Thu, 12 Oct 2023 22:22:26 -0700
Subject: [PATCH] fix(ingest): better handling around sink errors

Slack ref: https://datahubspace.slack.com/archives/CUMUWQU66/p1696317869262289?thread_ts=1695824642.165599&cid=CUMUWQU66
---
 .../src/datahub/ingestion/run/pipeline.py     | 10 +++++-
 .../datahub/ingestion/sink/datahub_kafka.py   | 33 ++++++++-----------
 2 files changed, 22 insertions(+), 21 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
index 07b55e0e25a89..f2735c24ca19d 100644
--- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
+++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -390,7 +390,15 @@ def run(self) -> None:
                     record_envelopes = self.extractor.get_records(wu)
                     for record_envelope in self.transform(record_envelopes):
                         if not self.dry_run:
-                            self.sink.write_record_async(record_envelope, callback)
+                            try:
+                                self.sink.write_record_async(
+                                    record_envelope, callback
+                                )
+                            except Exception as e:
+                                # In case the sink's error handling is bad, we still want to report the error.
+                                self.sink.report.report_failure(
+                                    f"Failed to write record: {e}"
+                                )
 
             except RuntimeError:
                 raise
diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_kafka.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_kafka.py
index 39054c256a7fd..38ddadaafc862 100644
--- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_kafka.py
+++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_kafka.py
@@ -9,7 +9,6 @@
     MetadataChangeEvent,
     MetadataChangeProposal,
 )
-from datahub.metadata.schema_classes import MetadataChangeProposalClass
 
 
 class KafkaSinkConfig(KafkaEmitterConfig):
@@ -58,27 +57,21 @@ def write_record_async(
         ],
         write_callback: WriteCallback,
     ) -> None:
-        record = record_envelope.record
-        if isinstance(record, MetadataChangeEvent):
-            self.emitter.emit_mce_async(
+        callback = _KafkaCallback(
+            self.report, record_envelope, write_callback
+        ).kafka_callback
+        try:
+            record = record_envelope.record
+            self.emitter.emit(
                 record,
-                callback=_KafkaCallback(
-                    self.report, record_envelope, write_callback
-                ).kafka_callback,
-            )
-        elif isinstance(
-            record, (MetadataChangeProposalWrapper, MetadataChangeProposalClass)
-        ):
-            self.emitter.emit_mcp_async(
-                record,
-                callback=_KafkaCallback(
-                    self.report, record_envelope, write_callback
-                ).kafka_callback,
-            )
-        else:
-            raise ValueError(
-                f"The datahub-kafka sink only supports MetadataChangeEvent/MetadataChangeProposal[Wrapper] classes, not {type(record)}"
+                callback=callback,
             )
+        except Exception as err:
+            # In case we throw an exception while trying to emit the record,
+            # catch it and report the failure. This might happen if the schema
+            # registry is down or otherwise misconfigured, in which case we'd
+            # fail when serializing the record.
+            callback(err, f"Failed to write record: {err}")
 
     def close(self) -> None:
         self.emitter.flush()
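
For reference, here is a minimal, self-contained sketch of the error-handling
pattern this patch applies. All names below (FlakySerializer, SketchKafkaSink,
run_loop) are hypothetical stand-ins, not the DataHub API: the point is that a
synchronous failure during an async write (e.g. a schema-registry
serialization error) is routed into the same callback used for async delivery
results, and the calling loop guards the write as well, so one bad record or a
buggy sink cannot abort the whole run.

# sketch.py -- illustrative only; hypothetical names, not DataHub code.
from typing import Any, Callable, List, Optional

# Callback signature: (error or None, human-readable message).
WriteCallback = Callable[[Optional[Exception], str], None]


class FlakySerializer:
    """Stand-in for e.g. a schema-registry serializer that can fail."""

    def serialize(self, record: Any) -> bytes:
        if record is None:
            raise ValueError("cannot serialize None")
        return repr(record).encode()


class SketchKafkaSink:
    def __init__(self) -> None:
        self.serializer = FlakySerializer()

    def write_record_async(self, record: Any, callback: WriteCallback) -> None:
        try:
            # This can raise synchronously, before anything is handed to the
            # producer -- the case the patch's try/except covers.
            payload = self.serializer.serialize(record)
            # ... a real sink would hand `payload` to an async producer here,
            # which would invoke `callback` later with the delivery result.
            callback(None, f"queued {len(payload)} bytes")
        except Exception as err:
            # Report through the normal callback path instead of letting the
            # exception escape to the caller.
            callback(err, f"Failed to write record: {err}")


def run_loop(sink: SketchKafkaSink, records: List[Any]) -> List[str]:
    failures: List[str] = []

    def callback(error: Optional[Exception], msg: str) -> None:
        if error is not None:
            failures.append(msg)

    for record in records:
        try:
            sink.write_record_async(record, callback)
        except Exception as e:
            # Belt and suspenders, mirroring the pipeline.py change: even if
            # the sink's own error handling is buggy and it raises anyway,
            # record the failure and keep going.
            failures.append(f"Failed to write record: {e}")

    return failures


if __name__ == "__main__":
    print(run_loop(SketchKafkaSink(), [None, {"urn": "example"}]))
    # -> ['Failed to write record: cannot serialize None']

The double layer is deliberate: the sink-level except handles known
synchronous failure modes, while the loop-level except is a safety net for
sinks whose error handling is itself broken.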