Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix MultiWriteIdGenerator's handling of restarts. (#8374)
Browse files Browse the repository at this point in the history
On startup `MultiWriteIdGenerator` fetches the maximum stream ID for
each instance from the table and uses that as its initial "current
position" for each writer. This is problematic as a) it involves either
a scan of events table or an index (neither of which is ideal), and b)
if rows are being persisted out of order elsewhere while the process
restarts then using the maximum stream ID is not correct. This could
theoretically lead to race conditions where e.g. events that are
persisted out of order are not sent down sync streams.

We fix this by creating a new table that tracks the current positions of
each writer to the stream, and update it each time we finish persisting
a new entry. This is a relatively small overhead when persisting events.
However for the cache invalidation stream this is a much bigger relative
overhead, so instead we note that for invalidation we don't actually
care about reliability over restarts (as there's no caches to
invalidate) and simply don't bother reading and writing to the new table
in that particular case.
  • Loading branch information
erikjohnston committed Sep 24, 2020
1 parent 11c9e17 commit f112cfe
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 30 deletions.
1 change: 1 addition & 0 deletions changelog.d/8374.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix theoretical race condition where events are not sent down `/sync` if the synchrotron worker is restarted without restarting other workers.
2 changes: 2 additions & 0 deletions synapse/replication/slave/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self._cache_id_gen = MultiWriterIdGenerator(
db_conn,
database,
stream_name="caches",
instance_name=hs.get_instance_name(),
table="cache_invalidation_stream_by_instance",
instance_column="instance_name",
id_column="stream_id",
sequence_name="cache_invalidation_stream_seq",
writers=[],
) # type: Optional[MultiWriterIdGenerator]
else:
self._cache_id_gen = None
Expand Down
8 changes: 7 additions & 1 deletion synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,20 @@ def __init__(self, database: DatabasePool, db_conn, hs):
)

if isinstance(self.database_engine, PostgresEngine):
# We set the `writers` to an empty list here as we don't care about
# missing updates over restarts, as we'll not have anything in our
# caches to invalidate. (This reduces the amount of writes to the DB
# that happen).
self._cache_id_gen = MultiWriterIdGenerator(
db_conn,
database,
instance_name="master",
stream_name="caches",
instance_name=hs.get_instance_name(),
table="cache_invalidation_stream_by_instance",
instance_column="instance_name",
id_column="stream_id",
sequence_name="cache_invalidation_stream_seq",
writers=[],
)
else:
self._cache_id_gen = None
Expand Down
4 changes: 4 additions & 0 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,25 @@ def __init__(self, database: DatabasePool, db_conn, hs):
self._stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
stream_name="events",
instance_name=hs.get_instance_name(),
table="events",
instance_column="instance_name",
id_column="stream_ordering",
sequence_name="events_stream_seq",
writers=hs.config.worker.writers.events,
)
self._backfill_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
stream_name="backfill",
instance_name=hs.get_instance_name(),
table="events",
instance_column="instance_name",
id_column="stream_ordering",
sequence_name="events_backfill_stream_seq",
positive=False,
writers=hs.config.worker.writers.events,
)
else:
# We shouldn't be running in worker mode with SQLite, but its useful
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE TABLE stream_positions (
stream_name TEXT NOT NULL,
instance_name TEXT NOT NULL,
stream_id BIGINT NOT NULL
);

CREATE UNIQUE INDEX stream_positions_idx ON stream_positions(stream_name, instance_name);
148 changes: 127 additions & 21 deletions synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import attr
from typing_extensions import Deque

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.util.sequence import PostgresSequenceGenerator

Expand Down Expand Up @@ -184,12 +185,16 @@ class MultiWriterIdGenerator:
Args:
db_conn
db
stream_name: A name for the stream.
instance_name: The name of this instance.
table: Database table associated with stream.
instance_column: Column that stores the row's writer's instance name
id_column: Column that stores the stream ID.
sequence_name: The name of the postgres sequence used to generate new
IDs.
writers: A list of known writers to use to populate current positions
on startup. Can be empty if nothing uses `get_current_token` or
`get_positions` (e.g. caches stream).
positive: Whether the IDs are positive (true) or negative (false).
When using negative IDs we go backwards from -1 to -2, -3, etc.
"""
Expand All @@ -198,16 +203,20 @@ def __init__(
self,
db_conn,
db: DatabasePool,
stream_name: str,
instance_name: str,
table: str,
instance_column: str,
id_column: str,
sequence_name: str,
writers: List[str],
positive: bool = True,
):
self._db = db
self._stream_name = stream_name
self._instance_name = instance_name
self._positive = positive
self._writers = writers
self._return_factor = 1 if positive else -1

# We lock as some functions may be called from DB threads.
Expand All @@ -216,9 +225,7 @@ def __init__(
# Note: If we are a negative stream then we still store all the IDs as
# positive to make life easier for us, and simply negate the IDs when we
# return them.
self._current_positions = self._load_current_ids(
db_conn, table, instance_column, id_column
)
self._current_positions = {} # type: Dict[str, int]

# Set of local IDs that we're still processing. The current position
# should be less than the minimum of this set (if not empty).
Expand Down Expand Up @@ -251,30 +258,80 @@ def __init__(

self._sequence_gen = PostgresSequenceGenerator(sequence_name)

# This goes and fills out the above state from the database.
self._load_current_ids(db_conn, table, instance_column, id_column)

def _load_current_ids(
self, db_conn, table: str, instance_column: str, id_column: str
) -> Dict[str, int]:
# If positive stream aggregate via MAX. For negative stream use MIN
# *and* negate the result to get a positive number.
sql = """
SELECT %(instance)s, %(agg)s(%(id)s) FROM %(table)s
GROUP BY %(instance)s
""" % {
"instance": instance_column,
"id": id_column,
"table": table,
"agg": "MAX" if self._positive else "-MIN",
}

):
cur = db_conn.cursor()
cur.execute(sql)

# `cur` is an iterable over returned rows, which are 2-tuples.
current_positions = dict(cur)
# Load the current positions of all writers for the stream.
if self._writers:
sql = """
SELECT instance_name, stream_id FROM stream_positions
WHERE stream_name = ?
"""
sql = self._db.engine.convert_param_style(sql)

cur.close()
cur.execute(sql, (self._stream_name,))

self._current_positions = {
instance: stream_id * self._return_factor
for instance, stream_id in cur
if instance in self._writers
}

# We set the `_persisted_upto_position` to be the minimum of all current
# positions. If empty we use the max stream ID from the DB table.
min_stream_id = min(self._current_positions.values(), default=None)

if min_stream_id is None:
sql = """
SELECT COALESCE(%(agg)s(%(id)s), 1) FROM %(table)s
""" % {
"id": id_column,
"table": table,
"agg": "MAX" if self._positive else "-MIN",
}
cur.execute(sql)
(stream_id,) = cur.fetchone()
self._persisted_upto_position = stream_id
else:
# If we have a min_stream_id then we pull out everything greater
# than it from the DB so that we can prefill
# `_known_persisted_positions` and get a more accurate
# `_persisted_upto_position`.
#
# We also check if any of the later rows are from this instance, in
# which case we use that for this instance's current position. This
# is to handle the case where we didn't finish persisting to the
# stream positions table before restart (or the stream position
# table otherwise got out of date).

sql = """
SELECT %(instance)s, %(id)s FROM %(table)s
WHERE ? %(cmp)s %(id)s
""" % {
"id": id_column,
"table": table,
"instance": instance_column,
"cmp": "<=" if self._positive else ">=",
}
sql = self._db.engine.convert_param_style(sql)
cur.execute(sql, (min_stream_id,))

self._persisted_upto_position = min_stream_id

with self._lock:
for (instance, stream_id,) in cur:
stream_id = self._return_factor * stream_id
self._add_persisted_position(stream_id)

return current_positions
if instance == self._instance_name:
self._current_positions[instance] = stream_id

cur.close()

def _load_next_id_txn(self, txn) -> int:
return self._sequence_gen.get_next_id_txn(txn)
Expand Down Expand Up @@ -316,6 +373,21 @@ def get_next_txn(self, txn: LoggingTransaction):
txn.call_after(self._mark_id_as_finished, next_id)
txn.call_on_exception(self._mark_id_as_finished, next_id)

# Update the `stream_positions` table with newly updated stream
# ID (unless self._writers is not set in which case we don't
# bother, as nothing will read it).
#
# We only do this on the success path so that the persisted current
# position points to a persited row with the correct instance name.
if self._writers:
txn.call_after(
run_as_background_process,
"MultiWriterIdGenerator._update_table",
self._db.runInteraction,
"MultiWriterIdGenerator._update_table",
self._update_stream_positions_table_txn,
)

return self._return_factor * next_id

def _mark_id_as_finished(self, next_id: int):
Expand Down Expand Up @@ -447,6 +519,28 @@ def _add_persisted_position(self, new_id: int):
# do.
break

def _update_stream_positions_table_txn(self, txn):
"""Update the `stream_positions` table with newly persisted position.
"""

if not self._writers:
return

# We upsert the value, ensuring on conflict that we always increase the
# value (or decrease if stream goes backwards).
sql = """
INSERT INTO stream_positions (stream_name, instance_name, stream_id)
VALUES (?, ?, ?)
ON CONFLICT (stream_name, instance_name)
DO UPDATE SET
stream_id = %(agg)s(stream_positions.stream_id, EXCLUDED.stream_id)
""" % {
"agg": "GREATEST" if self._positive else "LEAST",
}

pos = (self.get_current_token_for_writer(self._instance_name),)
txn.execute(sql, (self._stream_name, self._instance_name, pos))


@attr.s(slots=True)
class _AsyncCtxManagerWrapper:
Expand Down Expand Up @@ -503,4 +597,16 @@ async def __aexit__(self, exc_type, exc, tb):
if exc_type is not None:
return False

# Update the `stream_positions` table with newly updated stream
# ID (unless self._writers is not set in which case we don't
# bother, as nothing will read it).
#
# We only do this on the success path so that the persisted current
# position points to a persisted row with the correct instance name.
if self.id_gen._writers:
await self.id_gen._db.runInteraction(
"MultiWriterIdGenerator._update_table",
self.id_gen._update_stream_positions_table_txn,
)

return False
Loading

0 comments on commit f112cfe

Please sign in to comment.