Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/bigquery): Usage rate limiting and lineage exported log fix #7297

Merged
merged 2 commits
Feb 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
BQ_DATE_SHARD_FORMAT,
BQ_DATETIME_FORMAT,
_make_gcp_logging_client,
get_bigquery_client,
)
from datahub.metadata.schema_classes import (
DatasetLineageTypeClass,
Expand Down Expand Up @@ -178,7 +179,7 @@ def compute_bigquery_lineage_via_exported_bigquery_audit_metadata(
logger.info("Populating lineage info via exported GCP audit logs")
try:
# For exported logs we want to submit queries with the credentials project_id.
_client: BigQueryClient = BigQueryClient()
_client: BigQueryClient = get_bigquery_client(self.config)
exported_bigquery_audit_metadata: Iterable[
BigQueryAuditMetadata
] = self._get_exported_bigquery_audit_metadata(_client)
Expand Down Expand Up @@ -224,7 +225,7 @@ def compute_bigquery_lineage_via_catalog_lineage_api(

try:
lineage_client: lineage_v1.LineageClient = lineage_v1.LineageClient()
bigquery_client: BigQueryClient = BigQueryClient()
bigquery_client: BigQueryClient = get_bigquery_client(self.config)
# Filtering datasets
datasets = list(bigquery_client.list_datasets(project_id))
project_tables = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,18 +337,21 @@ def _get_bigquery_log_entries_via_gcp_logging(

try:
list_entries: Iterable[Union[AuditLogEntry, BigQueryAuditMetadata]]
rate_limiter: Optional[RateLimiter] = None
if self.config.rate_limit:
with RateLimiter(max_calls=self.config.requests_per_min, period=60):
list_entries = client.list_entries(
filter_=filter, page_size=self.config.log_page_size
)
else:
list_entries = client.list_entries(
filter_=filter,
page_size=self.config.log_page_size,
max_results=limit,
# client.list_entries is a generator, does api calls to GCP Logging when it runs out of entries and needs to fetch more from GCP Logging
# to properly ratelimit we multiply the page size by the number of requests per minute
rate_limiter = RateLimiter(
max_calls=self.config.requests_per_min * self.config.log_page_size,
period=60,
)

list_entries = client.list_entries(
filter_=filter,
page_size=self.config.log_page_size,
max_results=limit,
)

for i, entry in enumerate(list_entries):
if i == 0:
logger.info(
Expand All @@ -359,7 +362,12 @@ def _get_bigquery_log_entries_via_gcp_logging(
f"Loaded {i} log entries from GCP Log for {client.project}"
)
self.report.total_query_log_entries += 1
yield entry

if rate_limiter:
with rate_limiter:
yield entry
else:
yield entry

logger.info(
f"Finished loading {self.report.total_query_log_entries} log entries from GCP Logging for {client.project}"
Expand Down