Skip to content

Commit

Permalink
refactor: optimize backend log payload (#11927)
Browse files Browse the repository at this point in the history
  • Loading branch information
ktmud authored Dec 16, 2020
1 parent 77cae64 commit 76f9f18
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 105 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ combine_as_imports = true
include_trailing_comma = true
line_length = 88
known_first_party = superset
known_third_party =alembic,apispec,backoff,bleach,cachelib,celery,click,colorama,contextlib2,cron_descriptor,croniter,cryptography,dateutil,flask,flask_appbuilder,flask_babel,flask_caching,flask_compress,flask_login,flask_migrate,flask_sqlalchemy,flask_talisman,flask_testing,flask_wtf,freezegun,geohash,geopy,humanize,isodate,jinja2,jwt,markdown,markupsafe,marshmallow,msgpack,numpy,pandas,parameterized,parsedatetime,pathlib2,pgsanity,pkg_resources,polyline,prison,pyarrow,pyhive,pytest,pytz,redis,retry,selenium,setuptools,simplejson,slack,sqlalchemy,sqlalchemy_utils,sqlparse,werkzeug,wtforms,wtforms_json,yaml
known_third_party =alembic,apispec,backoff,bleach,cachelib,celery,click,colorama,contextlib2,cron_descriptor,croniter,cryptography,dateutil,flask,flask_appbuilder,flask_babel,flask_caching,flask_compress,flask_login,flask_migrate,flask_sqlalchemy,flask_talisman,flask_testing,flask_wtf,freezegun,geohash,geopy,humanize,isodate,jinja2,jwt,markdown,markupsafe,marshmallow,msgpack,numpy,pandas,parameterized,parsedatetime,pathlib2,pgsanity,pkg_resources,polyline,prison,pyarrow,pyhive,pytest,pytz,redis,retry,selenium,setuptools,simplejson,slack,sqlalchemy,sqlalchemy_utils,sqlparse,typing_extensions,werkzeug,wtforms,wtforms_json,yaml
multi_line_output = 3
order_by_type = false

Expand Down
118 changes: 73 additions & 45 deletions superset/utils/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,39 @@
import time
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import Any, Callable, cast, Iterator, Optional, Type
from typing import Any, Callable, cast, Dict, Iterator, Optional, Type, Union

from flask import current_app, g, request
from flask_appbuilder.const import API_URI_RIS_KEY
from sqlalchemy.exc import SQLAlchemyError
from typing_extensions import Literal

from superset.stats_logger import BaseStatsLogger


def strip_int_from_path(path: Optional[str]) -> str:
"""Simple function to remove ints from '/' separated paths"""
if path:
return "/".join(["<int>" if s.isdigit() else s for s in path.split("/")])
return ""
def collect_request_payload() -> Dict[str, Any]:
"""Collect log payload identifiable from request context"""
payload: Dict[str, Any] = {
"path": request.path,
**request.form.to_dict(),
# url search params can overwrite POST body
**request.args.to_dict(),
}

# save URL match pattern in addition to the request path
url_rule = str(request.url_rule)
if url_rule != request.path:
payload["url_rule"] = url_rule

# remove rison raw string (q=xxx in search params) in favor of
# rison object (could come from `payload_override`)
if "rison" in payload and API_URI_RIS_KEY in payload:
del payload[API_URI_RIS_KEY]
# delete empty rison object
if "rison" in payload and not payload["rison"]:
del payload["rison"]

return payload


class AbstractEventLogger(ABC):
Expand All @@ -53,26 +73,37 @@ def log( # pylint: disable=too-many-arguments
pass

@contextmanager
def log_context(
self, action: str, ref: Optional[str] = None, log_to_statsd: bool = True,
def log_context( # pylint: disable=too-many-locals
self, action: str, object_ref: Optional[str] = None, log_to_statsd: bool = True,
) -> Iterator[Callable[..., None]]:
"""
Log an event while reading information from the request context.
`kwargs` will be appended directly to the log payload.
Log an event with additional information from the request context.
:param action: a name to identify the event
:param object_ref: reference to the Python object that triggered this action
:param log_to_statsd: whether to update statsd counter for the action
"""
from superset.views.core import get_form_data

start_time = time.time()
referrer = request.referrer[:1000] if request.referrer else None
user_id = g.user.get_id() if hasattr(g, "user") and g.user else None
payload = request.form.to_dict() or {}
# request parameters can overwrite post body
payload.update(request.args.to_dict())
payload_override = {}

# yield a helper to update additional kwargs
yield lambda **kwargs: payload.update(kwargs)
# yield a helper to add additional payload
yield lambda **kwargs: payload_override.update(kwargs)

dashboard_id = payload.get("dashboard_id")
payload = collect_request_payload()
if object_ref:
payload["object_ref"] = object_ref
# manual updates from context comes the last
payload.update(payload_override)

dashboard_id: Optional[int] = None
try:
dashboard_id = int(payload.get("dashboard_id")) # type: ignore
except (TypeError, ValueError):
dashboard_id = None

if "form_data" in payload:
form_data, _ = get_form_data()
Expand All @@ -89,15 +120,8 @@ def log_context(
if log_to_statsd:
self.stats_logger.incr(action)

payload.update(
{
"path": request.path,
"path_no_param": strip_int_from_path(request.path),
"ref": ref,
}
)
# bulk insert
try:
# bulk insert
explode_by = payload.get("explode")
records = json.loads(payload.get(explode_by)) # type: ignore
except Exception: # pylint: disable=broad-except
Expand All @@ -114,16 +138,30 @@ def log_context(
)

def _wrapper(
self, f: Callable[..., Any], **wrapper_kwargs: Any
self,
f: Callable[..., Any],
action: Optional[Union[str, Callable[..., str]]] = None,
object_ref: Optional[Union[str, Callable[..., str], Literal[False]]] = None,
allow_extra_payload: Optional[bool] = False,
**wrapper_kwargs: Any,
) -> Callable[..., Any]:
action_str = wrapper_kwargs.get("action") or f.__name__
ref = f.__qualname__ if hasattr(f, "__qualname__") else None

@functools.wraps(f)
def wrapper(*args: Any, **kwargs: Any) -> Any:
with self.log_context(action_str, ref, **wrapper_kwargs) as log:
value = f(*args, **kwargs)
action_str = (
action(*args, **kwargs) if callable(action) else action
) or f.__name__
object_ref_str = (
object_ref(*args, **kwargs) if callable(object_ref) else object_ref
) or (f.__qualname__ if object_ref is not False else None)
with self.log_context(
action=action_str, object_ref=object_ref_str, **wrapper_kwargs
) as log:
log(**kwargs)
if allow_extra_payload:
# add a payload updater to the decorated function
value = f(*args, add_extra_log_payload=log, **kwargs)
else:
value = f(*args, **kwargs)
return value

return wrapper
Expand All @@ -140,18 +178,9 @@ def func(f: Callable[..., Any]) -> Callable[..., Any]:

return func

def log_manually(self, f: Callable[..., Any]) -> Callable[..., Any]:
"""Allow a function to manually update"""

@functools.wraps(f)
def wrapper(*args: Any, **kwargs: Any) -> Any:
with self.log_context(f.__name__) as log:
# updated_log_payload should be either the last positional
# argument or one of the named arguments of the decorated function
value = f(*args, update_log_payload=log, **kwargs)
return value

return wrapper
def log_this_with_extra_payload(self, f: Callable[..., Any]) -> Callable[..., Any]:
"""Decorator that instrument `update_log_payload` to kwargs"""
return self._wrapper(f, allow_extra_payload=True)

@property
def stats_logger(self) -> BaseStatsLogger:
Expand Down Expand Up @@ -217,9 +246,8 @@ def log( # pylint: disable=too-many-arguments,too-many-locals
) -> None:
from superset.models.core import Log

records = kwargs.get("records", list())

logs = list()
records = kwargs.get("records", [])
logs = []
for record in records:
json_string: Optional[str]
try:
Expand Down
79 changes: 48 additions & 31 deletions superset/views/base_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
"filter": {"type": "string"},
},
}
log_context = event_logger.log_context


class RelatedResultResponseSchema(Schema):
Expand Down Expand Up @@ -310,65 +309,83 @@ def send_stats_metrics(
if time_delta:
self.timing_stats("time", key, time_delta)

@event_logger.log_this_with_context(
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.info",
object_ref=False,
log_to_statsd=False,
)
def info_headless(self, **kwargs: Any) -> Response:
"""
Add statsd metrics to builtin FAB _info endpoint
"""
ref = f"{self.__class__.__name__}.info"
with log_context(ref, ref, log_to_statsd=False):
duration, response = time_function(super().info_headless, **kwargs)
self.send_stats_metrics(response, self.info.__name__, duration)
return response
duration, response = time_function(super().info_headless, **kwargs)
self.send_stats_metrics(response, self.info.__name__, duration)
return response

@event_logger.log_this_with_context(
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.get",
object_ref=False,
log_to_statsd=False,
)
def get_headless(self, pk: int, **kwargs: Any) -> Response:
"""
Add statsd metrics to builtin FAB GET endpoint
"""
ref = f"{self.__class__.__name__}.get"
with log_context(ref, ref, log_to_statsd=False):
duration, response = time_function(super().get_headless, pk, **kwargs)
self.send_stats_metrics(response, self.get.__name__, duration)
return response
duration, response = time_function(super().get_headless, pk, **kwargs)
self.send_stats_metrics(response, self.get.__name__, duration)
return response

@event_logger.log_this_with_context(
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.get_list",
object_ref=False,
log_to_statsd=False,
)
def get_list_headless(self, **kwargs: Any) -> Response:
"""
Add statsd metrics to builtin FAB GET list endpoint
"""
ref = f"{self.__class__.__name__}.get_list"
with log_context(ref, ref, log_to_statsd=False):
duration, response = time_function(super().get_list_headless, **kwargs)
self.send_stats_metrics(response, self.get_list.__name__, duration)
return response
duration, response = time_function(super().get_list_headless, **kwargs)
self.send_stats_metrics(response, self.get_list.__name__, duration)
return response

@event_logger.log_this_with_context(
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.post",
object_ref=False,
log_to_statsd=False,
)
def post_headless(self) -> Response:
"""
Add statsd metrics to builtin FAB POST endpoint
"""
ref = f"{self.__class__.__name__}.post"
with log_context(ref, ref, log_to_statsd=False):
duration, response = time_function(super().post_headless)
self.send_stats_metrics(response, self.post.__name__, duration)
return response
duration, response = time_function(super().post_headless)
self.send_stats_metrics(response, self.post.__name__, duration)
return response

@event_logger.log_this_with_context(
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.put",
object_ref=False,
log_to_statsd=False,
)
def put_headless(self, pk: int) -> Response:
"""
Add statsd metrics to builtin FAB PUT endpoint
"""
ref = f"{self.__class__.__name__}.put"
with log_context(ref, ref, log_to_statsd=False):
duration, response = time_function(super().put_headless, pk)
self.send_stats_metrics(response, self.put.__name__, duration)
return response
duration, response = time_function(super().put_headless, pk)
self.send_stats_metrics(response, self.put.__name__, duration)
return response

@event_logger.log_this_with_context(
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.delete",
object_ref=False,
log_to_statsd=False,
)
def delete_headless(self, pk: int) -> Response:
"""
Add statsd metrics to builtin FAB DELETE endpoint
"""
ref = f"{self.__class__.__name__}.delete"
with log_context(ref, ref, log_to_statsd=False):
duration, response = time_function(super().delete_headless, pk)
self.send_stats_metrics(response, self.delete.__name__, duration)
return response
duration, response = time_function(super().delete_headless, pk)
self.send_stats_metrics(response, self.delete.__name__, duration)
return response

@expose("/related/<column_name>", methods=["GET"])
@protect()
Expand Down
8 changes: 4 additions & 4 deletions superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1752,13 +1752,13 @@ def publish( # pylint: disable=no-self-use

@has_access
@expose("/dashboard/<dashboard_id_or_slug>/")
@event_logger.log_manually
@event_logger.log_this_with_extra_payload
def dashboard( # pylint: disable=too-many-locals
self,
dashboard_id_or_slug: str,
# this parameter is added by `log_manually`,
# this parameter is added by `log_this_with_manual_updates`,
# set a default value to appease pylint
update_log_payload: Callable[..., None] = lambda **kwargs: None,
add_extra_log_payload: Callable[..., None] = lambda **kwargs: None,
) -> FlaskResponse:
"""Server side rendering for a dashboard"""
session = db.session()
Expand Down Expand Up @@ -1807,7 +1807,7 @@ def dashboard( # pylint: disable=too-many-locals
request.args.get(utils.ReservedUrlParameters.EDIT_MODE.value) == "true"
)

update_log_payload(
add_extra_log_payload(
dashboard_id=dash.id,
dashboard_version="v2",
dash_edit_perm=dash_edit_perm,
Expand Down
Loading

0 comments on commit 76f9f18

Please sign in to comment.