Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): add support for query cost estimate #18694

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions superset/db_engine_specs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1025,12 +1025,15 @@ def select_star( # pylint: disable=too-many-arguments,too-many-locals
return sql

@classmethod
def estimate_statement_cost(cls, statement: str, cursor: Any) -> Dict[str, Any]:
def estimate_statement_cost(
cls, statement: str, cursor: Any, engine: Engine
) -> Dict[str, Any]:
"""
Generate a SQL query that estimates the cost of a given statement.

:param statement: A single SQL statement
:param cursor: Cursor instance
:param engine: Engine instance
:return: Dictionary with different costs
"""
raise Exception("Database does not support cost estimation")
Expand Down Expand Up @@ -1095,7 +1098,9 @@ def estimate_query_cost(
processed_statement = cls.process_statement(
statement, database, user_name
)
costs.append(cls.estimate_statement_cost(processed_statement, cursor))
costs.append(
cls.estimate_statement_cost(processed_statement, cursor, engine)
)
return costs

@classmethod
Expand Down Expand Up @@ -1425,6 +1430,33 @@ def cancel_query( # pylint: disable=unused-argument
def parse_sql(cls, sql: str) -> List[str]:
return [str(s).strip(" ;") for s in sqlparse.parse(sql)]

@classmethod
def _humanize(
    cls, value: Any, suffix: str, category: Optional[str] = None
) -> str:
    """
    Format a numeric cost value as a short, human readable string.

    The value is truncated to an integer and floor-divided by the unit base
    until it drops below the base or the largest prefix is reached.

    Review note carried over from the pull request: ``category`` is the
    extension point for other kinds of formatting, e.g. an input such as
    ``(1000, "", "dollars")`` producing ``"$1,000"``. Only ``"bytes"`` and
    ``None`` are implemented here.

    :param value: Value to format; anything ``int()`` cannot convert
        (non-numeric strings, ``None``, NaN, infinity) is returned as
        ``str(value)``
    :param suffix: Unit appended after the prefix (e.g. ``" rows"``);
        ignored when ``category`` is ``"bytes"``
    :param category: ``"bytes"`` for binary units (B, KiB, MiB, ...) in steps
        of 1024, or ``None`` for K, M, G, ... prefixes in steps of 1000
    :return: Human readable value such as ``"1 KiB"`` or ``"904 M rows"``
    :raises Exception: If ``category`` is not supported
    """
    try:
        value = int(value)
    except (TypeError, ValueError, OverflowError):
        # int(None) raises TypeError and int(float("inf")) raises
        # OverflowError; treat them like any other non-numeric input.
        return str(value)
    if category not in ("bytes", None):
        raise Exception(f"Unsupported value category: {category}")

    prefixes = ["", "K", "M", "G", "T", "P", "E", "Z", "Y"]
    if category == "bytes":
        base = 1024
        units = ["B" if p == "" else p + "iB" for p in prefixes]
    else:
        base = 1000
        units = [p + suffix for p in prefixes]

    # Walk up the unit list instead of popping from the front of a list.
    index = 0
    last_index = len(units) - 1
    while value >= base and index < last_index:
        value //= base
        index += 1

    unit = units[index]
    if not unit.startswith(" "):
        unit = " " + unit

    # strip() removes the separator when the unit itself is empty.
    return f"{value}{unit}".strip()


# schema for adding a database by providing parameters instead of the
# full SQLAlchemy URI
Expand Down
54 changes: 44 additions & 10 deletions superset/db_engine_specs/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,47 @@ class BigQueryEngineSpec(BaseEngineSpec):
),
}

@classmethod
def get_allow_cost_estimate(cls, extra: Dict[str, Any]) -> bool:
    """
    Report whether query cost estimation is available for this engine spec.

    :param extra: Extra configuration dictionary; not consulted here
    :return: Always ``True``
    """
    return True

@classmethod
def estimate_statement_cost(
    cls, statement: str, cursor: Any, engine: Engine
) -> Dict[str, Any]:
    """
    Estimate the cost of a single statement with a BigQuery dry run.

    Review note carried over from the pull request: the only way to estimate
    the cost in advance in BigQuery is to run the query with ``dry_run``.
    That is not possible with only the cursor, so ``engine`` was added as an
    argument. Configuring SQLAlchemy to pass the dry-run parameter when the
    connection is created was considered as an alternative to handling
    ``bigquery.Client`` directly, but seemed more complicated; see
    https://github.com/googleapis/python-bigquery-sqlalchemy#connection-string-parameters

    :param statement: A single SQL statement
    :param cursor: Cursor instance (not used by this implementation)
    :param engine: Engine instance whose dialect exposes ``credentials_info``
    :return: Dictionary with the ``"Total bytes processed"`` value reported
        by the dry-run job
    """
    # pylint: disable=import-outside-toplevel
    from google.cloud import bigquery
    from google.oauth2 import service_account

    service_account_info = engine.dialect.credentials_info
    client = bigquery.Client(
        credentials=service_account.Credentials.from_service_account_info(
            service_account_info
        )
    )
    # The job config is passed positionally, as in the original call.
    query_job = client.query(statement, bigquery.job.QueryJobConfig(dry_run=True))

    return {"Total bytes processed": query_job.total_bytes_processed}

@classmethod
def query_cost_formatter(
    cls, raw_cost: List[Dict[str, Any]]
) -> List[Dict[str, str]]:
    """
    Convert raw per-statement cost estimates into human readable strings.

    :param raw_cost: One dictionary of raw cost values per statement
    :return: One dictionary per statement holding the formatted value of
        every known column that is present in the corresponding input
    """
    # Each entry is (key in the raw cost, unit suffix, value category).
    known_columns = (("Total bytes processed", "", "bytes"),)

    return [
        {
            key: cls._humanize(statement_cost[key], suffix, category)
            for key, suffix, category in known_columns
            if key in statement_cost
        }
        for statement_cost in raw_cost
    ]

@classmethod
def convert_dttm(
cls, target_type: str, dttm: datetime, db_extra: Optional[Dict[str, Any]] = None
Expand Down Expand Up @@ -316,16 +357,9 @@ def df_to_sql(
:param to_sql_kwargs: The kwargs to be passed to pandas.DataFrame.to_sql` method
"""

try:
# pylint: disable=import-outside-toplevel
import pandas_gbq
from google.oauth2 import service_account
except ImportError as ex:
raise Exception(
"Could not import libraries `pandas_gbq` or `google.oauth2`, which are "
"required to be installed in your environment in order "
"to upload data to BigQuery"
) from ex
# pylint: disable=import-outside-toplevel
import pandas_gbq
from google.oauth2 import service_account

if not table.schema:
raise Exception("The table schema must be defined")
Expand Down
5 changes: 4 additions & 1 deletion superset/db_engine_specs/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from flask_babel import gettext as __
from sqlalchemy.dialects.postgresql import ARRAY, DOUBLE_PRECISION, ENUM, JSON
from sqlalchemy.dialects.postgresql.base import PGInspector
from sqlalchemy.engine.base import Engine
from sqlalchemy.types import String

from superset.db_engine_specs.base import (
Expand Down Expand Up @@ -197,7 +198,9 @@ def get_allow_cost_estimate(cls, extra: Dict[str, Any]) -> bool:
return True

@classmethod
def estimate_statement_cost(cls, statement: str, cursor: Any) -> Dict[str, Any]:
def estimate_statement_cost(
cls, statement: str, cursor: Any, engine: Engine
) -> Dict[str, Any]:
sql = f"EXPLAIN {statement}"
cursor.execute(sql)

Expand Down
35 changes: 12 additions & 23 deletions superset/db_engine_specs/presto.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,9 @@ def select_star( # pylint: disable=too-many-arguments
)

@classmethod
def estimate_statement_cost(cls, statement: str, cursor: Any) -> Dict[str, Any]:
def estimate_statement_cost(
cls, statement: str, cursor: Any, engine: Engine
) -> Dict[str, Any]:
"""
Run a SQL query that estimates the cost of a given statement.

Expand Down Expand Up @@ -675,35 +677,22 @@ def query_cost_formatter(
:return: Human readable cost estimate
"""

def humanize(value: Any, suffix: str) -> str:
try:
value = int(value)
except ValueError:
return str(value)

prefixes = ["K", "M", "G", "T", "P", "E", "Z", "Y"]
prefix = ""
to_next_prefix = 1000
while value > to_next_prefix and prefixes:
prefix = prefixes.pop(0)
value //= to_next_prefix

return f"{value} {prefix}{suffix}"

cost = []
columns = [
("outputRowCount", "Output count", " rows"),
("outputSizeInBytes", "Output size", "B"),
("cpuCost", "CPU cost", ""),
("maxMemory", "Max memory", "B"),
("networkCost", "Network cost", ""),
("outputRowCount", "Output count", " rows", None),
("outputSizeInBytes", "Output size", "", "bytes"),
("cpuCost", "CPU cost", "", None),
("maxMemory", "Max memory", "", "bytes"),
("networkCost", "Network cost", "", None),
]
for row in raw_cost:
estimate: Dict[str, float] = row.get("estimate", {})
statement_cost = {}
for key, label, suffix in columns:
for key, label, suffix, category in columns:
if key in estimate:
statement_cost[label] = humanize(estimate[key], suffix).strip()
statement_cost[label] = cls._humanize(
estimate[key], suffix, category
).strip()
cost.append(statement_cost)

return cost
Expand Down
36 changes: 13 additions & 23 deletions superset/db_engine_specs/trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import simplejson as json
from flask import current_app
from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.url import make_url, URL

from superset.db_engine_specs.base import BaseEngineSpec
Expand Down Expand Up @@ -118,7 +119,9 @@ def get_allow_cost_estimate(cls, extra: Dict[str, Any]) -> bool:
return True

@classmethod
def estimate_statement_cost(cls, statement: str, cursor: Any) -> Dict[str, Any]:
def estimate_statement_cost(
cls, statement: str, cursor: Any, engine: Engine
) -> Dict[str, Any]:
"""
Run a SQL query that estimates the cost of a given statement.

Expand Down Expand Up @@ -156,35 +159,22 @@ def query_cost_formatter(
:return: Human readable cost estimate
"""

def humanize(value: Any, suffix: str) -> str:
try:
value = int(value)
except ValueError:
return str(value)

prefixes = ["K", "M", "G", "T", "P", "E", "Z", "Y"]
prefix = ""
to_next_prefix = 1000
while value > to_next_prefix and prefixes:
prefix = prefixes.pop(0)
value //= to_next_prefix

return f"{value} {prefix}{suffix}"

cost = []
columns = [
("outputRowCount", "Output count", " rows"),
("outputSizeInBytes", "Output size", "B"),
("cpuCost", "CPU cost", ""),
("maxMemory", "Max memory", "B"),
("networkCost", "Network cost", ""),
("outputRowCount", "Output count", " rows", None),
("outputSizeInBytes", "Output size", "", "bytes"),
("cpuCost", "CPU cost", "", None),
("maxMemory", "Max memory", "", "bytes"),
("networkCost", "Network cost", "", None),
]
for row in raw_cost:
estimate: Dict[str, float] = row.get("estimate", {})
statement_cost = {}
for key, label, suffix in columns:
for key, label, suffix, category in columns:
if key in estimate:
statement_cost[label] = humanize(estimate[key], suffix).strip()
statement_cost[label] = cls._humanize(
estimate[key], suffix, category
).strip()
cost.append(statement_cost)

return cost
Expand Down
71 changes: 71 additions & 0 deletions tests/integration_tests/db_engine_specs/bigquery_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,74 @@ def test_calculated_column_in_order_by(self):
}
sql = table.get_query_str(query_obj)
assert "ORDER BY gender_cc ASC" in sql

@mock.patch("google.cloud.bigquery.Client")
@mock.patch(
    "google.oauth2.service_account.Credentials.from_service_account_info",
    mock.Mock(),
)
def test_estimate_statement_cost_select_star(self, mocked_client_class):
    """
    A dry run reports the bytes processed for the statement.
    """
    statement = "SELECT * FROM `some-project.database.table`"
    client = mocked_client_class.return_value
    client.query.return_value = mock.Mock()
    client.query.return_value.total_bytes_processed = 123
    cursor, engine = mock.Mock(), mock.Mock()

    results = BigQueryEngineSpec.estimate_statement_cost(statement, cursor, engine)

    client.query.assert_called_once()
    # The statement and the job config are both passed positionally.
    positional = client.query.call_args.args
    self.assertEqual(positional[0], statement)
    self.assertEqual(positional[1].dry_run, True)
    self.assertEqual(results, {"Total bytes processed": 123})

@mock.patch("google.cloud.bigquery.Client")
@mock.patch(
    "google.oauth2.service_account.Credentials.from_service_account_info",
    mock.Mock(),
)
def test_estimate_statement_invalid_syntax(self, mocked_client_class):
    """
    A ``BadRequest`` raised by the BigQuery client reaches the caller.
    """
    from google.api_core.exceptions import BadRequest

    statement = "DROP TABLE birth_names"
    # The message text is kept exactly as captured, including its line breaks.
    bad_request = BadRequest(
        """
POST https://bigquery.googleapis.com/bigquery/v2/projects/xxx/jobs?
prettyPrint=false: Table name "birth_names" missing dataset while no def
ault dataset is set in the request.

(job ID: xxx)

-----Query Job SQL Follows-----

| . | . |
1:DROP TABLE birth_names
| . | . |
"""
    )
    mocked_client_class.return_value.query.side_effect = bad_request
    cursor, engine = mock.Mock(), mock.Mock()

    with self.assertRaises(BadRequest):
        BigQueryEngineSpec.estimate_statement_cost(statement, cursor, engine)

def test_query_cost_formatter_example_costs(self):
    """
    Byte counts are rendered with 1024-based units and truncated downwards.
    """
    key = "Total bytes processed"
    # Each entry is (raw byte count, expected text); 1.5 MiB is shown as "1 MiB".
    examples = [
        (123, "123 B"),
        (1024, "1 KiB"),
        (1024 ** 2 + 1024 * 512, "1 MiB"),
        (1024 ** 3 * 100, "100 GiB"),
        (1024 ** 4 * 1000, "1000 TiB"),
    ]
    raw_cost = [{key: raw_bytes} for raw_bytes, _ in examples]
    expected = [{key: formatted} for _, formatted in examples]

    result = BigQueryEngineSpec.query_cost_formatter(raw_cost)

    self.assertEqual(result, expected)
6 changes: 4 additions & 2 deletions tests/integration_tests/db_engine_specs/postgres_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,9 @@ def test_estimate_statement_cost_select_star(self):
cursor.fetchone.return_value = (
"Seq Scan on birth_names (cost=0.00..1537.91 rows=75691 width=46)",
)
engine = mock.Mock()
sql = "SELECT * FROM birth_names"
results = PostgresEngineSpec.estimate_statement_cost(sql, cursor)
results = PostgresEngineSpec.estimate_statement_cost(sql, cursor, engine)
self.assertEqual(
results, {"Start-up cost": 0.00, "Total cost": 1537.91,},
)
Expand All @@ -196,9 +197,10 @@ def test_estimate_statement_invalid_syntax(self):
^
"""
)
engine = mock.Mock()
sql = "DROP TABLE birth_names"
with self.assertRaises(errors.SyntaxError):
PostgresEngineSpec.estimate_statement_cost(sql, cursor)
PostgresEngineSpec.estimate_statement_cost(sql, cursor, engine)

def test_query_cost_formatter_example_costs(self):
"""
Expand Down
8 changes: 5 additions & 3 deletions tests/integration_tests/db_engine_specs/presto_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ def test_query_cost_formatter(self):
expected = [
{
"Output count": "904 M rows",
"Output size": "354 GB",
"Output size": "329 GiB",
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two values represent the same quantity; only the unit differs (decimal GB vs. binary GiB). I took care not to change any of the other outputs.

"CPU cost": "354 G",
"Max memory": "0 B",
"Network cost": "354 G",
Expand Down Expand Up @@ -795,17 +795,19 @@ def test_estimate_statement_cost(self):
mock_cursor.fetchone.return_value = [
'{"a": "b"}',
]
mock_engine = mock.Mock()
result = PrestoEngineSpec.estimate_statement_cost(
"SELECT * FROM brth_names", mock_cursor
"SELECT * FROM brth_names", mock_cursor, mock_engine
)
assert result == estimate_json

def test_estimate_statement_cost_invalid_syntax(self):
mock_cursor = mock.MagicMock()
mock_cursor.execute.side_effect = Exception()
mock_engine = mock.Mock()
with self.assertRaises(Exception):
PrestoEngineSpec.estimate_statement_cost(
"DROP TABLE brth_names", mock_cursor
"DROP TABLE brth_names", mock_cursor, mock_engine
)

def test_get_all_datasource_names(self):
Expand Down
Loading