From b2df07f3b6bc1bca9f3ef2e1b3b4f8ff0d4a5319 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Fri, 17 Mar 2023 12:13:35 +0100 Subject: [PATCH 1/9] Make cleaning up pushers depend on the device_id instead of the token_id --- changelog.d/15280.misc | 1 + synapse/handlers/auth.py | 11 ---- synapse/handlers/device.py | 2 + synapse/handlers/register.py | 4 +- synapse/push/__init__.py | 7 ++- synapse/push/pusherpool.py | 53 +++++++++++++------ synapse/rest/admin/users.py | 1 - synapse/rest/client/pusher.py | 1 - synapse/storage/databases/main/pusher.py | 4 +- ...02_set_device_id_for_pushers_bg_update.sql | 18 +++++++ tests/push/test_email.py | 6 +-- tests/push/test_http.py | 33 ++++++------ tests/replication/test_pusher_shard.py | 4 +- tests/rest/admin/test_user.py | 4 +- 14 files changed, 92 insertions(+), 57 deletions(-) create mode 100644 changelog.d/15280.misc create mode 100644 synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql diff --git a/changelog.d/15280.misc b/changelog.d/15280.misc new file mode 100644 index 000000000000..41d56b0cf0b4 --- /dev/null +++ b/changelog.d/15280.misc @@ -0,0 +1 @@ +Make the pushers rely on the `device_id` instead of the `access_token_id` for various operations. diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 308e38edea2d..89e861db0625 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -1503,12 +1503,6 @@ async def delete_access_token(self, access_token: str) -> None: access_token=access_token, ) - # delete pushers associated with this access token - if token.token_id is not None: - await self.hs.get_pusherpool().remove_pushers_by_access_token( - token.user_id, (token.token_id,) - ) - async def delete_access_tokens_for_user( self, user_id: str, @@ -1534,11 +1528,6 @@ async def delete_access_tokens_for_user( user_id=user_id, device_id=device_id, access_token=token ) - # delete pushers associated with the access tokens - await self.hs.get_pusherpool().remove_pushers_by_access_token( - user_id, (token_id for _, token_id, _ in tokens_and_devices) - ) - async def add_threepid( self, user_id: str, medium: str, address: str, validated_at: int ) -> None: diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 6f7963df43ae..9ded6389acdb 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -503,6 +503,8 @@ async def delete_devices(self, user_id: str, device_ids: List[str]) -> None: else: raise + await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids) + # Delete data specific to each device. Not optimised as it is not # considered as part of a critical path. for device_id in device_ids: diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 6b110dcb6e4d..c8bf2439afb5 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -1013,11 +1013,11 @@ async def _register_email_threepid( user_tuple = await self.store.get_user_by_access_token(token) # The token better still exist. assert user_tuple - token_id = user_tuple.token_id + device_id = user_tuple.device_id await self.pusher_pool.add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="email", app_id="m.email", app_display_name="Email Notifications", diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index a0c760239db8..9e3a98741a4f 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -103,7 +103,7 @@ class PusherConfig: id: Optional[str] user_name: str - access_token: Optional[int] + profile_tag: str kind: str app_id: str @@ -119,6 +119,11 @@ class PusherConfig: enabled: bool device_id: Optional[str] + # XXX(quenting): The access_token is not persisted anymore for new pushers, but we + # keep it when reading from the database, so that we don't get stale pushers + # while the "set_device_id_for_pushers" background update is running. + access_token: Optional[int] + def as_dict(self) -> Dict[str, Any]: """Information that can be retrieved about a pusher after creation.""" return { diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index e2648cbc93c9..17c14030d19d 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -97,7 +97,6 @@ def start(self) -> None: async def add_or_update_pusher( self, user_id: str, - access_token: Optional[int], kind: str, app_id: str, app_display_name: str, @@ -128,6 +127,21 @@ async def add_or_update_pusher( # stream ordering, so it will process pushes from this point onwards. last_stream_ordering = self.store.get_room_max_stream_ordering() + # Before we actually persist the pusher, we check if the user already has one + # this app ID and pushkey. If so, we want to keep the access token and device ID + # in place, since this could be one device modifying (e.g. enabling/disabling) + # another device's pusher. + # Even though we're not persisting the access_token_id for new pushers anymore, + # we still need to copy existing access_token_ids over when updating a pusher, + # in case the "set_device_id_for_pushers" background update hasn't run yet. + access_token_id = None + existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey( + user_id, app_id, pushkey + ) + if existing_config: + device_id = existing_config.device_id + access_token_id = existing_config.access_token + # we try to create the pusher just to validate the config: it # will then get pulled out of the database, # recreated, added and started: this means we have only one @@ -136,7 +150,6 @@ async def add_or_update_pusher( PusherConfig( id=None, user_name=user_id, - access_token=access_token, profile_tag=profile_tag, kind=kind, app_id=app_id, @@ -151,23 +164,12 @@ async def add_or_update_pusher( failing_since=None, enabled=enabled, device_id=device_id, + access_token=access_token_id, ) ) - # Before we actually persist the pusher, we check if the user already has one - # this app ID and pushkey. If so, we want to keep the access token and device ID - # in place, since this could be one device modifying (e.g. enabling/disabling) - # another device's pusher. - existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey( - user_id, app_id, pushkey - ) - if existing_config: - access_token = existing_config.access_token - device_id = existing_config.device_id - await self.store.add_pusher( user_id=user_id, - access_token=access_token, kind=kind, app_id=app_id, app_display_name=app_display_name, @@ -180,6 +182,7 @@ async def add_or_update_pusher( profile_tag=profile_tag, enabled=enabled, device_id=device_id, + access_token_id=access_token_id, ) pusher = await self.process_pusher_change_by_id(app_id, pushkey, user_id) @@ -199,7 +202,7 @@ async def remove_pushers_by_app_id_and_pushkey_not_user( ) await self.remove_pusher(p.app_id, p.pushkey, p.user_name) - async def remove_pushers_by_access_token( + async def remove_pushers_by_access_tokens( self, user_id: str, access_tokens: Iterable[int] ) -> None: """Remove the pushers for a given user corresponding to a set of @@ -220,6 +223,26 @@ async def remove_pushers_by_access_token( ) await self.remove_pusher(p.app_id, p.pushkey, p.user_name) + async def remove_pushers_by_devices( + self, user_id: str, devices: Iterable[str] + ) -> None: + """Remove the pushers for a given user corresponding to a set of devices + + Args: + user_id: user to remove pushers for + devices: device IDs to remove pushers for + """ + device_ids = set(devices) + for p in await self.store.get_pushers_by_user_id(user_id): + if p.device_id is not None and p.device_id in device_ids: + logger.info( + "Removing pusher for app id %s, pushkey %s, user %s", + p.app_id, + p.pushkey, + p.user_name, + ) + await self.remove_pusher(p.app_id, p.pushkey, p.user_name) + def on_new_notifications(self, max_token: RoomStreamToken) -> None: if not self.pushers: # nothing to do here. diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index 281e8fd0adc2..331f22511618 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -425,7 +425,6 @@ async def on_PUT( ): await self.pusher_pool.add_or_update_pusher( user_id=user_id, - access_token=None, kind="email", app_id="m.email", app_display_name="Email Notifications", diff --git a/synapse/rest/client/pusher.py b/synapse/rest/client/pusher.py index 975eef2144b6..1a8f5292ac5c 100644 --- a/synapse/rest/client/pusher.py +++ b/synapse/rest/client/pusher.py @@ -126,7 +126,6 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: try: await self.pusher_pool.add_or_update_pusher( user_id=user.to_string(), - access_token=requester.access_token_id, kind=content["kind"], app_id=content["app_id"], app_display_name=content["app_display_name"], diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index 9a24f7a65526..f93d9d958136 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -568,7 +568,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore): async def add_pusher( self, user_id: str, - access_token: Optional[int], kind: str, app_id: str, app_display_name: str, @@ -581,13 +580,13 @@ async def add_pusher( profile_tag: str = "", enabled: bool = True, device_id: Optional[str] = None, + access_token_id: Optional[int] = None, ) -> None: async with self._pushers_id_gen.get_next() as stream_id: await self.db_pool.simple_upsert( table="pushers", keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, values={ - "access_token": access_token, "kind": kind, "app_display_name": app_display_name, "device_display_name": device_display_name, @@ -599,6 +598,7 @@ async def add_pusher( "id": stream_id, "enabled": enabled, "device_id": device_id, + "access_token": access_token_id, }, desc="add_pusher", ) diff --git a/synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql b/synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql new file mode 100644 index 000000000000..6bee72aca6c9 --- /dev/null +++ b/synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql @@ -0,0 +1,18 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Triggers the background update to set the device_id for pushers that don't have one. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (7402, 'set_device_id_for_pushers', '{}'); diff --git a/tests/push/test_email.py b/tests/push/test_email.py index 4ea5472eb475..4b5c96aeaea8 100644 --- a/tests/push/test_email.py +++ b/tests/push/test_email.py @@ -105,7 +105,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.hs.get_datastores().main.get_user_by_access_token(self.access_token) ) assert user_tuple is not None - self.token_id = user_tuple.token_id + self.device_id = user_tuple.device_id # We need to add email to account before we can create a pusher. self.get_success( @@ -117,7 +117,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: pusher = self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=self.user_id, - access_token=self.token_id, + device_id=self.device_id, kind="email", app_id="m.email", app_display_name="Email Notifications", @@ -141,7 +141,7 @@ def test_need_validated_email(self) -> None: self.get_success_or_raise( self.hs.get_pusherpool().add_or_update_pusher( user_id=self.user_id, - access_token=self.token_id, + device_id=self.device_id, kind="email", app_id="m.email", app_display_name="Email Notifications", diff --git a/tests/push/test_http.py b/tests/push/test_http.py index c280ddcdf6a9..00fee30f393c 100644 --- a/tests/push/test_http.py +++ b/tests/push/test_http.py @@ -67,13 +67,13 @@ def test_invalid_configuration(self) -> None: self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None - token_id = user_tuple.token_id + device_id = user_tuple.device_id def test_data(data: Any) -> None: self.get_failure( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", @@ -114,12 +114,12 @@ def test_sends_http(self) -> None: self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None - token_id = user_tuple.token_id + device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", @@ -235,12 +235,12 @@ def test_sends_high_priority_for_encrypted(self) -> None: self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None - token_id = user_tuple.token_id + device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", @@ -356,12 +356,12 @@ def test_sends_high_priority_for_one_to_one_only(self) -> None: self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None - token_id = user_tuple.token_id + device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", @@ -443,12 +443,12 @@ def test_sends_high_priority_for_mention(self) -> None: self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None - token_id = user_tuple.token_id + device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", @@ -521,12 +521,12 @@ def test_sends_high_priority_for_atroom(self) -> None: self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None - token_id = user_tuple.token_id + device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", @@ -628,12 +628,12 @@ def _test_push_unread_count(self) -> None: self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None - token_id = user_tuple.token_id + device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", @@ -764,12 +764,12 @@ def _set_pusher(self, user_id: str, access_token: str, enabled: bool) -> None: self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None - token_id = user_tuple.token_id + device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", @@ -778,7 +778,6 @@ def _set_pusher(self, user_id: str, access_token: str, enabled: bool) -> None: lang=None, data={"url": "http://example.com/_matrix/push/v1/notify"}, enabled=enabled, - device_id=user_tuple.device_id, ) ) diff --git a/tests/replication/test_pusher_shard.py b/tests/replication/test_pusher_shard.py index 0798b021c3d0..dcb3e6669bb9 100644 --- a/tests/replication/test_pusher_shard.py +++ b/tests/replication/test_pusher_shard.py @@ -51,12 +51,12 @@ def _create_pusher_and_send_msg(self, localpart: str) -> str: self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_dict is not None - token_id = user_dict.token_id + device_id = user_dict.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=user_id, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index 4b8f889a71ba..b4241ceaf023 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -3047,12 +3047,12 @@ def test_get_pushers(self) -> None: self.store.get_user_by_access_token(other_user_token) ) assert user_tuple is not None - token_id = user_tuple.token_id + device_id = user_tuple.device_id self.get_success( self.hs.get_pusherpool().add_or_update_pusher( user_id=self.other_user, - access_token=token_id, + device_id=device_id, kind="http", app_id="m.http", app_display_name="HTTP Push Notifications", From 4a5ed8f54a2bc41e5dd0784388cc561a536110f4 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Fri, 17 Mar 2023 15:19:00 +0100 Subject: [PATCH 2/9] Include the PusherBackgroundUpdatesStore in the list of stores loaded by the port script --- synapse/_scripts/synapse_port_db.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 2c9cbf8b275b..22e5f0d3c68f 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -67,7 +67,7 @@ MediaRepositoryBackgroundUpdateStore, ) from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore -from synapse.storage.databases.main.pusher import PusherWorkerStore +from synapse.storage.databases.main.pusher import PusherBackgroundUpdatesStore, PusherWorkerStore from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore from synapse.storage.databases.main.registration import ( RegistrationBackgroundUpdateStore, @@ -225,6 +225,7 @@ class Store( AccountDataWorkerStore, PushRuleStore, PusherWorkerStore, + PusherBackgroundUpdatesStore, PresenceBackgroundUpdateStore, ReceiptsBackgroundUpdateStore, RelationsWorkerStore, From 2f3f6a83422ca5c4b423922a0ad5c30153095ca6 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Fri, 17 Mar 2023 15:24:46 +0100 Subject: [PATCH 3/9] Fix the pushers tests --- tests/push/test_http.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/tests/push/test_http.py b/tests/push/test_http.py index 00fee30f393c..9b3bd5f78d40 100644 --- a/tests/push/test_http.py +++ b/tests/push/test_http.py @@ -894,19 +894,17 @@ def test_null_enabled(self) -> None: def test_update_different_device_access_token_device_id(self) -> None: """Tests that if we create a pusher from one device, the update it from another - device, the access token and device ID associated with the pusher stays the - same. + device, the device ID associated with the pusher stays the same. """ # Create a user with a pusher. user_id, access_token = self._make_user_with_pusher("user") - # Get the token ID for the current access token, since that's what we store in - # the pushers table. Also get the device ID from it. + # Get the device ID for the current access token, since that's what we store in + # the pushers table. user_tuple = self.get_success( self.hs.get_datastores().main.get_user_by_access_token(access_token) ) assert user_tuple is not None - token_id = user_tuple.token_id device_id = user_tuple.device_id # Generate a new access token, and update the pusher with it. @@ -919,10 +917,9 @@ def test_update_different_device_access_token_device_id(self) -> None: ) pushers: List[PusherConfig] = list(ret) - # Check that we still have one pusher, and that the access token and device ID - # associated with it didn't change. + # Check that we still have one pusher, and that the device ID associated with + # it didn't change. self.assertEqual(len(pushers), 1) - self.assertEqual(pushers[0].access_token, token_id) self.assertEqual(pushers[0].device_id, device_id) @override_config({"experimental_features": {"msc3881_enabled": True}}) From 4c39125d50d38aa8d016cb0e7de2b5b472c43b85 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Fri, 17 Mar 2023 15:42:41 +0100 Subject: [PATCH 4/9] Clear the access_token column in the background update Also adds comment in places where the access_token field is used, to make it clear that it's only useful when the background update hasn't finished yet. --- synapse/push/pusherpool.py | 9 ++++++--- synapse/storage/databases/main/pusher.py | 10 +++++++--- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 17c14030d19d..60608940bb16 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -131,9 +131,10 @@ async def add_or_update_pusher( # this app ID and pushkey. If so, we want to keep the access token and device ID # in place, since this could be one device modifying (e.g. enabling/disabling) # another device's pusher. - # Even though we're not persisting the access_token_id for new pushers anymore, - # we still need to copy existing access_token_ids over when updating a pusher, - # in case the "set_device_id_for_pushers" background update hasn't run yet. + # XXX(quenting): Even though we're not persisting the access_token_id for new + # pushers anymore, we still need to copy existing access_token_ids over when + # updating a pusher, in case the "set_device_id_for_pushers" background update + # hasn't run yet. access_token_id = None existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey( user_id, app_id, pushkey @@ -212,6 +213,8 @@ async def remove_pushers_by_access_tokens( user_id: user to remove pushers for access_tokens: access token *ids* to remove pushers for """ + # XXX(quenting): This is only needed until the "set_device_id_for_pushers" + # background update finishes tokens = set(access_tokens) for p in await self.store.get_pushers_by_user_id(user_id): if p.access_token in tokens: diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index f93d9d958136..0f57a5e0a041 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -509,7 +509,8 @@ def __init__( async def _set_device_id_for_pushers( self, progress: JsonDict, batch_size: int ) -> int: - """Background update to populate the device_id column of the pushers table.""" + """Background update to populate the device_id column and clear the access_token + column for the pushers table.""" last_pusher_id = progress.get("pusher_id", 0) def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int: @@ -538,8 +539,8 @@ def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int: table="pushers", key_names=("id",), key_values=[(row["id"],) for row in rows], - value_names=("device_id",), - value_values=[(row["device_id"],) for row in rows], + value_names=("device_id", "access_token"), + value_values=[(row["device_id"], None) for row in rows], ) self.db_pool.updates._background_update_progress_txn( @@ -598,6 +599,9 @@ async def add_pusher( "id": stream_id, "enabled": enabled, "device_id": device_id, + # XXX(quenting): We're only really persisting the access token ID + # when updating an existing pusher. This is in case the + # 'set_device_id_for_pushers' background update hasn't finished yet. "access_token": access_token_id, }, desc="add_pusher", From 8a36827f8c0c2c27083fab5866e18e961eb2d39b Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Fri, 17 Mar 2023 15:44:26 +0100 Subject: [PATCH 5/9] Lint. --- synapse/_scripts/synapse_port_db.py | 5 ++++- synapse/storage/databases/main/pusher.py | 2 +- tests/push/test_http.py | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 22e5f0d3c68f..5ef877428f1f 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -67,7 +67,10 @@ MediaRepositoryBackgroundUpdateStore, ) from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore -from synapse.storage.databases.main.pusher import PusherBackgroundUpdatesStore, PusherWorkerStore +from synapse.storage.databases.main.pusher import ( + PusherBackgroundUpdatesStore, + PusherWorkerStore, +) from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore from synapse.storage.databases.main.registration import ( RegistrationBackgroundUpdateStore, diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index 0f57a5e0a041..b2a84e1d2a9a 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -599,7 +599,7 @@ async def add_pusher( "id": stream_id, "enabled": enabled, "device_id": device_id, - # XXX(quenting): We're only really persisting the access token ID + # XXX(quenting): We're only really persisting the access token ID # when updating an existing pusher. This is in case the # 'set_device_id_for_pushers' background update hasn't finished yet. "access_token": access_token_id, diff --git a/tests/push/test_http.py b/tests/push/test_http.py index 9b3bd5f78d40..99cec0836b1d 100644 --- a/tests/push/test_http.py +++ b/tests/push/test_http.py @@ -917,7 +917,7 @@ def test_update_different_device_access_token_device_id(self) -> None: ) pushers: List[PusherConfig] = list(ret) - # Check that we still have one pusher, and that the device ID associated with + # Check that we still have one pusher, and that the device ID associated with # it didn't change. self.assertEqual(len(pushers), 1) self.assertEqual(pushers[0].device_id, device_id) From f2486721e4c65eeff1871d09074b42bcc71352fc Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Fri, 17 Mar 2023 16:37:37 +0100 Subject: [PATCH 6/9] Fix a bug where email pushers would get deleted when a user logged out of the device. --- synapse/handlers/auth.py | 15 +++++++++++++++ synapse/handlers/device.py | 4 +++- synapse/push/pusherpool.py | 16 ++++++++++------ 3 files changed, 28 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 89e861db0625..4e5afedd50c5 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -1503,6 +1503,14 @@ async def delete_access_token(self, access_token: str) -> None: access_token=access_token, ) + # delete pushers associated with this access token + # XXX(quenting): This is only needed until the 'set_device_id_for_pushers' + # background update completes. + if token.token_id is not None: + await self.hs.get_pusherpool().remove_http_pushers_by_access_tokens( + token.user_id, (token.token_id,) + ) + async def delete_access_tokens_for_user( self, user_id: str, @@ -1528,6 +1536,13 @@ async def delete_access_tokens_for_user( user_id=user_id, device_id=device_id, access_token=token ) + # delete pushers associated with the access tokens + # XXX(quenting): This is only needed until the 'set_device_id_for_pushers' + # background update completes. + await self.hs.get_pusherpool().remove_http_pushers_by_access_tokens( + user_id, (token_id for _, token_id, _ in tokens_and_devices) + ) + async def add_threepid( self, user_id: str, medium: str, address: str, validated_at: int ) -> None: diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 9ded6389acdb..f5cdb88bb538 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -503,7 +503,9 @@ async def delete_devices(self, user_id: str, device_ids: List[str]) -> None: else: raise - await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids) + await self.hs.get_pusherpool().remove_http_pushers_by_devices( + user_id, device_ids + ) # Delete data specific to each device. Not optimised as it is not # considered as part of a critical path. diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 60608940bb16..9db8c7f86d7a 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -203,10 +203,10 @@ async def remove_pushers_by_app_id_and_pushkey_not_user( ) await self.remove_pusher(p.app_id, p.pushkey, p.user_name) - async def remove_pushers_by_access_tokens( + async def remove_http_pushers_by_access_tokens( self, user_id: str, access_tokens: Iterable[int] ) -> None: - """Remove the pushers for a given user corresponding to a set of + """Remove the HTTP pushers for a given user corresponding to a set of access_tokens. Args: @@ -217,7 +217,7 @@ async def remove_pushers_by_access_tokens( # background update finishes tokens = set(access_tokens) for p in await self.store.get_pushers_by_user_id(user_id): - if p.access_token in tokens: + if p.kind == "http" and p.access_token in tokens: logger.info( "Removing pusher for app id %s, pushkey %s, user %s", p.app_id, @@ -226,10 +226,10 @@ async def remove_pushers_by_access_tokens( ) await self.remove_pusher(p.app_id, p.pushkey, p.user_name) - async def remove_pushers_by_devices( + async def remove_http_pushers_by_devices( self, user_id: str, devices: Iterable[str] ) -> None: - """Remove the pushers for a given user corresponding to a set of devices + """Remove the HTTP pushers for a given user corresponding to a set of devices Args: user_id: user to remove pushers for @@ -237,7 +237,11 @@ async def remove_pushers_by_devices( """ device_ids = set(devices) for p in await self.store.get_pushers_by_user_id(user_id): - if p.device_id is not None and p.device_id in device_ids: + if ( + p.kind == "http" + and p.device_id is not None + and p.device_id in device_ids + ): logger.info( "Removing pusher for app id %s, pushkey %s, user %s", p.app_id, From 6d9fba680484e188108cc88557d71c2d4858cb48 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Fri, 17 Mar 2023 16:53:24 +0100 Subject: [PATCH 7/9] Newsfile. --- changelog.d/15280.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15280.bugfix diff --git a/changelog.d/15280.bugfix b/changelog.d/15280.bugfix new file mode 100644 index 000000000000..2eec6cb97ed9 --- /dev/null +++ b/changelog.d/15280.bugfix @@ -0,0 +1 @@ +Fix a bug where email notifications would get disabled after logging out. From e9f1d94c555a51c9e3bb5a72c52a38d83cd37803 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Thu, 23 Mar 2023 18:21:08 +0100 Subject: [PATCH 8/9] Revert the bugfix on email pushers & incorporate feedback from PR review --- changelog.d/15280.bugfix | 1 - synapse/handlers/auth.py | 4 +-- synapse/handlers/device.py | 4 +-- synapse/push/pusherpool.py | 22 ++++++------- synapse/storage/databases/main/pusher.py | 32 +++++++++++++++---- ...02_set_device_id_for_pushers_bg_update.sql | 3 +- 6 files changed, 39 insertions(+), 27 deletions(-) delete mode 100644 changelog.d/15280.bugfix diff --git a/changelog.d/15280.bugfix b/changelog.d/15280.bugfix deleted file mode 100644 index 2eec6cb97ed9..000000000000 --- a/changelog.d/15280.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix a bug where email notifications would get disabled after logging out. diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 4e5afedd50c5..1e89447044f5 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -1507,7 +1507,7 @@ async def delete_access_token(self, access_token: str) -> None: # XXX(quenting): This is only needed until the 'set_device_id_for_pushers' # background update completes. if token.token_id is not None: - await self.hs.get_pusherpool().remove_http_pushers_by_access_tokens( + await self.hs.get_pusherpool().remove_pushers_by_access_tokens( token.user_id, (token.token_id,) ) @@ -1539,7 +1539,7 @@ async def delete_access_tokens_for_user( # delete pushers associated with the access tokens # XXX(quenting): This is only needed until the 'set_device_id_for_pushers' # background update completes. - await self.hs.get_pusherpool().remove_http_pushers_by_access_tokens( + await self.hs.get_pusherpool().remove_pushers_by_access_tokens( user_id, (token_id for _, token_id, _ in tokens_and_devices) ) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index f5cdb88bb538..9ded6389acdb 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -503,9 +503,7 @@ async def delete_devices(self, user_id: str, device_ids: List[str]) -> None: else: raise - await self.hs.get_pusherpool().remove_http_pushers_by_devices( - user_id, device_ids - ) + await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids) # Delete data specific to each device. Not optimised as it is not # considered as part of a critical path. diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 9db8c7f86d7a..836daccfeda6 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -25,7 +25,7 @@ from synapse.push import Pusher, PusherConfig, PusherConfigException from synapse.push.pusher import PusherFactory from synapse.replication.http.push import ReplicationRemovePusherRestServlet -from synapse.types import JsonDict, RoomStreamToken +from synapse.types import JsonDict, RoomStreamToken, StrCollection from synapse.util.async_helpers import concurrently_execute from synapse.util.threepids import canonicalise_email @@ -128,9 +128,9 @@ async def add_or_update_pusher( last_stream_ordering = self.store.get_room_max_stream_ordering() # Before we actually persist the pusher, we check if the user already has one - # this app ID and pushkey. If so, we want to keep the access token and device ID - # in place, since this could be one device modifying (e.g. enabling/disabling) - # another device's pusher. + # for this app ID and pushkey. If so, we want to keep the access token and + # device ID in place, since this could be one device modifying + # (e.g. enabling/disabling) another device's pusher. # XXX(quenting): Even though we're not persisting the access_token_id for new # pushers anymore, we still need to copy existing access_token_ids over when # updating a pusher, in case the "set_device_id_for_pushers" background update @@ -203,7 +203,7 @@ async def remove_pushers_by_app_id_and_pushkey_not_user( ) await self.remove_pusher(p.app_id, p.pushkey, p.user_name) - async def remove_http_pushers_by_access_tokens( + async def remove_pushers_by_access_tokens( self, user_id: str, access_tokens: Iterable[int] ) -> None: """Remove the HTTP pushers for a given user corresponding to a set of @@ -217,7 +217,7 @@ async def remove_http_pushers_by_access_tokens( # background update finishes tokens = set(access_tokens) for p in await self.store.get_pushers_by_user_id(user_id): - if p.kind == "http" and p.access_token in tokens: + if p.access_token in tokens: logger.info( "Removing pusher for app id %s, pushkey %s, user %s", p.app_id, @@ -226,8 +226,8 @@ async def remove_http_pushers_by_access_tokens( ) await self.remove_pusher(p.app_id, p.pushkey, p.user_name) - async def remove_http_pushers_by_devices( - self, user_id: str, devices: Iterable[str] + async def remove_pushers_by_devices( + self, user_id: str, devices: StrCollection ) -> None: """Remove the HTTP pushers for a given user corresponding to a set of devices @@ -237,11 +237,7 @@ async def remove_http_pushers_by_devices( """ device_ids = set(devices) for p in await self.store.get_pushers_by_user_id(user_id): - if ( - p.kind == "http" - and p.device_id is not None - and p.device_id in device_ids - ): + if p.device_id in device_ids: logger.info( "Removing pusher for app id %s, pushkey %s, user %s", p.app_id, diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index b2a84e1d2a9a..ab76b754e0e8 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -509,20 +509,24 @@ def __init__( async def _set_device_id_for_pushers( self, progress: JsonDict, batch_size: int ) -> int: - """Background update to populate the device_id column and clear the access_token - column for the pushers table.""" + """ + Background update to populate the device_id column and clear the access_token + column for the pushers table. + """ last_pusher_id = progress.get("pusher_id", 0) def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int: txn.execute( """ - SELECT p.id, at.device_id + SELECT + p.id AS pusher_id, + p.device_id AS pusher_device_id, + at.device_id AS token_device_id FROM pushers AS p - INNER JOIN access_tokens AS at + LEFT JOIN access_tokens AS at ON p.access_token = at.id WHERE p.access_token IS NOT NULL - AND at.device_id IS NOT NULL AND p.id > ? ORDER BY p.id LIMIT ? @@ -534,13 +538,27 @@ def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int: if len(rows) == 0: return 0 + # The reason we're clearing the access_token column here is a bit subtle. + # When a user logs out, we: + # (1) delete the access token + # (2) delete the device + # + # Ideally, we would delete the pushers only via its link to the device + # during (2), but since this background update might not have fully run yet, + # we're still deleting the pushers via the access token during (1). self.db_pool.simple_update_many_txn( txn=txn, table="pushers", key_names=("id",), - key_values=[(row["id"],) for row in rows], + key_values=[(row["pusher_id"],) for row in rows], value_names=("device_id", "access_token"), - value_values=[(row["device_id"], None) for row in rows], + # If there was already a device_id on the pusher, we only want to clear + # the access_token column, so we keep the existing device_id. Otherwise, + # we set the device_id we got from joining the access_tokens table. + value_values=[ + (row["pusher_device_id"] or row["token_device_id"], None) + for row in rows + ], ) self.db_pool.updates._background_update_progress_txn( diff --git a/synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql b/synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql index 6bee72aca6c9..1367fb626733 100644 --- a/synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql +++ b/synapse/storage/schema/main/delta/74/02_set_device_id_for_pushers_bg_update.sql @@ -13,6 +13,7 @@ * limitations under the License. */ --- Triggers the background update to set the device_id for pushers that don't have one. +-- Triggers the background update to set the device_id for pushers +-- that don't have one, and clear the access_token column. INSERT INTO background_updates (ordering, update_name, progress_json) VALUES (7402, 'set_device_id_for_pushers', '{}'); From 789f9c995468630dfe24926436eac593f89779d5 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Thu, 23 Mar 2023 18:43:37 +0100 Subject: [PATCH 9/9] Apply suggestions from code review Co-authored-by: Patrick Cloke --- synapse/push/pusherpool.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 836daccfeda6..6517e3566fae 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -206,7 +206,7 @@ async def remove_pushers_by_app_id_and_pushkey_not_user( async def remove_pushers_by_access_tokens( self, user_id: str, access_tokens: Iterable[int] ) -> None: - """Remove the HTTP pushers for a given user corresponding to a set of + """Remove the pushers for a given user corresponding to a set of access_tokens. Args: @@ -229,7 +229,7 @@ async def remove_pushers_by_access_tokens( async def remove_pushers_by_devices( self, user_id: str, devices: StrCollection ) -> None: - """Remove the HTTP pushers for a given user corresponding to a set of devices + """Remove the pushers for a given user corresponding to a set of devices Args: user_id: user to remove pushers for