
Add logging on startup/shutdown #8448

Merged
merged 13 commits on Oct 2, 2020
1 change: 1 addition & 0 deletions changelog.d/8448.misc
@@ -0,0 +1 @@
Add SQL logging on queries that happen during startup.
2 changes: 1 addition & 1 deletion scripts/synapse_port_db
@@ -489,7 +489,7 @@ class Porter(object):

hs = MockHomeserver(self.hs_config)

with make_conn(db_config, engine) as db_conn:
with make_conn(db_config, engine, "portdb") as db_conn:
engine.check_database(
db_conn, allow_outdated_version=allow_outdated_version
)
5 changes: 5 additions & 0 deletions synapse/app/_base.py
@@ -272,6 +272,11 @@ def handle_sighup(*args, **kwargs):
hs.get_datastore().db_pool.start_profiling()
hs.get_pusherpool().start()

# Log when we start the shut down process.
hs.get_reactor().addSystemEventTrigger(
"before", "shutdown", logger.info, "Shutting down..."
)

setup_sentry(hs)
setup_sdnotify(hs)

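For context, the new log line uses Twisted's system event triggers: a trigger registered for the "before" phase of the "shutdown" event runs just before the reactor starts tearing down, and any extra arguments are forwarded to the callable. A minimal, self-contained sketch of the same pattern (the logging setup here is illustrative, not part of the diff):

```python
import logging

from twisted.internet import reactor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Run logger.info("Shutting down...") just before the reactor shuts down;
# trailing arguments are passed through to the callable unchanged.
reactor.addSystemEventTrigger("before", "shutdown", logger.info, "Shutting down...")

reactor.callLater(0, reactor.stop)  # stop straight away so the trigger fires
reactor.run()
```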
89 changes: 74 additions & 15 deletions synapse/storage/database.py
@@ -32,6 +32,7 @@
overload,
)

import attr
from prometheus_client import Histogram
from typing_extensions import Literal

@@ -90,13 +91,17 @@ def make_pool(
return adbapi.ConnectionPool(
db_config.config["name"],
cp_reactor=reactor,
cp_openfun=engine.on_new_connection,
cp_openfun=lambda conn: engine.on_new_connection(
LoggingDatabaseConnection(conn, engine, "on_new_connection")
),
**db_config.config.get("args", {})
)
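This make_pool change leans on adbapi.ConnectionPool's cp_openfun hook, which Twisted calls with every freshly opened DB-API connection; the lambda uses that moment to hand the engine a LoggingDatabaseConnection instead of the raw connection. A rough stand-alone sketch of the hook itself, with sqlite3 and the PRAGMA as illustrative stand-ins:

```python
from twisted.enterprise import adbapi


def on_new_connection(conn):
    # Called once per newly opened connection -- the point where the diff
    # interposes LoggingDatabaseConnection(conn, engine, "on_new_connection").
    conn.execute("PRAGMA foreign_keys = ON")


pool = adbapi.ConnectionPool(
    "sqlite3",
    ":memory:",
    check_same_thread=False,  # sqlite3 connections hop between pool threads
    cp_openfun=on_new_connection,
)
```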


def make_conn(
db_config: DatabaseConnectionConfig, engine: BaseDatabaseEngine
db_config: DatabaseConnectionConfig,
engine: BaseDatabaseEngine,
default_txn_name: str,
) -> Connection:
"""Make a new connection to the database and return it.

@@ -109,11 +114,60 @@ def make_conn(
for k, v in db_config.config.get("args", {}).items()
if not k.startswith("cp_")
}
db_conn = engine.module.connect(**db_params)
native_db_conn = engine.module.connect(**db_params)
db_conn = LoggingDatabaseConnection(native_db_conn, engine, default_txn_name)

engine.on_new_connection(db_conn)
return db_conn


@attr.s(slots=True)
class LoggingDatabaseConnection:
"""A wrapper around a database connection that returns `LoggingTransaction`
as its cursor class.

    This is mainly used on startup to ensure that queries get logged correctly.
"""

conn = attr.ib(type=Connection)
engine = attr.ib(type=BaseDatabaseEngine)
default_txn_name = attr.ib(type=str)

def cursor(
self, *, txn_name=None, after_callbacks=None, exception_callbacks=None
) -> "LoggingTransaction":
if not txn_name:
txn_name = self.default_txn_name

return LoggingTransaction(
self.conn.cursor(),
name=txn_name,
database_engine=self.engine,
after_callbacks=after_callbacks,
exception_callbacks=exception_callbacks,
)

def close(self) -> None:
self.conn.close()

def commit(self) -> None:
self.conn.commit()

def rollback(self, *args, **kwargs) -> None:
self.conn.rollback(*args, **kwargs)

    def __enter__(self) -> "LoggingDatabaseConnection":
self.conn.__enter__()
return self

def __exit__(self, exc_type, exc_value, traceback) -> bool:
return self.conn.__exit__(exc_type, exc_value, traceback)

# Proxy through any unknown lookups to the DB conn class.
def __getattr__(self, name):
return getattr(self.conn, name)
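
The __getattr__ fallthrough is what lets this wrapper stand in for a real DB-API connection: the methods it defines (cursor, commit, and so on) are intercepted, and everything else is delegated to the wrapped object untouched. A generic sketch of the pattern, independent of Synapse:

```python
import sqlite3


class ConnectionProxy:
    """Intercepts selected methods; delegates everything else."""

    def __init__(self, inner):
        self._inner = inner

    def cursor(self):
        print("cursor() intercepted")  # stand-in for the naming/logging logic
        return self._inner.cursor()

    def __getattr__(self, name):
        # Only invoked for attributes *not* found on the proxy itself.
        return getattr(self._inner, name)


conn = ConnectionProxy(sqlite3.connect(":memory:"))
conn.cursor().execute("SELECT 1")
conn.commit()  # not defined on the proxy, so delegated via __getattr__
conn.close()
```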


# The type of entry which goes on our after_callbacks and exception_callbacks lists.
#
# Python 3.5.2 doesn't support Callable with an ellipsis, so we wrap it in quotes so
@@ -247,6 +301,12 @@ def _do_execute(self, func, sql: str, *args: Any) -> None:
def close(self) -> None:
self.txn.close()

def __enter__(self) -> "LoggingTransaction":
return self

def __exit__(self, exc_type, exc_value, traceback):
self.close()
Comment on lines +304 to +308

Member: Did we never previously use LoggingTransaction as a context manager, even though Transaction can be used that way?

Member Author: Exactly.
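
For reference, the new __enter__/__exit__ pair means a LoggingTransaction can now be used in a with block like a plain DB-API cursor, with __exit__ closing (not committing) it. A hedged usage sketch, assuming a Synapse checkout; the sqlite3 in-memory config shape is an assumption based on the config dict used elsewhere in this diff:

```python
import sqlite3

from synapse.storage.database import LoggingDatabaseConnection
from synapse.storage.engines import create_engine

engine = create_engine({"name": "sqlite3", "args": {"database": ":memory:"}})
db_conn = LoggingDatabaseConnection(sqlite3.connect(":memory:"), engine, "example")

# The cursor is a LoggingTransaction; __exit__ closes it when the block ends.
with db_conn.cursor(txn_name="create_table") as txn:
    txn.execute("CREATE TABLE t (x INTEGER)")

db_conn.commit()  # closing a cursor does not commit; do that explicitly
```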



class PerformanceCounters:
def __init__(self):
@@ -395,7 +455,7 @@ def loop():

def new_transaction(
self,
conn: Connection,
conn: LoggingDatabaseConnection,
desc: str,
after_callbacks: List[_CallbackListEntry],
exception_callbacks: List[_CallbackListEntry],
@@ -418,12 +478,10 @@
i = 0
N = 5
while True:
cursor = LoggingTransaction(
conn.cursor(),
name,
self.engine,
after_callbacks,
exception_callbacks,
cursor = conn.cursor(
txn_name=name,
after_callbacks=after_callbacks,
exception_callbacks=exception_callbacks,
)
try:
r = func(cursor, *args, **kwargs)
@@ -584,7 +642,10 @@ def inner_func(conn, *args, **kwargs):
logger.debug("Reconnecting closed database connection")
conn.reconnect()

return func(conn, *args, **kwargs)
db_conn = LoggingDatabaseConnection(
conn, self.engine, "runWithConnection"
)
return func(db_conn, *args, **kwargs)
Comment on lines +645 to +648

Member: Is this ensuring another location uses LoggingDatabaseConnection, or is it necessary because of the refactoring? I'm having a bit of trouble following why this change was done.

Member Author: This just ensures that whenever we use a connection after a call to runWithConnection it is a logging connection, which a) helps ensure that more SQL statements get logged, and b) is necessary to make the tests work, as they call runWithConnection and then frequently call things that expect a logging connection.


return await make_deferred_yieldable(
self._db_pool.runWithConnection(inner_func, *args, **kwargs)
@@ -1621,7 +1682,7 @@ def simple_delete_many_txn(

def get_cache_dict(
self,
db_conn: Connection,
db_conn: LoggingDatabaseConnection,
table: str,
entity_column: str,
stream_column: str,
@@ -1642,9 +1703,7 @@
"limit": limit,
}

sql = self.engine.convert_param_style(sql)

txn = db_conn.cursor()
txn = db_conn.cursor(txn_name="get_cache_dict")
txn.execute(sql, (int(max_value),))

cache = {row[0]: int(row[1]) for row in txn}
2 changes: 1 addition & 1 deletion synapse/storage/databases/__init__.py
@@ -46,7 +46,7 @@ def __init__(self, main_store_class, hs):
db_name = database_config.name
engine = create_engine(database_config.config)

with make_conn(database_config, engine) as db_conn:
with make_conn(database_config, engine, "startup") as db_conn:
logger.info("[database config %r]: Checking database server", db_name)
engine.check_database(db_conn)

1 change: 0 additions & 1 deletion synapse/storage/databases/main/__init__.py
@@ -284,7 +284,6 @@ def _get_active_presence(self, db_conn):
" last_user_sync_ts, status_msg, currently_active FROM presence_stream"
" WHERE state != ?"
)
sql = self.database_engine.convert_param_style(sql)

txn = db_conn.cursor()
txn.execute(sql, (PresenceState.OFFLINE,))
8 changes: 2 additions & 6 deletions synapse/storage/databases/main/event_push_actions.py
@@ -20,7 +20,7 @@
import attr

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import LoggingTransaction, SQLBaseStore, db_to_json
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
@@ -74,11 +74,7 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self.stream_ordering_month_ago = None
self.stream_ordering_day_ago = None

cur = LoggingTransaction(
db_conn.cursor(),
name="_find_stream_orderings_for_times_txn",
database_engine=self.database_engine,
)
cur = db_conn.cursor(txn_name="_find_stream_orderings_for_times_txn")
self._find_stream_orderings_for_times_txn(cur)
cur.close()

1 change: 0 additions & 1 deletion synapse/storage/databases/main/monthly_active_users.py
@@ -214,7 +214,6 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self._mau_stats_only = hs.config.mau_stats_only

# Do not add more reserved users than the total allowable number
# cur = LoggingTransaction(
self.db_pool.new_transaction(
db_conn,
"initialise_mau_threepids",
13 changes: 3 additions & 10 deletions synapse/storage/databases/main/roommember.py
@@ -21,12 +21,7 @@
from synapse.events.snapshot import EventContext
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import (
LoggingTransaction,
SQLBaseStore,
db_to_json,
make_in_list_sql_clause,
)
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import Sqlite3Engine
@@ -60,10 +55,8 @@ def __init__(self, database: DatabasePool, db_conn, hs):
# background update still running?
self._current_state_events_membership_up_to_date = False

txn = LoggingTransaction(
db_conn.cursor(),
name="_check_safe_current_state_events_membership_updated",
database_engine=self.database_engine,
txn = db_conn.cursor(
txn_name="_check_safe_current_state_events_membership_updated"
)
self._check_safe_current_state_events_membership_updated_txn(txn)
txn.close()
19 changes: 9 additions & 10 deletions synapse/storage/databases/main/schema/delta/20/pushers.py
@@ -66,16 +66,15 @@ def run_create(cur, database_engine, *args, **kwargs):
row[8] = bytes(row[8]).decode("utf-8")
row[11] = bytes(row[11]).decode("utf-8")
cur.execute(
database_engine.convert_param_style(
"""
INSERT into pushers2 (
id, user_name, access_token, profile_tag, kind,
app_id, app_display_name, device_display_name,
pushkey, ts, lang, data, last_token, last_success,
failing_since
) values (%s)"""
% (",".join(["?" for _ in range(len(row))]))
),
"""
INSERT into pushers2 (
id, user_name, access_token, profile_tag, kind,
app_id, app_display_name, device_display_name,
pushkey, ts, lang, data, last_token, last_success,
failing_since
) values (%s)
"""
% (",".join(["?" for _ in range(len(row))])),
row,
)
count += 1
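These schema deltas can drop their explicit convert_param_style calls because the cursor they now receive is a LoggingTransaction, so the placeholder rewrite can happen centrally at execute time. For Postgres the conversion itself is essentially a string substitution; an illustrative reconstruction (the real method lives on the engine classes and may differ):

```python
def convert_param_style(sql: str) -> str:
    # psycopg2 expects "%s"-style placeholders; sqlite3 keeps "?" as-is.
    return sql.replace("?", "%s")


assert (
    convert_param_style("UPDATE users SET appservice_id = ? WHERE name IN (?,?)")
    == "UPDATE users SET appservice_id = %s WHERE name IN (%s,%s)"
)
```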
2 changes: 0 additions & 2 deletions synapse/storage/databases/main/schema/delta/25/fts.py
@@ -71,8 +71,6 @@ def run_create(cur, database_engine, *args, **kwargs):
" VALUES (?, ?)"
)

sql = database_engine.convert_param_style(sql)

cur.execute(sql, ("event_search", progress_json))


2 changes: 0 additions & 2 deletions synapse/storage/databases/main/schema/delta/27/ts.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ def run_create(cur, database_engine, *args, **kwargs):
" VALUES (?, ?)"
)

sql = database_engine.convert_param_style(sql)

cur.execute(sql, ("event_origin_server_ts", progress_json))


6 changes: 2 additions & 4 deletions synapse/storage/databases/main/schema/delta/30/as_users.py
@@ -59,9 +59,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
user_chunks = (user_ids[i : i + 100] for i in range(0, len(user_ids), n))
for chunk in user_chunks:
cur.execute(
database_engine.convert_param_style(
"UPDATE users SET appservice_id = ? WHERE name IN (%s)"
% (",".join("?" for _ in chunk),)
),
"UPDATE users SET appservice_id = ? WHERE name IN (%s)"
% (",".join("?" for _ in chunk),),
[as_id] + chunk,
)
19 changes: 9 additions & 10 deletions synapse/storage/databases/main/schema/delta/31/pushers.py
@@ -65,16 +65,15 @@ def run_create(cur, database_engine, *args, **kwargs):
row = list(row)
row[12] = token_to_stream_ordering(row[12])
cur.execute(
database_engine.convert_param_style(
"""
INSERT into pushers2 (
id, user_name, access_token, profile_tag, kind,
app_id, app_display_name, device_display_name,
pushkey, ts, lang, data, last_stream_ordering, last_success,
failing_since
) values (%s)"""
% (",".join(["?" for _ in range(len(row))]))
),
"""
INSERT into pushers2 (
id, user_name, access_token, profile_tag, kind,
app_id, app_display_name, device_display_name,
pushkey, ts, lang, data, last_stream_ordering, last_success,
failing_since
) values (%s)
"""
% (",".join(["?" for _ in range(len(row))])),
row,
)
count += 1
@@ -55,8 +55,6 @@ def run_create(cur, database_engine, *args, **kwargs):
" VALUES (?, ?)"
)

sql = database_engine.convert_param_style(sql)

cur.execute(sql, ("event_search_order", progress_json))


@@ -50,8 +50,6 @@ def run_create(cur, database_engine, *args, **kwargs):
" VALUES (?, ?)"
)

sql = database_engine.convert_param_style(sql)

cur.execute(sql, ("event_fields_sender_url", progress_json))


@@ -23,8 +23,5 @@ def run_create(cur, database_engine, *args, **kwargs):

def run_upgrade(cur, database_engine, *args, **kwargs):
cur.execute(
database_engine.convert_param_style(
"UPDATE remote_media_cache SET last_access_ts = ?"
),
(int(time.time() * 1000),),
"UPDATE remote_media_cache SET last_access_ts = ?", (int(time.time() * 1000),),
)
@@ -1,6 +1,8 @@
import logging
from io import StringIO

from synapse.storage.engines import PostgresEngine
from synapse.storage.prepare_database import execute_statements_from_stream

logger = logging.getLogger(__name__)

@@ -46,7 +48,4 @@ def run_create(cur, database_engine, *args, **kwargs):
select_clause,
)

if isinstance(database_engine, PostgresEngine):
cur.execute(sql)
else:
cur.executescript(sql)
execute_statements_from_stream(cur, StringIO(sql))
Member: Why this change? It seems potentially inefficient?

Member Author: This is a delta, so it doesn't really matter if it's inefficient. We do this because we don't expose an executescript on LoggingTransaction, and implementing it would be a bit of a faff.
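
The helper used here, execute_statements_from_stream from synapse.storage.prepare_database, stands in for executescript by splitting the SQL stream into individual statements and running each one through the (logging) cursor. A rough, illustrative re-implementation of the idea; the real helper may differ in detail:

```python
from io import StringIO
from typing import TextIO


def run_statements(cur, stream: TextIO) -> None:
    # Naive split on ";" -- good enough for the generated schema SQL above.
    for statement in stream.read().split(";"):
        statement = statement.strip()
        if statement:
            cur.execute(statement)


# Mirrors the call in the delta: run_statements(cur, StringIO(sql))
```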

@@ -68,7 +68,6 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
INNER JOIN room_memberships AS r USING (event_id)
WHERE type = 'm.room.member' AND state_key LIKE ?
"""
sql = database_engine.convert_param_style(sql)
cur.execute(sql, ("%:" + config.server_name,))

cur.execute(