Move towards using MultiWriterIdGenerator everywhere (#17226)

`StreamIdGenerator` has a problem: it can go backwards over restarts
when a stream ID is requested but never inserted into the DB. This
blocks landing #17215, and is more generally a potential cause of all
sorts of nastiness.

Instead of trying to fix `StreamIdGenerator`, we may as well move to
`MultiWriterIdGenerator`, which does not suffer from this problem (the
latest positions are persisted in the `stream_positions` table). This
involves adding SQLite support to the class.

This only changes ID generators that were already using
`MultiWriterIdGenerator` under Postgres; a separate PR will move the
remaining uses of `StreamIdGenerator` over.
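
To illustrate the failure mode, a minimal sketch (illustrative only,
not Synapse's actual implementation): the old generator re-seeds itself
from the maximum stream ID already in the table on startup, so an ID
that was handed out but never persisted is handed out again after a
restart.

```python
# Illustrative sketch of the bug; the class and names are invented.
class NaiveStreamIdGenerator:
    def __init__(self, current_max_in_db: int) -> None:
        # On restart this is recomputed from MAX(stream_id) in the table,
        # "forgetting" any IDs that were issued but never inserted.
        self._current = current_max_in_db

    def get_next(self) -> int:
        self._current += 1
        return self._current


gen = NaiveStreamIdGenerator(current_max_in_db=10)
assert gen.get_next() == 11  # caller crashes before inserting row 11...

gen = NaiveStreamIdGenerator(current_max_in_db=10)  # ...process restarts
assert gen.get_next() == 11  # 11 is issued again: the stream went backwards
```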
erikjohnston committed May 29, 2024
1 parent 726006c commit 466f344
Showing 10 changed files with 341 additions and 379 deletions.
1 change: 1 addition & 0 deletions changelog.d/17226.misc
@@ -0,0 +1 @@
Move towards using `MultiWriterIdGenerator` everywhere.
21 changes: 18 additions & 3 deletions synapse/storage/database.py
@@ -2461,7 +2461,11 @@ def simple_select_list_paginate_txn(


def make_in_list_sql_clause(
database_engine: BaseDatabaseEngine, column: str, iterable: Collection[Any]
database_engine: BaseDatabaseEngine,
column: str,
iterable: Collection[Any],
*,
negative: bool = False,
) -> Tuple[str, list]:
"""Returns an SQL clause that checks the given column is in the iterable.
@@ -2474,6 +2478,7 @@ def make_in_list_sql_clause(
database_engine
column: Name of the column
iterable: The values to check the column against.
negative: Whether we should check for inequality, i.e. `NOT IN`
Returns:
A tuple of SQL query and the args
@@ -2482,9 +2487,19 @@ def make_in_list_sql_clause(
if database_engine.supports_using_any_list:
# This should hopefully be faster, but also makes postgres query
# stats easier to understand.
return "%s = ANY(?)" % (column,), [list(iterable)]
if not negative:
clause = f"{column} = ANY(?)"
else:
clause = f"{column} != ALL(?)"

return clause, [list(iterable)]
else:
return "%s IN (%s)" % (column, ",".join("?" for _ in iterable)), list(iterable)
params = ",".join("?" for _ in iterable)
if not negative:
clause = f"{column} IN ({params})"
else:
clause = f"{column} NOT IN ({params})"
return clause, list(iterable)


# These overloads ensure that `columns` and `iterable` values have the same length.
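
A hypothetical usage sketch of the new `negative` flag (the `engine`,
`txn`, and `ids_to_skip` names are stand-ins, not from this diff); it
builds an exclusion clause that is portable across both engines:

```python
# Exclude a set of IDs: `!= ALL(?)` on Postgres, `NOT IN (...)` on SQLite.
clause, args = make_in_list_sql_clause(
    engine, "event_id", ids_to_skip, negative=True
)
txn.execute(f"SELECT event_id FROM events WHERE {clause}", args)
```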
47 changes: 14 additions & 33 deletions synapse/storage/databases/main/account_data.py
@@ -43,11 +43,9 @@
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
StreamIdGenerator,
)
from synapse.types import JsonDict, JsonMapping
from synapse.util import json_encoder
@@ -75,37 +73,20 @@ def __init__(

self._account_data_id_gen: AbstractStreamIdGenerator

if isinstance(database.engine, PostgresEngine):
self._account_data_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="account_data",
instance_name=self._instance_name,
tables=[
("room_account_data", "instance_name", "stream_id"),
("room_tags_revisions", "instance_name", "stream_id"),
("account_data", "instance_name", "stream_id"),
],
sequence_name="account_data_sequence",
writers=hs.config.worker.writers.account_data,
)
else:
# Multiple writers are not supported for SQLite.
#
# We shouldn't be running in worker mode with SQLite, but its useful
# to support it for unit tests.
self._account_data_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"room_account_data",
"stream_id",
extra_tables=[
("account_data", "stream_id"),
("room_tags_revisions", "stream_id"),
],
is_writer=self._instance_name in hs.config.worker.writers.account_data,
)
self._account_data_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="account_data",
instance_name=self._instance_name,
tables=[
("room_account_data", "instance_name", "stream_id"),
("room_tags_revisions", "instance_name", "stream_id"),
("account_data", "instance_name", "stream_id"),
],
sequence_name="account_data_sequence",
writers=hs.config.worker.writers.account_data,
)

account_max = self.get_max_account_data_stream_id()
self._account_data_stream_cache = StreamChangeCache(
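
For context, a sketch of how a writer acquires an ID from this
generator (`get_next()` returns an async context manager; the method
body and column names below are approximate, not lifted from this
diff):

```python
async def add_global_account_data_sketch(
    self, user_id: str, data_type: str, content_json: str
) -> int:
    # MultiWriterIdGenerator persists positions in `stream_positions`,
    # so an ID acquired here but never inserted cannot drag the stream
    # backwards across restarts.
    async with self._account_data_id_gen.get_next() as next_id:
        await self.db_pool.simple_upsert(
            table="account_data",
            keyvalues={"user_id": user_id, "account_data_type": data_type},
            values={"stream_id": next_id, "content": content_json},
            desc="add_global_account_data_sketch",
        )
    return next_id
```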
46 changes: 16 additions & 30 deletions synapse/storage/databases/main/deviceinbox.py
@@ -50,11 +50,9 @@
LoggingTransaction,
make_in_list_sql_clause,
)
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
StreamIdGenerator,
)
from synapse.types import JsonDict
from synapse.util import json_encoder
@@ -89,35 +87,23 @@ def __init__(
expiry_ms=30 * 60 * 1000,
)

if isinstance(database.engine, PostgresEngine):
self._can_write_to_device = (
self._instance_name in hs.config.worker.writers.to_device
)
self._can_write_to_device = (
self._instance_name in hs.config.worker.writers.to_device
)

self._to_device_msg_id_gen: AbstractStreamIdGenerator = (
MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="to_device",
instance_name=self._instance_name,
tables=[
("device_inbox", "instance_name", "stream_id"),
("device_federation_outbox", "instance_name", "stream_id"),
],
sequence_name="device_inbox_sequence",
writers=hs.config.worker.writers.to_device,
)
)
else:
self._can_write_to_device = True
self._to_device_msg_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"device_inbox",
"stream_id",
extra_tables=[("device_federation_outbox", "stream_id")],
)
self._to_device_msg_id_gen: AbstractStreamIdGenerator = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="to_device",
instance_name=self._instance_name,
tables=[
("device_inbox", "instance_name", "stream_id"),
("device_federation_outbox", "instance_name", "stream_id"),
],
sequence_name="device_inbox_sequence",
writers=hs.config.worker.writers.to_device,
)

max_device_inbox_id = self._to_device_msg_id_gen.get_current_token()
device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
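
With `MultiWriterIdGenerator` now used on both engines, token handling
is uniform too. A sketch, assuming the existing generator API (the
`"writer1"` instance name is hypothetical):

```python
# The globally-complete position can lag an individual writer, so
# replication code can ask for a specific writer's position.
global_pos = self._to_device_msg_id_gen.get_current_token()
writer_pos = self._to_device_msg_id_gen.get_current_token_for_writer("writer1")
# For a positive stream, a writer sits at or ahead of the global token.
assert writer_pos >= global_pos
```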
101 changes: 33 additions & 68 deletions synapse/storage/databases/main/events_worker.py
@@ -75,12 +75,10 @@
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
StreamIdGenerator,
)
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import JsonDict, get_domain_from_id
@@ -195,51 +193,28 @@ def __init__(

self._stream_id_gen: AbstractStreamIdGenerator
self._backfill_id_gen: AbstractStreamIdGenerator
if isinstance(database.engine, PostgresEngine):
# If we're using Postgres than we can use `MultiWriterIdGenerator`
# regardless of whether this process writes to the streams or not.
self._stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="events",
instance_name=hs.get_instance_name(),
tables=[("events", "instance_name", "stream_ordering")],
sequence_name="events_stream_seq",
writers=hs.config.worker.writers.events,
)
self._backfill_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="backfill",
instance_name=hs.get_instance_name(),
tables=[("events", "instance_name", "stream_ordering")],
sequence_name="events_backfill_stream_seq",
positive=False,
writers=hs.config.worker.writers.events,
)
else:
# Multiple writers are not supported for SQLite.
#
# We shouldn't be running in worker mode with SQLite, but its useful
# to support it for unit tests.
self._stream_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"events",
"stream_ordering",
is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
)
self._backfill_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"events",
"stream_ordering",
step=-1,
extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
)

self._stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="events",
instance_name=hs.get_instance_name(),
tables=[("events", "instance_name", "stream_ordering")],
sequence_name="events_stream_seq",
writers=hs.config.worker.writers.events,
)
self._backfill_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="backfill",
instance_name=hs.get_instance_name(),
tables=[("events", "instance_name", "stream_ordering")],
sequence_name="events_backfill_stream_seq",
positive=False,
writers=hs.config.worker.writers.events,
)

events_max = self._stream_id_gen.get_current_token()
curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
@@ -309,27 +284,17 @@ def get_chain_id_txn(txn: Cursor) -> int:

self._un_partial_stated_events_stream_id_gen: AbstractStreamIdGenerator

if isinstance(database.engine, PostgresEngine):
self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="un_partial_stated_event_stream",
instance_name=hs.get_instance_name(),
tables=[
("un_partial_stated_event_stream", "instance_name", "stream_id")
],
sequence_name="un_partial_stated_event_stream_sequence",
# TODO(faster_joins, multiple writers) Support multiple writers.
writers=["master"],
)
else:
self._un_partial_stated_events_stream_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"un_partial_stated_event_stream",
"stream_id",
)
self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="un_partial_stated_event_stream",
instance_name=hs.get_instance_name(),
tables=[("un_partial_stated_event_stream", "instance_name", "stream_id")],
sequence_name="un_partial_stated_event_stream_sequence",
# TODO(faster_joins, multiple writers) Support multiple writers.
writers=["master"],
)

def get_un_partial_stated_events_token(self, instance_name: str) -> int:
return (
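
Note the backfill generator above is constructed with `positive=False`:
it hands out stream orderings that grow downwards. An illustrative
sketch of what callers see:

```python
# Backfilled events get negative stream orderings, so they sort before
# all "live" events (whose positive orderings come from _stream_id_gen).
async with self._backfill_id_gen.get_next() as stream_ordering:
    assert stream_ordering < 0
    # ... persist the backfilled event at this stream_ordering ...
```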
27 changes: 10 additions & 17 deletions synapse/storage/databases/main/presence.py
@@ -40,13 +40,11 @@
LoggingTransaction,
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.engines._base import IsolationLevel
from synapse.storage.types import Connection
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
StreamIdGenerator,
)
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -91,21 +89,16 @@ def __init__(
self._instance_name in hs.config.worker.writers.presence
)

if isinstance(database.engine, PostgresEngine):
self._presence_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="presence_stream",
instance_name=self._instance_name,
tables=[("presence_stream", "instance_name", "stream_id")],
sequence_name="presence_stream_sequence",
writers=hs.config.worker.writers.presence,
)
else:
self._presence_id_gen = StreamIdGenerator(
db_conn, hs.get_replication_notifier(), "presence_stream", "stream_id"
)
self._presence_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="presence_stream",
instance_name=self._instance_name,
tables=[("presence_stream", "instance_name", "stream_id")],
sequence_name="presence_stream_sequence",
writers=hs.config.worker.writers.presence,
)

self.hs = hs
self._presence_on_startup = self._get_active_presence(db_conn)
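
Finally, the reason this generator cannot go backwards: positions are
persisted in the `stream_positions` table rather than recomputed from
the data tables on startup. A sketch of reading them back (this assumes
a `stream_name`/`instance_name`/`stream_id` schema for that table; the
helper name is invented, and `LoggingTransaction`/`Dict` come from the
surrounding file's imports):

```python
def get_presence_positions_txn(txn: LoggingTransaction) -> Dict[str, int]:
    # Map of writer instance name -> last persisted stream position.
    txn.execute(
        "SELECT instance_name, stream_id FROM stream_positions"
        " WHERE stream_name = ?",
        ("presence_stream",),
    )
    return dict(txn)
```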
