ref(alerts): Update Snuba queries to match events-stats more closely #77755

Draft · wants to merge 12 commits into master
40 changes: 22 additions & 18 deletions src/sentry/api/bases/organization_events.py
@@ -42,6 +42,27 @@
from sentry.utils.snuba import MAX_FIELDS, SnubaTSResult


def get_query_columns(columns, rollup):
Member Author: I moved this to be reused by anomaly detection.

"""
Backwards compatibility for incidents which uses the old
column aliases as it straddles both versions of events/discover.
We will need these aliases until discover2 flags are enabled for all users.
We need these rollup columns to generate correct events-stats results
"""
column_map = {
"user_count": "count_unique(user)",
"event_count": "count()",
"epm()": "epm(%d)" % rollup,
"eps()": "eps(%d)" % rollup,
"tpm()": "tpm(%d)" % rollup,
"tps()": "tps(%d)" % rollup,
"sps()": "sps(%d)" % rollup,
"spm()": "spm(%d)" % rollup,
}

return [column_map.get(column, column) for column in columns]


def resolve_axis_column(column: str, index: int = 0) -> str:
return get_function_alias(column) if not is_equation(column) else f"equation[{index}]"
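A minimal usage sketch of get_query_columns (illustrative only, not part of the diff): the legacy incident aliases resolve to rollup-aware functions, and unrecognized columns pass through unchanged.

from sentry.api.bases.organization_events import get_query_columns

assert get_query_columns(["event_count", "user_count"], 60) == ["count()", "count_unique(user)"]
assert get_query_columns(["epm()"], 60) == ["epm(60)"]
assert get_query_columns(["p95(transaction.duration)"], 60) == ["p95(transaction.duration)"]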

@@ -438,30 +459,13 @@ def get_event_stats_data(
date_range = snuba_params.date_range
stats_period = parse_stats_period(get_interval_from_range(date_range, False))
rollup = int(stats_period.total_seconds()) if stats_period is not None else 3600

if comparison_delta is not None:
retention = quotas.get_event_retention(organization=organization)
comparison_start = snuba_params.start_date - comparison_delta
if retention and comparison_start < timezone.now() - timedelta(days=retention):
raise ValidationError("Comparison period is outside your retention window")

# Backwards compatibility for incidents which uses the old
# column aliases as it straddles both versions of events/discover.
# We will need these aliases until discover2 flags are enabled for all
# users.
# We need these rollup columns to generate correct events-stats results
column_map = {
"user_count": "count_unique(user)",
"event_count": "count()",
"epm()": "epm(%d)" % rollup,
"eps()": "eps(%d)" % rollup,
"tpm()": "tpm(%d)" % rollup,
"tps()": "tps(%d)" % rollup,
"sps()": "sps(%d)" % rollup,
"spm()": "spm(%d)" % rollup,
}

query_columns = [column_map.get(column, column) for column in columns]
query_columns = get_query_columns(columns, rollup)
with sentry_sdk.start_span(op="discover.endpoint", description="base.stats_query"):
result = get_event_stats(
query_columns, query, snuba_params, rollup, zerofill_results, comparison_delta
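For reference, a sketch of the rollup derivation above, assuming the interval string for the date range resolves to one hour (parse_stats_period import path assumed):

from sentry.api.utils import parse_stats_period  # import path assumed

stats_period = parse_stats_period("1h")
rollup = int(stats_period.total_seconds()) if stats_period is not None else 3600
assert rollup == 3600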
13 changes: 11 additions & 2 deletions src/sentry/seer/anomaly_detection/get_historical_anomalies.py
Expand Up @@ -4,6 +4,7 @@
from django.conf import settings
from urllib3.exceptions import MaxRetryError, TimeoutError

from sentry.api.bases.organization_events import get_query_columns
from sentry.conf.server import SEER_ANOMALY_DETECTION_ENDPOINT_URL
from sentry.incidents.models.alert_rule import AlertRule, AlertRuleStatus
from sentry.models.project import Project
@@ -58,8 +59,14 @@ def get_historical_anomaly_data_from_seer(
window_min = int(snuba_query.time_window / 60)
start = datetime.fromisoformat(start_string)
end = datetime.fromisoformat(end_string)
query_columns = get_query_columns([snuba_query.aggregate], snuba_query.time_window)
historical_data = fetch_historical_data(
alert_rule=alert_rule, snuba_query=snuba_query, project=project, start=start, end=end
alert_rule=alert_rule,
snuba_query=snuba_query,
query_columns=query_columns,
project=project,
start=start,
end=end,
)

if not historical_data:
@@ -74,7 +81,9 @@
},
)
return None
formatted_data = format_historical_data(historical_data, dataset)
formatted_data = format_historical_data(
historical_data, query_columns, dataset, project.organization
)
if (
not alert_rule.sensitivity
or not alert_rule.seasonality
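Note that snuba_query.time_window (in seconds) is passed as the rollup above, so rate aliases resolve against the alert window. A sketch with assumed values:

query_columns = get_query_columns(["epm()"], 3600)  # 1-hour window -> ["epm(3600)"]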
8 changes: 6 additions & 2 deletions src/sentry/seer/anomaly_detection/store_data.py
Expand Up @@ -6,6 +6,7 @@
from parsimonious.exceptions import ParseError
from urllib3.exceptions import MaxRetryError, TimeoutError

from sentry.api.bases.organization_events import get_query_columns
from sentry.conf.server import SEER_ANOMALY_DETECTION_STORE_DATA_URL
from sentry.incidents.models.alert_rule import AlertRule, AlertRuleStatus
from sentry.models.project import Project
@@ -63,12 +64,15 @@ def send_historical_data_to_seer(alert_rule: AlertRule, project: Project) -> Ale
snuba_query = SnubaQuery.objects.get(id=alert_rule.snuba_query_id)
window_min = int(snuba_query.time_window / 60)
dataset = get_dataset(snuba_query.dataset)
historical_data = fetch_historical_data(alert_rule, snuba_query, project)
query_columns = get_query_columns([snuba_query.aggregate], snuba_query.time_window)
historical_data = fetch_historical_data(alert_rule, snuba_query, query_columns, project)

if not historical_data:
raise ValidationError("No historical data available.")

formatted_data = format_historical_data(historical_data, dataset)
formatted_data = format_historical_data(
historical_data, query_columns, dataset, project.organization
)
if not formatted_data:
raise ValidationError("Unable to get historical data for this alert.")

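A worked sketch of the values flowing through send_historical_data_to_seer, assuming a 1-hour alert window:

time_window = 3600  # snuba_query.time_window, assumed
window_min = int(time_window / 60)  # 60, as computed above
query_columns = get_query_columns(["count()"], time_window)  # -> ["count()"]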
121 changes: 83 additions & 38 deletions src/sentry/seer/anomaly_detection/utils.py
@@ -5,18 +5,23 @@
from django.utils.datastructures import MultiValueDict

from sentry import release_health
from sentry.api.bases.organization_events import resolve_axis_column
from sentry.api.serializers.snuba import SnubaTSResultSerializer
from sentry.incidents.models.alert_rule import AlertRule, AlertRuleThresholdType
from sentry.models.organization import Organization
from sentry.models.project import Project
from sentry.search.events.types import SnubaParams
from sentry.seer.anomaly_detection.types import TimeSeriesPoint
from sentry.snuba import metrics_performance
from sentry.snuba.models import SnubaQuery
from sentry.snuba.metrics.extraction import MetricSpecType
from sentry.snuba.models import SnubaQuery, SnubaQueryEventType
from sentry.snuba.referrer import Referrer
from sentry.snuba.sessions_v2 import QueryDefinition
from sentry.snuba.utils import get_dataset
from sentry.utils.snuba import SnubaTSResult

NUM_DAYS = 28


def translate_direction(direction: int) -> str:
"""
@@ -30,7 +35,36 @@ def translate_direction(direction: int) -> str:
return direction_map[AlertRuleThresholdType(direction)]


NUM_DAYS = 28
def get_snuba_query_string(snuba_query: SnubaQuery) -> str:
"""
Generate a query string that matches what events-stats does
"""
SNUBA_QUERY_EVENT_TYPE_TO_STRING = {
SnubaQueryEventType.EventType.ERROR: "error",
SnubaQueryEventType.EventType.DEFAULT: "default",
SnubaQueryEventType.EventType.TRANSACTION: "transaction",
}

if len(snuba_query.event_types) > 1:
        # e.g. (is:unresolved) AND (event.type:[error,default])
        event_types_list = [
            SNUBA_QUERY_EVENT_TYPE_TO_STRING[event_type] for event_type in snuba_query.event_types
        ]
        event_types_string = "(event.type:[" + ",".join(event_types_list) + "])"
else:
# e.g. (is:unresolved) AND (event.type:error)
snuba_query_event_type_string = SNUBA_QUERY_EVENT_TYPE_TO_STRING[snuba_query.event_types[0]]
event_types_string = f"(event.type:{snuba_query_event_type_string})"
if snuba_query.query:
snuba_query_string = f"({snuba_query.query}) AND {event_types_string}"
else:
snuba_query_string = event_types_string

return snuba_query_string
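A sketch of the strings this helper produces, following the branches above (hypothetical SnubaQuery values):

# query="is:unresolved", event_types=[ERROR, DEFAULT]
#     -> "(is:unresolved) AND (event.type:[error,default])"
# query="", event_types=[TRANSACTION]
#     -> "(event.type:transaction)"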


def get_crash_free_historical_data(
@@ -75,23 +109,16 @@ def get_crash_free_historical_data(
)


def format_historical_data(data: SnubaTSResult, dataset: Any) -> list[TimeSeriesPoint]:
def format_historical_data(
data: SnubaTSResult, query_columns: list[str], dataset: Any, organization: Organization
) -> list[TimeSeriesPoint]:
"""
Format Snuba data into the format the Seer API expects.
For errors/transactions data:
If there are no results, it's just the timestamp
{'time': 1719012000}, {'time': 1719018000}, {'time': 1719024000}

If there are results, the aggregate is added
{'time': 1721300400, 'count': 2}

For metrics_performance dataset/sessions data:
    The count is stored separately from the timestamps; if there is no data, the count is 0.
"""
formatted_data: list[TimeSeriesPoint] = []
nested_data = data.data.get("data", [])

if dataset == metrics_performance:
nested_data = data.data.get("data", [])
groups = nested_data.get("groups")
if not len(groups):
return formatted_data
@@ -102,28 +129,31 @@ def format_historical_data(data: SnubaTSResult, dataset: Any) -> list[TimeSeriesPoint]:
ts_point = TimeSeriesPoint(timestamp=date.timestamp(), value=count)
formatted_data.append(ts_point)
else:
# we don't know what the aggregation key of the query is
# so we should see it when we see a data point that has a value
agg_key = ""
for datum in nested_data:
if len(datum) == 1:
# this data point has no value
ts_point = TimeSeriesPoint(timestamp=datum.get("time"), value=0)
else:
# if we don't know the aggregation key yet, we should set it
if not agg_key:
for key in datum: # only two keys in this dict
if key != "time":
agg_key = key
break
ts_point = TimeSeriesPoint(timestamp=datum.get("time"), value=datum.get(agg_key, 0))
formatted_data.append(ts_point)
serializer = SnubaTSResultSerializer(organization=organization, lookup=None, user=None)
serialized_result = serializer.serialize(
data,
resolve_axis_column(query_columns[0]),
allow_partial_buckets=False,
zerofill_results=False,
extra_columns=None,
)

        for point in serialized_result.get("data", []):
            if len(point) > 1:
                count_data = point[1]
                count = count_data[0].get("count", 0) if count_data else 0
                ts_point = TimeSeriesPoint(timestamp=point[0], value=count)
                formatted_data.append(ts_point)

return formatted_data
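For illustration, the serializer output this branch consumes (shape assumed: SnubaTSResultSerializer emits [timestamp, [{'count': n}]] pairs) and the points it yields:

serialized = {"data": [[1719012000, []], [1719015600, [{"count": 2}]]]}
points = []
for ts, counts in serialized["data"]:
    value = counts[0].get("count", 0) if counts else 0
    points.append(TimeSeriesPoint(timestamp=ts, value=value))
# -> [{'timestamp': 1719012000, 'value': 0}, {'timestamp': 1719015600, 'value': 2}]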


def fetch_historical_data(
alert_rule: AlertRule,
snuba_query: SnubaQuery,
query_columns: list[str],
project: Project,
start: datetime | None = None,
end: datetime | None = None,
@@ -143,37 +173,52 @@ def fetch_historical_data(
granularity = snuba_query.time_window

dataset_label = snuba_query.dataset

if dataset_label == "events":
# DATASET_OPTIONS expects the name 'errors'
dataset_label = "errors"
elif dataset_label == "generic_metrics":
# XXX: performance alerts in prod
dataset_label = "transactions"
elif dataset_label == "transactions":
# XXX: performance alerts locally
dataset_label = "discover"
dataset = get_dataset(dataset_label)

if not project or not dataset or not alert_rule.organization:
return None

environments = []
if snuba_query.environment:
environments = [snuba_query.environment]

snuba_params = SnubaParams(
organization=alert_rule.organization,
projects=[project],
start=start,
end=end,
stats_period=None,
environments=environments,
)

if dataset == metrics_performance:
return get_crash_free_historical_data(
start, end, project, alert_rule.organization, granularity
)

else:
snuba_query_string = get_snuba_query_string(snuba_query)
historical_data = dataset.timeseries_query(
selected_columns=[snuba_query.aggregate],
query=snuba_query.query,
snuba_params=SnubaParams(
organization=alert_rule.organization,
projects=[project],
start=start,
end=end,
),
selected_columns=query_columns,
query=snuba_query_string,
snuba_params=snuba_params,
rollup=granularity,
referrer=(
Referrer.ANOMALY_DETECTION_HISTORICAL_DATA_QUERY.value
if is_store_data_request
else Referrer.ANOMALY_DETECTION_RETURN_HISTORICAL_ANOMALIES.value
),
zerofill_results=True,
allow_metric_aggregates=True,
on_demand_metrics_type=MetricSpecType.SIMPLE_QUERY,
)
return historical_data
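Finally, a hypothetical end-to-end call (identifiers and dates assumed), mirroring the call sites in get_historical_anomalies.py and store_data.py:

from datetime import datetime, timezone

historical_data = fetch_historical_data(
    alert_rule=alert_rule,  # AlertRule under evaluation (assumed)
    snuba_query=snuba_query,  # its SnubaQuery, e.g. dataset="events" (assumed)
    query_columns=get_query_columns([snuba_query.aggregate], snuba_query.time_window),
    project=project,
    start=datetime(2024, 8, 1, tzinfo=timezone.utc),
    end=datetime(2024, 8, 29, tzinfo=timezone.utc),  # 28 days (NUM_DAYS) later
)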