feat(ingest): working with multiple bigquery projects #5240
@@ -111,7 +111,7 @@ class GEProfilingConfig(ConfigModel):
     partition_profiling_enabled: bool = Field(default=True, description="")
     bigquery_temp_table_schema: Optional[str] = Field(
         default=None,
-        description="On bigquery for profiling partitioned tables needs to create temporary views. You have to define a schema where these will be created. Views will be cleaned up after profiler runs. (Great expectation tech details about this (https://legacy.docs.greatexpectations.io/en/0.9.0/reference/integrations/bigquery.html#custom-queries-with-sql-datasource).",
+        description="On bigquery for profiling partitioned tables needs to create temporary views. You have to define a dataset where these will be created. Views will be cleaned up after profiler runs. (Great expectation tech details about this (https://legacy.docs.greatexpectations.io/en/0.9.0/reference/integrations/bigquery.html#custom-queries-with-sql-datasource).",
     )
     partition_datetime: Optional[datetime.datetime] = Field(
         default=None,

Reviewer comment (on the description change): nice
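For context, a minimal sketch of how this profiling option is typically set. The import path and all values here are assumptions for illustration, not taken from this PR:

```python
# Minimal sketch, assuming GEProfilingConfig is importable from this module
# (import path assumed); project and dataset names are made up.
from datahub.ingestion.source.ge_data_profiler import GEProfilingConfig

profiling = GEProfilingConfig(
    partition_profiling_enabled=True,
    # A dataset the profiler may create its temporary views in;
    # per the description above, views are cleaned up after the run.
    bigquery_temp_table_schema="my-project.profiling_tmp",  # hypothetical
)
```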
@@ -17,6 +17,7 @@
 from google.cloud.logging_v2.client import Client as GCPLoggingClient
 from ratelimiter import RateLimiter
 from sqlalchemy import create_engine, inspect
+from sqlalchemy.engine import Engine
 from sqlalchemy.engine.reflection import Inspector

 from datahub.emitter import mce_builder
@@ -36,6 +37,7 @@
     support_status,
 )
 from datahub.ingestion.api.workunit import MetadataWorkUnit
+from datahub.ingestion.source.ge_data_profiler import DatahubGEProfiler
 from datahub.ingestion.source.sql.sql_common import (
     SQLAlchemyConfig,
     SQLAlchemySource,
@@ -332,8 +334,17 @@ def __init__(self, config, ctx):
         self.partition_info: Dict[str, str] = dict()
         atexit.register(cleanup, config)

-    def get_db_name(self, inspector: Inspector = None) -> str:
-        if self.config.project_id:
+    def get_db_name(
+        self, inspector: Inspector = None, for_sql_queries: bool = True
+    ) -> str:
+        """
+        for_sql_queries - Used mainly for multi-project setups with different permissions
+        - should be set to True if this is to be used to run sql queries
+        - should be set to False if this is to inspect contents and not run sql queries
+        """
+        if for_sql_queries and self.config.storage_project_id:
+            return self.config.storage_project_id
+        elif self.config.project_id:
             return self.config.project_id
         else:
             return self._get_project_id(inspector)

Reviewer comment (on the new parameter): Maybe some comment would be nice about the …
Reply: added
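To make the new resolution order concrete, here is a standalone sketch that mirrors the logic above; `resolve_project` and its arguments are stand-ins for illustration, not DataHub APIs:

```python
from typing import Optional

def resolve_project(
    storage_project_id: Optional[str],
    project_id: Optional[str],
    inferred_project: str,
    for_sql_queries: bool = True,
) -> str:
    # Mirrors get_db_name above: the storage project wins for SQL queries,
    # then the configured project, then the project inferred via inspection.
    if for_sql_queries and storage_project_id:
        return storage_project_id
    if project_id:
        return project_id
    return inferred_project

assert resolve_project("storage-proj", "catalog-proj", "inferred") == "storage-proj"
assert resolve_project(
    "storage-proj", "catalog-proj", "inferred", for_sql_queries=False
) == "catalog-proj"
assert resolve_project(None, None, "inferred") == "inferred"
```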
@@ -342,13 +353,10 @@ def _compute_big_query_lineage(self) -> None:
         if not self.config.include_table_lineage:
             return

-        lineage_client_project_id = self._get_lineage_client_project_id()
         if self.config.use_exported_bigquery_audit_metadata:
-            self._compute_bigquery_lineage_via_exported_bigquery_audit_metadata(
-                lineage_client_project_id
-            )
+            self._compute_bigquery_lineage_via_exported_bigquery_audit_metadata()
         else:
-            self._compute_bigquery_lineage_via_gcp_logging(lineage_client_project_id)
+            self._compute_bigquery_lineage_via_gcp_logging()

         if self.lineage_metadata is None:
             self.lineage_metadata = {}
@@ -359,14 +367,11 @@ def _compute_big_query_lineage(self) -> None:
         )
         logger.debug(f"lineage metadata is {self.lineage_metadata}")

-    def _compute_bigquery_lineage_via_gcp_logging(
-        self, lineage_client_project_id: Optional[str]
-    ) -> None:
+    def _compute_bigquery_lineage_via_gcp_logging(self) -> None:
+        project_id = self.get_db_name()
         logger.info("Populating lineage info via GCP audit logs")
         try:
-            _clients: List[GCPLoggingClient] = self._make_bigquery_client(
-                lineage_client_project_id
-            )
+            _clients: List[GCPLoggingClient] = self._make_gcp_logging_client(project_id)
             template: str = BQ_FILTER_RULE_TEMPLATE

             if self.config.use_v2_audit_metadata:

Reviewer comment (on `project_id = self.get_db_name()`): Maybe rename …
@@ -386,12 +391,11 @@ def _compute_bigquery_lineage_via_gcp_logging(
                 f"Error was {e}",
             )

-    def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata(
-        self, lineage_client_project_id: Optional[str]
-    ) -> None:
+    def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata(self) -> None:
+        project_id = self.get_db_name(for_sql_queries=True)
         logger.info("Populating lineage info via exported GCP audit logs")
         try:
-            _client: BigQueryClient = BigQueryClient(project=lineage_client_project_id)
+            _client: BigQueryClient = BigQueryClient(project=project_id)
             exported_bigquery_audit_metadata: Iterable[
                 BigQueryAuditMetadata
             ] = self._get_exported_bigquery_audit_metadata(_client)
@@ -408,28 +412,18 @@ def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata(
                 f"Error: {e}",
             )

-    def _make_bigquery_client(
-        self, lineage_client_project_id: Optional[str]
+    def _make_gcp_logging_client(
+        self, project_id: Optional[str]
     ) -> List[GCPLoggingClient]:
         # See https://github.com/googleapis/google-cloud-python/issues/2674 for
         # why we disable gRPC here.
         client_options = self.config.extra_client_options.copy()
         client_options["_use_grpc"] = False
-        if lineage_client_project_id is not None:
-            return [
-                GCPLoggingClient(**client_options, project=lineage_client_project_id)
-            ]
+        if project_id is not None:
+            return [GCPLoggingClient(**client_options, project=project_id)]
         else:
             return [GCPLoggingClient(**client_options)]

-    def _get_lineage_client_project_id(self) -> Optional[str]:
-        project_id: Optional[str] = (
-            self.config.lineage_client_project_id
-            if self.config.lineage_client_project_id
-            else self.config.project_id
-        )
-        return project_id
-
     def _get_bigquery_log_entries(
         self,
         clients: List[GCPLoggingClient],
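A self-contained sketch of the construction pattern above, followed by a hypothetical usage; `make_logging_clients` is an illustrative stand-in, and the filter string is an assumption about what a BigQuery audit-log filter might look like:

```python
from typing import List, Optional

from google.cloud.logging_v2.client import Client as GCPLoggingClient

def make_logging_clients(
    project_id: Optional[str] = None, **extra_options
) -> List[GCPLoggingClient]:
    options = dict(extra_options)
    # gRPC is disabled for the reason linked in the diff above:
    # https://github.com/googleapis/google-cloud-python/issues/2674
    options["_use_grpc"] = False
    if project_id is not None:
        return [GCPLoggingClient(**options, project=project_id)]
    return [GCPLoggingClient(**options)]

# Hypothetical usage: pull audit-log entries for one project
# (requires GCP credentials; filter string is illustrative).
for client in make_logging_clients("my-project"):
    for entry in client.list_entries(
        filter_='protoPayload.serviceName="bigquery.googleapis.com"'
    ):
        print(entry.timestamp)
```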
@@ -665,8 +659,7 @@ def is_table_partitioned(
         if database:
             project_id = database
         else:
-            url = self.config.get_sql_alchemy_url()
-            engine = create_engine(url, **self.config.options)
+            engine = self._get_engine(for_run_sql=False)
             with engine.connect() as con:
                 inspector = inspect(con)
                 project_id = self.get_db_name(inspector)
@@ -675,8 +668,8 @@ def get_latest_partition(
     def get_latest_partition(
         self, schema: str, table: str
     ) -> Optional[BigQueryPartitionColumn]:
-        url = self.config.get_sql_alchemy_url()
-        engine = create_engine(url, **self.config.options)
+        logger.debug(f"get_latest_partition for {schema} and {table}")
+        engine = self._get_engine(for_run_sql=True)
         with engine.connect() as con:
             inspector = inspect(con)
             project_id = self.get_db_name(inspector)
@@ -685,7 +678,9 @@ def get_latest_partition(
         ):
             return None
         sql = BQ_GET_LATEST_PARTITION_TEMPLATE.format(
-            project_id=project_id, schema=schema, table=table
+            project_id=self.get_db_name(inspector, for_sql_queries=True),
+            schema=schema,
+            table=table,
         )
         result = con.execute(sql)
         # Bigquery only supports one partition column
@@ -709,8 +704,7 @@ def is_latest_shard(self, project_id: str, schema: str, table: str) -> bool:
         table_name, shard = self.get_shard_from_table(table)
         if shard:
             logger.debug(f"{table_name} is sharded and shard id is: {shard}")
-            url = self.config.get_sql_alchemy_url()
-            engine = create_engine(url, **self.config.options)
+            engine = self._get_engine(for_run_sql=True)
             if f"{project_id}.{schema}.{table_name}" not in self.maximum_shard_ids:
                 with engine.connect() as con:
                     sql = BQ_GET_LATEST_SHARD.format(
@@ -734,9 +728,13 @@ def is_latest_shard(self, project_id: str, schema: str, table: str) -> bool:
         else:
             return True

+    def _get_engine(self, for_run_sql: bool) -> Engine:
+        url = self.config.get_sql_alchemy_url(for_run_sql=for_run_sql)
+        logger.debug(f"sql_alchemy_url={url}")
+        return create_engine(url, **self.config.options)
+
     def add_information_for_schema(self, inspector: Inspector, schema: str) -> None:
-        url = self.config.get_sql_alchemy_url()
-        engine = create_engine(url, **self.config.options)
+        engine = self._get_engine(for_run_sql=True)
         project_id = self.get_db_name(inspector)
         with engine.connect() as con:
             inspector = inspect(con)
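The PR routes all engine creation through the new `_get_engine` helper, which forwards `for_run_sql` to `get_sql_alchemy_url`. That config method's body is not shown in this diff; the following is an illustrative guess at the intended behavior (SQL queries hit `storage_project_id`, plain inspection hits `project_id`), not the PR's actual implementation:

```python
from typing import Optional

def get_sql_alchemy_url(
    project_id: str,
    storage_project_id: Optional[str] = None,
    for_run_sql: bool = True,
) -> str:
    # Assumed behavior: queries run against the storage project when one is
    # configured; metadata inspection stays on the catalog project.
    project = storage_project_id if for_run_sql and storage_project_id else project_id
    return f"bigquery://{project}"

assert get_sql_alchemy_url("catalog-proj", "storage-proj") == "bigquery://storage-proj"
assert (
    get_sql_alchemy_url("catalog-proj", "storage-proj", for_run_sql=False)
    == "bigquery://catalog-proj"
)
```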
@@ -821,6 +819,22 @@ def generate_partition_profiler_query(
             return shard, None
         return None, None

+    def get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler":
+        logger.debug("Getting profiler instance from bigquery")
+        engine = self._get_engine(for_run_sql=True)
+        with engine.connect() as conn:
+            inspector = inspect(conn)
+
+        return DatahubGEProfiler(
+            conn=inspector.bind,
+            report=self.report,
+            config=self.config.profiling,
+            platform=self.platform,
+        )
+
+    def get_profile_args(self) -> Dict:
+        return {"temp_table_db": self.config.project_id}
+
     def is_dataset_eligible_for_profiling(
         self,
         dataset_name: str,
@@ -884,6 +898,7 @@ def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
             isinstance(wu, SqlWorkUnit)
             and isinstance(wu.metadata, MetadataChangeEvent)
             and isinstance(wu.metadata.proposedSnapshot, DatasetSnapshot)
+            and self.config.include_table_lineage
         ):
             lineage_mcp = self.get_lineage_mcp(wu.metadata.proposedSnapshot.urn)
             if lineage_mcp is not None:
@@ -977,7 +992,7 @@ def prepare_profiler_args(
         custom_sql: Optional[str] = None,
     ) -> dict:
         return dict(
-            schema=self.config.project_id,
+            schema=self.get_db_name(for_sql_queries=True),
             table=f"{schema}.{table}",
             partition=partition,
             custom_sql=custom_sql,
Reviewer comment: Is it fine we just log on this error?
Reply: Noticed there were some cases where it wasn't three for some people in OSS. Don't want to break until we understand why.