From f81a6932360ac90d38601e3a4fd9e74483484825 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 25 Sep 2020 12:26:46 +0100 Subject: [PATCH 1/5] Add ability to check postgres sequence consistency --- docs/postgres.md | 11 ++++ synapse/storage/util/sequence.py | 90 +++++++++++++++++++++++++++++++- 2 files changed, 99 insertions(+), 2 deletions(-) diff --git a/docs/postgres.md b/docs/postgres.md index e71a1975d8d2..c30cc1fd8cef 100644 --- a/docs/postgres.md +++ b/docs/postgres.md @@ -106,6 +106,17 @@ Note that the above may fail with an error about duplicate rows if corruption has already occurred, and such duplicate rows will need to be manually removed. +## Fixing inconsistent sequences error + +Synapse uses Postgres sequences to generate IDs for various tables. A sequence +and associated table can get out of sync if, for example, Synapse has been +downgraded and then upgraded again. + +To fix the issue shut down Synapse (including any and all workers) and run the +SQL command included in the error message. Once done Synapse should start +successfully. + + ## Tuning Postgres The default settings should be fine for most deployments. For larger diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py index ffc189474890..5c3555481039 100644 --- a/synapse/storage/util/sequence.py +++ b/synapse/storage/util/sequence.py @@ -13,11 +13,34 @@ # See the License for the specific language governing permissions and # limitations under the License. import abc +import logging import threading from typing import Callable, List, Optional -from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine -from synapse.storage.types import Cursor +from synapse.storage.engines import ( + BaseDatabaseEngine, + IncorrectDatabaseSetup, + PostgresEngine, +) +from synapse.storage.types import Connection, Cursor + +logger = logging.getLogger(__name__) + + +_INCONSISTENT_SEQUENCE_ERROR = """ +Postgres sequence '%(seq)s' is inconsistent with associated +table '%(table)s'. This can happen if Synapse has been downgraded and +then upgraded again, or due to a bad migration. + +To fix shut down Synapse (including any and all workers) and run the +following SQL: + + SELECT setval('%(seq)s', ( + %(max_id_sql)s + )); + +See docs/postgres.md for more information. +""" class SequenceGenerator(metaclass=abc.ABCMeta): @@ -28,6 +51,19 @@ def get_next_id_txn(self, txn: Cursor) -> int: """Gets the next ID in the sequence""" ... + @abc.abstractmethod + def check_consistency( + self, db_conn: Connection, table: str, id_column: str, positive: bool = True + ): + """Should be called during start up to test that the current value of + the sequence is greater than or equal to the maximum ID in the table. + + This is to handle various cases where the sequence value can get out + of sync with the table, e.g. if Synapse gets rolled back to a previous + version and the rolled forwards again. + """ + ... + class PostgresSequenceGenerator(SequenceGenerator): """An implementation of SequenceGenerator which uses a postgres sequence""" @@ -45,6 +81,50 @@ def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]: ) return [i for (i,) in txn] + def check_consistency( + self, db_conn: Connection, table: str, id_column: str, positive: bool = True + ): + txn = db_conn.cursor() + + # First we get the current max ID from the table. + table_sql = "SELECT GREATEST(%(agg)s(%(id)s), 0) FROM %(table)s" % { + "id": id_column, + "table": table, + "agg": "MAX" if positive else "-MIN", + } + + txn.execute(table_sql) + row = txn.fetchone() + if not row: + # Table is empty, so nothing to do. + txn.close() + return + + # Now we fetch the current value from the sequence and compare wit the + # above. + max_stream_id = row[0] + txn.execute( + "SELECT last_value, is_called FROM %(seq)s" % {"seq": self._sequence_name} + ) + last_value, is_called = txn.fetchone() + txn.close() + + # If `is_called` is False then `last_value` is actually the value that + # will be generated next, so we decrement to get the true "last value". + if not is_called: + last_value -= 1 + + if max_stream_id > last_value: + logger.warning( + "Postgres sequence %s is behind table %s: %d < %d", + last_value, + max_stream_id, + ) + raise IncorrectDatabaseSetup( + _INCONSISTENT_SEQUENCE_ERROR + % {"seq": self._sequence_name, "table": table, "max_id_sql": table_sql} + ) + GetFirstCallbackType = Callable[[Cursor], int] @@ -81,6 +161,12 @@ def get_next_id_txn(self, txn: Cursor) -> int: self._current_max_id += 1 return self._current_max_id + def check_consistency( + self, db_conn: Connection, table: str, id_column: str, positive: bool = True + ): + # There is nothing to do for in memory sequences + pass + def build_sequence_generator( database_engine: BaseDatabaseEngine, From 6ac22c528f5b8b274bbcda0df10dd340cd49daf1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 25 Sep 2020 12:27:02 +0100 Subject: [PATCH 2/5] Check ID gen sequences --- synapse/storage/util/id_generators.py | 5 +++++ tests/storage/test_id_generators.py | 22 ++++++++++++++++--- tests/unittest.py | 31 ++++++++++++++++++++++++++- 3 files changed, 54 insertions(+), 4 deletions(-) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 4269eaf9187e..4fd7573e260d 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -258,6 +258,11 @@ def __init__( self._sequence_gen = PostgresSequenceGenerator(sequence_name) + # We check that the table and sequence haven't diverged. + self._sequence_gen.check_consistency( + db_conn, table=table, id_column=id_column, positive=positive + ) + # This goes and fills out the above state from the database. self._load_current_ids(db_conn, table, instance_column, id_column) diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py index d4ff55fbff7d..4558bee7be85 100644 --- a/tests/storage/test_id_generators.py +++ b/tests/storage/test_id_generators.py @@ -12,9 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - from synapse.storage.database import DatabasePool +from synapse.storage.engines import IncorrectDatabaseSetup from synapse.storage.util.id_generators import MultiWriterIdGenerator from tests.unittest import HomeserverTestCase @@ -59,7 +58,7 @@ def _create(conn): writers=writers, ) - return self.get_success(self.db_pool.runWithConnection(_create)) + return self.get_success_or_raise(self.db_pool.runWithConnection(_create)) def _insert_rows(self, instance_name: str, number: int): """Insert N rows as the given instance, inserting with stream IDs pulled @@ -411,6 +410,23 @@ async def _get_next_async(): self.get_success(_get_next_async()) self.assertEqual(id_gen_3.get_persisted_upto_position(), 6) + def test_sequence_consistency(self): + """Test that we error out if the table and sequence diverges. + """ + + # Prefill with some rows + self._insert_row_with_id("master", 3) + + # Now we add a row *without* updating the stream ID + def _insert(txn): + txn.execute("INSERT INTO foobar VALUES (26, 'master')") + + self.get_success(self.db_pool.runInteraction("_insert", _insert)) + + # Creating the ID gen should error + with self.assertRaises(IncorrectDatabaseSetup): + self._create_id_generator("first") + class BackwardsMultiWriterIdGeneratorTestCase(HomeserverTestCase): """Tests MultiWriterIdGenerator that produce *negative* stream IDs. diff --git a/tests/unittest.py b/tests/unittest.py index dabf69cff405..e62a1e2cf858 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -14,7 +14,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import gc import hashlib import hmac @@ -28,6 +27,7 @@ from canonicaljson import json from twisted.internet.defer import Deferred, ensureDeferred, succeed +from twisted.python.failure import Failure from twisted.python.threadpool import ThreadPool from twisted.trial import unittest @@ -463,6 +463,35 @@ def get_failure(self, d, exc): self.pump() return self.failureResultOf(d, exc) + def get_success_or_raise(self, d, by=0.0): + """Drive deferred to completion and return result or raise exception + on failure. + """ + + if inspect.isawaitable(d): + deferred = ensureDeferred(d) + if not isinstance(deferred, Deferred): + return d + + results = [] # type: list + deferred.addBoth(results.append) + + self.pump(by=by) + + if not results: + self.fail( + "Success result expected on {!r}, found no result instead".format( + deferred + ) + ) + + result = results[0] + + if isinstance(result, Failure): + result.raiseException() + + return result + def register_user(self, username, password, admin=False): """ Register a user. Requires the Admin API be registered. From aa10531e08e2b3a3ecb4368616e76a4fbee5724c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 25 Sep 2020 12:30:41 +0100 Subject: [PATCH 3/5] Check 'state_group_id_seq' for consistency --- synapse/storage/databases/main/registration.py | 3 +++ synapse/storage/databases/state/store.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 33825e894936..0cc8572c1928 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -41,6 +41,9 @@ def __init__(self, database: DatabasePool, db_conn, hs): self.config = hs.config self.clock = hs.get_clock() + # Note: we don't check this sequence for consistency as we'd have to + # call `find_max_generated_user_id_localpart` each time, which is + # expensive if there are many entries. self._user_id_seq = build_sequence_generator( database.engine, find_max_generated_user_id_localpart, "user_id_seq", ) diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index bec3780a32b1..989f0cbc9d3b 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -99,6 +99,9 @@ def get_max_state_group_txn(txn: Cursor): self._state_group_seq_gen = build_sequence_generator( self.database_engine, get_max_state_group_txn, "state_group_id_seq" ) + self._state_group_seq_gen.check_consistency( + db_conn, table="state_groups", id_column="id" + ) @cached(max_entries=10000, iterable=True) async def get_state_group_delta(self, state_group): From 06d460aaf21ced9476efcc4ecd7b790ee2b9f831 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 25 Sep 2020 12:33:04 +0100 Subject: [PATCH 4/5] Newsfile --- changelog.d/8402.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8402.misc diff --git a/changelog.d/8402.misc b/changelog.d/8402.misc new file mode 100644 index 000000000000..ad1804d207aa --- /dev/null +++ b/changelog.d/8402.misc @@ -0,0 +1 @@ +Add checks on startup that PostgreSQL sequences are consistent with their associated tables. From 52911067e7a7b99e5d9c1b7150d484fab335e3b9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 28 Sep 2020 17:05:57 +0100 Subject: [PATCH 5/5] Code review --- synapse/storage/util/sequence.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py index 5c3555481039..2dd95e270920 100644 --- a/synapse/storage/util/sequence.py +++ b/synapse/storage/util/sequence.py @@ -32,8 +32,8 @@ table '%(table)s'. This can happen if Synapse has been downgraded and then upgraded again, or due to a bad migration. -To fix shut down Synapse (including any and all workers) and run the -following SQL: +To fix this error, shut down Synapse (including any and all workers) +and run the following SQL: SELECT setval('%(seq)s', ( %(max_id_sql)s @@ -100,7 +100,7 @@ def check_consistency( txn.close() return - # Now we fetch the current value from the sequence and compare wit the + # Now we fetch the current value from the sequence and compare with the # above. max_stream_id = row[0] txn.execute(