diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
index cf70eb95762c4..17047457e0eba 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
@@ -773,6 +773,7 @@ class BQParser:
         project: str
         target_platform: str
         sanitizeTopics: str
+        transforms: list
         topicsToTables: Optional[str] = None
         datasets: Optional[str] = None
         defaultDataset: Optional[str] = None
@@ -788,6 +789,20 @@ def get_parser(
     ) -> BQParser:
         project = connector_manifest.config["project"]
         sanitizeTopics = connector_manifest.config.get("sanitizeTopics", "false")
+        transform_names = (
+            self.connector_manifest.config.get("transforms", "").split(",")
+            if self.connector_manifest.config.get("transforms")
+            else []
+        )
+        transforms = []
+        for name in transform_names:
+            transform = {"name": name}
+            transforms.append(transform)
+            for key in self.connector_manifest.config.keys():
+                if key.startswith(f"transforms.{name}."):
+                    transform[
+                        key.replace(f"transforms.{name}.", "")
+                    ] = self.connector_manifest.config[key]
 
         if "defaultDataset" in connector_manifest.config:
             defaultDataset = connector_manifest.config["defaultDataset"]
@@ -797,6 +812,7 @@ def get_parser(
                 target_platform="bigquery",
                 sanitizeTopics=sanitizeTopics.lower() == "true",
                 version="v2",
+                transforms=transforms,
             )
         else:
             # version 1.6.x and similar configs supported
@@ -809,6 +825,7 @@ def get_parser(
                 datasets=datasets,
                 target_platform="bigquery",
                 sanitizeTopics=sanitizeTopics.lower() == "true",
+                transforms=transforms,
             )
 
     def get_list(self, property: str) -> Iterable[Tuple[str, str]]:
@@ -867,6 +884,18 @@ def get_dataset_table_for_topic(
             table = self.sanitize_table_name(table)
         return f"{dataset}.{table}"
 
+    def apply_transformations(
+        self, topic: str, transforms: List[Dict[str, str]]
+    ) -> str:
+        for transform in transforms:
+            if transform["type"] == "org.apache.kafka.connect.transforms.RegexRouter":
+                regex = transform["regex"]
+                replacement = transform["replacement"]
+                pattern = re.compile(regex)
+                if pattern.match(topic):
+                    topic = pattern.sub(replacement, topic, count=1)
+        return topic
+
     def _extract_lineages(self):
         lineages: List[KafkaConnectLineage] = list()
         parser = self.get_parser(self.connector_manifest)
@@ -874,26 +903,26 @@ def _extract_lineages(self):
             return lineages
         target_platform = parser.target_platform
         project = parser.project
-
+        transforms = parser.transforms
         self.connector_manifest.flow_property_bag = self.connector_manifest.config
-
         # Mask/Remove properties that may reveal credentials
         if "keyfile" in self.connector_manifest.flow_property_bag:
             del self.connector_manifest.flow_property_bag["keyfile"]
 
         for topic in self.connector_manifest.topic_names:
-            dataset_table = self.get_dataset_table_for_topic(topic, parser)
+            transformed_topic = self.apply_transformations(topic, transforms)
+            dataset_table = self.get_dataset_table_for_topic(transformed_topic, parser)
             if dataset_table is None:
                 self.report_warning(
                     self.connector_manifest.name,
-                    f"could not find target dataset for topic {topic}, please check your connector configuration",
+                    f"could not find target dataset for topic {transformed_topic}, please check your connector configuration",
                 )
                 continue
             target_dataset = f"{project}.{dataset_table}"
             lineages.append(
                 KafkaConnectLineage(
-                    source_dataset=topic,
+                    source_dataset=transformed_topic,
                     source_platform=KAFKA,
                     target_dataset=target_dataset,
                     target_platform=target_platform,
diff --git a/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml b/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml
index 260887ff3d1a0..bcd8c25a1d5a2 100644
--- a/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml
+++ b/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml
@@ -32,7 +32,7 @@ services:
         # confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
         #
-        #confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.6.8
+        confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.6.8
         #
         confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.10.1
         #
diff --git a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_bigquery_sink_mces_golden.json b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_bigquery_sink_mces_golden.json
new file mode 100644
index 0000000000000..086dfbdd67b1c
--- /dev/null
+++ b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_bigquery_sink_mces_golden.json
@@ -0,0 +1,48 @@
+[
+{
+  "entityType": "dataFlow",
+  "entityUrn": "urn:li:dataFlow:(kafka-connect,connect-instance-1.bigquery-sink-connector,PROD)",
+  "changeType": "UPSERT",
+  "aspectName": "dataFlowInfo",
+  "aspect": {
+    "json": {
+      "customProperties": {
+        "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
+        "autoCreateTables": "true",
+        "transforms.TableNameTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter",
+        "transforms.TableNameTransformation.replacement": "my_dest_table_name",
+        "topics": "kafka-topic-name",
+        "transforms.TableNameTransformation.regex": ".*",
+        "transforms": "TableNameTransformation",
+        "name": "bigquery-sink-connector",
+        "project": "my-gcp-project",
+        "datasets": "kafka-topic-name=mybqdataset",
+        "defaultDataset": "mybqdataset"
+      },
+      "name": "bigquery-sink-connector",
+      "description": "Sink connector using `com.wepay.kafka.connect.bigquery.BigQuerySinkConnector` plugin."
+ } + }, + "systemMetadata": { + "lastObserved": 1635166800000, + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(kafka-connect,connect-instance-1.bigquery-sink-connector,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1635166800000, + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_bigquery_sink_to_file.yml b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_bigquery_sink_to_file.yml new file mode 100644 index 0000000000000..0df332acb32de --- /dev/null +++ b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_bigquery_sink_to_file.yml @@ -0,0 +1,15 @@ +--- +run_id: kafka-connect-run + +source: + type: kafka-connect + config: + platform_instance: connect-instance-1 + connect_uri: http://localhost:28083 + connector_patterns: + allow: + - bigquery-sink-connector +sink: + type: file + config: + filename: "./kafka_connect_mces.json" diff --git a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_to_file.yml b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_to_file.yml index 4946cae8c4859..4adb57fba6814 100644 --- a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_to_file.yml +++ b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_to_file.yml @@ -10,6 +10,7 @@ source: deny: - source_mongodb_connector - confluent_s3_sink_connector + - bigquery-sink-connector provided_configs: - provider: env path_key: MYSQL_PORT diff --git a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py index 26f3d50c1167b..d0f4fc35fc03e 100644 --- a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py +++ b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py @@ -333,6 +333,30 @@ def loaded_kafka_connect(kafka_connect_runner): r.raise_for_status() assert r.status_code == 201 + # Creating BigQuery sink connector + r = requests.post( + KAFKA_CONNECT_ENDPOINT, + headers={"Content-Type": "application/json"}, + data="""{ + "name": "bigquery-sink-connector", + "config": { + "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector", + "autoCreateTables": "true", + "transforms.TableNameTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter", + "transforms.TableNameTransformation.replacement": "my_dest_table_name", + "topics": "kafka-topic-name", + "transforms.TableNameTransformation.regex": ".*", + "transforms": "TableNameTransformation", + "name": "bigquery-sink-connector", + "project": "my-gcp-project", + "defaultDataset": "mybqdataset", + "datasets": "kafka-topic-name=mybqdataset" + } + } + """, + ) + assert r.status_code == 201 # Created + # Give time for connectors to process the table data kafka_connect_runner.wait_until_responsive( timeout=30, @@ -637,3 +661,22 @@ def test_kafka_connect_snowflake_sink_ingest( output_path=tmp_path / "kafka_connect_snowflake_sink_mces.json", golden_path=f"{test_resources_dir}/{golden_file}", ) + + +@freeze_time(FROZEN_TIME) +def test_kafka_connect_bigquery_sink_ingest( + loaded_kafka_connect, pytestconfig, tmp_path, test_resources_dir +): + # Run the metadata ingestion pipeline. 
+    config_file = (
+        test_resources_dir / "kafka_connect_bigquery_sink_to_file.yml"
+    ).resolve()
+    run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)
+
+    # Verify the output.
+    mce_helpers.check_golden_file(
+        pytestconfig,
+        output_path=tmp_path / "kafka_connect_mces.json",
+        golden_path=test_resources_dir / "kafka_connect_bigquery_sink_mces_golden.json",
+        ignore_paths=[],
+    )
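
Note (illustrative, not part of the patch): the topic-routing behaviour the new test exercises can be reproduced in isolation. The sketch below is a minimal standalone version of the transform parsing added to get_parser and the RegexRouter handling in apply_transformations; the config dict and helper names (parse_transforms, route_topic) are hypothetical and exist only for this example.

import re
from typing import Dict, List

# Hypothetical connector config, mirroring the test connector registered above.
config: Dict[str, str] = {
    "transforms": "TableNameTransformation",
    "transforms.TableNameTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.TableNameTransformation.regex": ".*",
    "transforms.TableNameTransformation.replacement": "my_dest_table_name",
}


def parse_transforms(config: Dict[str, str]) -> List[Dict[str, str]]:
    # Collect each name listed in "transforms" together with its "transforms.<name>.*" keys.
    names = config.get("transforms", "").split(",") if config.get("transforms") else []
    transforms = []
    for name in names:
        prefix = f"transforms.{name}."
        transform = {"name": name}
        for key, value in config.items():
            if key.startswith(prefix):
                transform[key[len(prefix):]] = value
        transforms.append(transform)
    return transforms


def route_topic(topic: str, transforms: List[Dict[str, str]]) -> str:
    # RegexRouter-style renaming: one substitution per matching transform, applied in order.
    for transform in transforms:
        if transform.get("type") == "org.apache.kafka.connect.transforms.RegexRouter":
            pattern = re.compile(transform["regex"])
            if pattern.match(topic):
                topic = pattern.sub(transform["replacement"], topic, count=1)
    return topic


print(route_topic("kafka-topic-name", parse_transforms(config)))  # -> my_dest_table_name

Under this config the topic kafka-topic-name is rewritten to my_dest_table_name before the dataset/table lookup, which is the renaming the new lineage extraction accounts for.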