Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make cleaning up pushers depend on the device_id instead of the token_id #15280

Merged
merged 10 commits into from
Mar 24, 2023
1 change: 1 addition & 0 deletions changelog.d/15280.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make the pushers rely on the `device_id` instead of the `access_token_id` for various operations.
6 changes: 5 additions & 1 deletion synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@
MediaRepositoryBackgroundUpdateStore,
)
from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore
from synapse.storage.databases.main.pusher import PusherWorkerStore
from synapse.storage.databases.main.pusher import (
PusherBackgroundUpdatesStore,
PusherWorkerStore,
)
from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore
from synapse.storage.databases.main.registration import (
RegistrationBackgroundUpdateStore,
Expand Down Expand Up @@ -226,6 +229,7 @@ class Store(
AccountDataWorkerStore,
PushRuleStore,
PusherWorkerStore,
PusherBackgroundUpdatesStore,
PresenceBackgroundUpdateStore,
ReceiptsBackgroundUpdateStore,
RelationsWorkerStore,
Expand Down
8 changes: 6 additions & 2 deletions synapse/handlers/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -1504,8 +1504,10 @@ async def delete_access_token(self, access_token: str) -> None:
)

# delete pushers associated with this access token
# XXX(quenting): This is only needed until the 'set_device_id_for_pushers'
# background update completes.
if token.token_id is not None:
await self.hs.get_pusherpool().remove_pushers_by_access_token(
await self.hs.get_pusherpool().remove_pushers_by_access_tokens(
token.user_id, (token.token_id,)
)

Expand Down Expand Up @@ -1535,7 +1537,9 @@ async def delete_access_tokens_for_user(
)

# delete pushers associated with the access tokens
await self.hs.get_pusherpool().remove_pushers_by_access_token(
# XXX(quenting): This is only needed until the 'set_device_id_for_pushers'
# background update completes.
await self.hs.get_pusherpool().remove_pushers_by_access_tokens(
user_id, (token_id for _, token_id, _ in tokens_and_devices)
)

Expand Down
2 changes: 2 additions & 0 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,8 @@ async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
else:
raise

await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids)

# Delete data specific to each device. Not optimised as it is not
# considered as part of a critical path.
for device_id in device_ids:
Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -1013,11 +1013,11 @@ async def _register_email_threepid(
user_tuple = await self.store.get_user_by_access_token(token)
# The token better still exist.
assert user_tuple
token_id = user_tuple.token_id
device_id = user_tuple.device_id

await self.pusher_pool.add_or_update_pusher(
user_id=user_id,
access_token=token_id,
device_id=device_id,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we not provide a device_id here? (Otherwise, why is it Optional?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the answer to this is that it is optional, but should almost always be provided. It seems the only time it isn't is when a user is registered via the admin API w/ an email.

kind="email",
app_id="m.email",
app_display_name="Email Notifications",
Expand Down
7 changes: 6 additions & 1 deletion synapse/push/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class PusherConfig:

id: Optional[str]
user_name: str
access_token: Optional[int]

profile_tag: str
kind: str
app_id: str
Expand All @@ -119,6 +119,11 @@ class PusherConfig:
enabled: bool
device_id: Optional[str]

# XXX(quenting): The access_token is not persisted anymore for new pushers, but we
# keep it when reading from the database, so that we don't get stale pushers
# while the "set_device_id_for_pushers" background update is running.
access_token: Optional[int]

def as_dict(self) -> Dict[str, Any]:
"""Information that can be retrieved about a pusher after creation."""
return {
Expand Down
58 changes: 42 additions & 16 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from synapse.push import Pusher, PusherConfig, PusherConfigException
from synapse.push.pusher import PusherFactory
from synapse.replication.http.push import ReplicationRemovePusherRestServlet
from synapse.types import JsonDict, RoomStreamToken
from synapse.types import JsonDict, RoomStreamToken, StrCollection
from synapse.util.async_helpers import concurrently_execute
from synapse.util.threepids import canonicalise_email

Expand Down Expand Up @@ -97,7 +97,6 @@ def start(self) -> None:
async def add_or_update_pusher(
self,
user_id: str,
access_token: Optional[int],
clokep marked this conversation as resolved.
Show resolved Hide resolved
kind: str,
app_id: str,
app_display_name: str,
Expand Down Expand Up @@ -128,6 +127,22 @@ async def add_or_update_pusher(
# stream ordering, so it will process pushes from this point onwards.
last_stream_ordering = self.store.get_room_max_stream_ordering()

# Before we actually persist the pusher, we check if the user already has one
# for this app ID and pushkey. If so, we want to keep the access token and
# device ID in place, since this could be one device modifying
# (e.g. enabling/disabling) another device's pusher.
# XXX(quenting): Even though we're not persisting the access_token_id for new
# pushers anymore, we still need to copy existing access_token_ids over when
# updating a pusher, in case the "set_device_id_for_pushers" background update
# hasn't run yet.
access_token_id = None
existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
user_id, app_id, pushkey
)
if existing_config:
device_id = existing_config.device_id
access_token_id = existing_config.access_token
Comment on lines +138 to +144
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this causes a race with the background update:

  1. We fetch an existing pusher config where device_id is None, but access_token_id is set.
  2. The background update processes this row, adding the device_id.
  3. We call add_pusher and overwrite the device_id back to None.

Is that possible?

Copy link
Member Author

@sandhose sandhose Mar 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess, but it is very unlikely, because unless it happens at the same time as the very last batch of the background update, it will be cleared right after by it.

Also, this would only impact old pushers where we don't have the device_id inserted.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We talked about this and it should be OK since when we add the pusher back (step 3) it actually gets a new ID. So this is an extremely unlikely event to occur.


# we try to create the pusher just to validate the config: it
# will then get pulled out of the database,
# recreated, added and started: this means we have only one
Expand All @@ -136,7 +151,6 @@ async def add_or_update_pusher(
PusherConfig(
id=None,
user_name=user_id,
access_token=access_token,
profile_tag=profile_tag,
kind=kind,
app_id=app_id,
Expand All @@ -151,23 +165,12 @@ async def add_or_update_pusher(
failing_since=None,
enabled=enabled,
device_id=device_id,
access_token=access_token_id,
)
)

# Before we actually persist the pusher, we check if the user already has one
# this app ID and pushkey. If so, we want to keep the access token and device ID
# in place, since this could be one device modifying (e.g. enabling/disabling)
# another device's pusher.
existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
user_id, app_id, pushkey
)
if existing_config:
access_token = existing_config.access_token
device_id = existing_config.device_id

await self.store.add_pusher(
user_id=user_id,
access_token=access_token,
kind=kind,
app_id=app_id,
app_display_name=app_display_name,
Expand All @@ -180,6 +183,7 @@ async def add_or_update_pusher(
profile_tag=profile_tag,
enabled=enabled,
device_id=device_id,
access_token_id=access_token_id,
)
pusher = await self.process_pusher_change_by_id(app_id, pushkey, user_id)

Expand All @@ -199,7 +203,7 @@ async def remove_pushers_by_app_id_and_pushkey_not_user(
)
await self.remove_pusher(p.app_id, p.pushkey, p.user_name)

async def remove_pushers_by_access_token(
async def remove_pushers_by_access_tokens(
self, user_id: str, access_tokens: Iterable[int]
) -> None:
"""Remove the pushers for a given user corresponding to a set of
Expand All @@ -209,6 +213,8 @@ async def remove_pushers_by_access_token(
user_id: user to remove pushers for
access_tokens: access token *ids* to remove pushers for
"""
# XXX(quenting): This is only needed until the "set_device_id_for_pushers"
# background update finishes
tokens = set(access_tokens)
for p in await self.store.get_pushers_by_user_id(user_id):
if p.access_token in tokens:
Expand All @@ -220,6 +226,26 @@ async def remove_pushers_by_access_token(
)
await self.remove_pusher(p.app_id, p.pushkey, p.user_name)

async def remove_pushers_by_devices(
self, user_id: str, devices: StrCollection
) -> None:
"""Remove the pushers for a given user corresponding to a set of devices

Args:
user_id: user to remove pushers for
devices: device IDs to remove pushers for
"""
device_ids = set(devices)
for p in await self.store.get_pushers_by_user_id(user_id):
if p.device_id in device_ids:
logger.info(
"Removing pusher for app id %s, pushkey %s, user %s",
p.app_id,
p.pushkey,
p.user_name,
)
await self.remove_pusher(p.app_id, p.pushkey, p.user_name)

def on_new_notifications(self, max_token: RoomStreamToken) -> None:
if not self.pushers:
# nothing to do here.
Expand Down
1 change: 0 additions & 1 deletion synapse/rest/admin/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,6 @@ async def on_PUT(
):
await self.pusher_pool.add_or_update_pusher(
user_id=user_id,
access_token=None,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
Expand Down
1 change: 0 additions & 1 deletion synapse/rest/client/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
try:
await self.pusher_pool.add_or_update_pusher(
user_id=user.to_string(),
access_token=requester.access_token_id,
kind=content["kind"],
app_id=content["app_id"],
app_display_name=content["app_display_name"],
Expand Down
40 changes: 31 additions & 9 deletions synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,19 +509,24 @@ def __init__(
async def _set_device_id_for_pushers(
self, progress: JsonDict, batch_size: int
) -> int:
"""Background update to populate the device_id column of the pushers table."""
"""
Background update to populate the device_id column and clear the access_token
column for the pushers table.
"""
last_pusher_id = progress.get("pusher_id", 0)

def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int:
txn.execute(
"""
SELECT p.id, at.device_id
SELECT
p.id AS pusher_id,
p.device_id AS pusher_device_id,
at.device_id AS token_device_id
FROM pushers AS p
INNER JOIN access_tokens AS at
LEFT JOIN access_tokens AS at
ON p.access_token = at.id
WHERE
p.access_token IS NOT NULL
AND at.device_id IS NOT NULL
AND p.id > ?
ORDER BY p.id
LIMIT ?
Expand All @@ -533,13 +538,27 @@ def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int:
if len(rows) == 0:
return 0

# The reason we're clearing the access_token column here is a bit subtle.
# When a user logs out, we:
# (1) delete the access token
# (2) delete the device
#
# Ideally, we would delete the pushers only via its link to the device
# during (2), but since this background update might not have fully run yet,
# we're still deleting the pushers via the access token during (1).
self.db_pool.simple_update_many_txn(
txn=txn,
table="pushers",
key_names=("id",),
key_values=[(row["id"],) for row in rows],
value_names=("device_id",),
value_values=[(row["device_id"],) for row in rows],
key_values=[(row["pusher_id"],) for row in rows],
value_names=("device_id", "access_token"),
# If there was already a device_id on the pusher, we only want to clear
# the access_token column, so we keep the existing device_id. Otherwise,
# we set the device_id we got from joining the access_tokens table.
value_values=[
(row["pusher_device_id"] or row["token_device_id"], None)
for row in rows
],
)

self.db_pool.updates._background_update_progress_txn(
Expand Down Expand Up @@ -568,7 +587,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
async def add_pusher(
self,
user_id: str,
access_token: Optional[int],
kind: str,
app_id: str,
app_display_name: str,
Expand All @@ -581,13 +599,13 @@ async def add_pusher(
profile_tag: str = "",
enabled: bool = True,
device_id: Optional[str] = None,
access_token_id: Optional[int] = None,
) -> None:
async with self._pushers_id_gen.get_next() as stream_id:
await self.db_pool.simple_upsert(
table="pushers",
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
values={
"access_token": access_token,
"kind": kind,
"app_display_name": app_display_name,
"device_display_name": device_display_name,
Expand All @@ -599,6 +617,10 @@ async def add_pusher(
"id": stream_id,
"enabled": enabled,
"device_id": device_id,
# XXX(quenting): We're only really persisting the access token ID
# when updating an existing pusher. This is in case the
# 'set_device_id_for_pushers' background update hasn't finished yet.
"access_token": access_token_id,
},
desc="add_pusher",
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/* Copyright 2023 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Triggers the background update to set the device_id for pushers
-- that don't have one, and clear the access_token column.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(7402, 'set_device_id_for_pushers', '{}');
6 changes: 3 additions & 3 deletions tests/push/test_email.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.hs.get_datastores().main.get_user_by_access_token(self.access_token)
)
assert user_tuple is not None
self.token_id = user_tuple.token_id
self.device_id = user_tuple.device_id

# We need to add email to account before we can create a pusher.
self.get_success(
Expand All @@ -117,7 +117,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
pusher = self.get_success(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=self.user_id,
access_token=self.token_id,
device_id=self.device_id,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
Expand All @@ -141,7 +141,7 @@ def test_need_validated_email(self) -> None:
self.get_success_or_raise(
self.hs.get_pusherpool().add_or_update_pusher(
user_id=self.user_id,
access_token=self.token_id,
device_id=self.device_id,
kind="email",
app_id="m.email",
app_display_name="Email Notifications",
Expand Down
Loading