Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Track currently syncing users by device for presence #16172

Merged
merged 6 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16172.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Track per-device information in the presence code.
145 changes: 94 additions & 51 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"""
import abc
import contextlib
import itertools
import logging
from bisect import bisect
from contextlib import contextmanager
Expand Down Expand Up @@ -188,7 +189,9 @@ async def user_syncing(
"""

@abc.abstractmethod
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
def get_currently_syncing_users_for_replication(
self,
) -> Iterable[Tuple[str, Optional[str]]]:
"""Get an iterable of syncing users on this worker, to send to the presence handler
clokep marked this conversation as resolved.
Show resolved Hide resolved

This is called when a replication connection is established. It should return
Expand Down Expand Up @@ -284,7 +287,12 @@ async def bump_presence_active_time(
"""

async def update_external_syncs_row( # noqa: B027 (no-op by design)
self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
self,
process_id: str,
user_id: str,
device_id: Optional[str],
is_syncing: bool,
sync_time_msec: int,
) -> None:
"""Update the syncing users for an external process as a delta.

Expand All @@ -295,6 +303,7 @@ async def update_external_syncs_row( # noqa: B027 (no-op by design)
syncing against. This allows synapse to process updates
as users start and stop syncing against a given process.
user_id: The user who has started or stopped syncing
device_id: The user's device that has started or stopped syncing
is_syncing: Whether or not the user is now syncing
sync_time_msec: Time in ms when the user was last syncing
"""
Expand Down Expand Up @@ -425,16 +434,18 @@ def __init__(self, hs: "HomeServer"):
hs.config.worker.writers.presence,
)

# The number of ongoing syncs on this process, by user id.
# The number of ongoing syncs on this process, by (user ID, device ID).
# Empty if _presence_enabled is false.
self._user_to_num_current_syncs: Dict[str, int] = {}
self._user_device_to_num_current_syncs: Dict[
Tuple[str, Optional[str]], int
] = {}

self.notifier = hs.get_notifier()
self.instance_id = hs.get_instance_id()

# user_id -> last_sync_ms. Lists the users that have stopped syncing but
# we haven't notified the presence writer of that yet
self.users_going_offline: Dict[str, int] = {}
# (user_id, device_id) -> last_sync_ms. Lists the devices that have stopped
# syncing but we haven't notified the presence writer of that yet
self._user_devices_going_offline: Dict[Tuple[str, Optional[str]], int] = {}

self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
self._set_state_client = ReplicationPresenceSetState.make_client(hs)
Expand All @@ -457,39 +468,47 @@ async def _on_shutdown(self) -> None:
ClearUserSyncsCommand(self.instance_id)
)

def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None:
def send_user_sync(
self,
user_id: str,
device_id: Optional[str],
is_syncing: bool,
last_sync_ms: int,
) -> None:
if self._presence_enabled:
self.hs.get_replication_command_handler().send_user_sync(
self.instance_id, user_id, is_syncing, last_sync_ms
self.instance_id, user_id, device_id, is_syncing, last_sync_ms
)

def mark_as_coming_online(self, user_id: str) -> None:
def mark_as_coming_online(self, user_id: str, device_id: Optional[str]) -> None:
"""A user has started syncing. Send a UserSync to the presence writer,
unless they had recently stopped syncing.
"""
going_offline = self.users_going_offline.pop(user_id, None)
going_offline = self._user_devices_going_offline.pop((user_id, device_id), None)
if not going_offline:
# Safe to skip because we haven't yet told the presence writer they
# were offline
self.send_user_sync(user_id, True, self.clock.time_msec())
self.send_user_sync(user_id, device_id, True, self.clock.time_msec())

def mark_as_going_offline(self, user_id: str) -> None:
def mark_as_going_offline(self, user_id: str, device_id: Optional[str]) -> None:
"""A user has stopped syncing. We wait before notifying the presence
writer as it's likely they'll come back soon. This allows us to avoid
sending a stopped syncing immediately followed by a started syncing
notification to the presence writer
"""
self.users_going_offline[user_id] = self.clock.time_msec()
self._user_devices_going_offline[(user_id, device_id)] = self.clock.time_msec()

def send_stop_syncing(self) -> None:
"""Check if there are any users who have stopped syncing a while ago and
haven't come back yet. If there are, poke the presence writer about them.
"""
now = self.clock.time_msec()
for user_id, last_sync_ms in list(self.users_going_offline.items()):
for (user_id, device_id), last_sync_ms in list(
self._user_devices_going_offline.items()
):
if now - last_sync_ms > UPDATE_SYNCING_USERS_MS:
self.users_going_offline.pop(user_id, None)
self.send_user_sync(user_id, False, last_sync_ms)
self._user_devices_going_offline.pop((user_id, device_id), None)
self.send_user_sync(user_id, device_id, False, last_sync_ms)

async def user_syncing(
self,
Expand All @@ -515,23 +534,23 @@ async def user_syncing(
is_sync=True,
)

curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
self._user_to_num_current_syncs[user_id] = curr_sync + 1
curr_sync = self._user_device_to_num_current_syncs.get((user_id, device_id), 0)
self._user_device_to_num_current_syncs[(user_id, device_id)] = curr_sync + 1

# If this is the first in-flight sync, notify replication
if self._user_to_num_current_syncs[user_id] == 1:
self.mark_as_coming_online(user_id)
if self._user_device_to_num_current_syncs[(user_id, device_id)] == 1:
self.mark_as_coming_online(user_id, device_id)

def _end() -> None:
# We check that (user_id, device_id) is in
# _user_device_to_num_current_syncs because it may have been cleared if
# we are shutting down.
if user_id in self._user_to_num_current_syncs:
self._user_to_num_current_syncs[user_id] -= 1
if (user_id, device_id) in self._user_device_to_num_current_syncs:
self._user_device_to_num_current_syncs[(user_id, device_id)] -= 1

# If there are no more in-flight syncs, notify replication
if self._user_to_num_current_syncs[user_id] == 0:
self.mark_as_going_offline(user_id)
if self._user_device_to_num_current_syncs[(user_id, device_id)] == 0:
self.mark_as_going_offline(user_id, device_id)

@contextlib.contextmanager
def _user_syncing() -> Generator[None, None, None]:
Expand Down Expand Up @@ -598,10 +617,12 @@ async def process_replication_rows(
# If this is a federation sender, notify about presence updates.
await self.maybe_send_presence_to_interested_destinations(state_to_notify)

def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
def get_currently_syncing_users_for_replication(
self,
) -> Iterable[Tuple[str, Optional[str]]]:
return [
user_id
for user_id, count in self._user_to_num_current_syncs.items()
user_id_device_id
for user_id_device_id, count in self._user_device_to_num_current_syncs.items()
if count > 0
]

Expand Down Expand Up @@ -723,17 +744,23 @@ def __init__(self, hs: "HomeServer"):

# Keeps track of the number of *ongoing* syncs on this process. While
# this is non zero a user will never go offline.
self.user_to_num_current_syncs: Dict[str, int] = {}
self._user_device_to_num_current_syncs: Dict[
Tuple[str, Optional[str]], int
] = {}

# Keeps track of the number of *ongoing* syncs on other processes.
#
# While any sync is ongoing on another process the user will never
# go offline.
#
# Each process has a unique identifier and an update frequency. If
# no update is received from that process within the update period then
# we assume that all the sync requests on that process have stopped.
# Stored as a dict from process_id to set of user_id, and a dict of
# process_id to millisecond timestamp last updated.
self.external_process_to_current_syncs: Dict[str, Set[str]] = {}
# Stored as a dict from process_id to set of (user_id, device_id), and
# a dict of process_id to millisecond timestamp last updated.
self.external_process_to_current_syncs: Dict[
str, Set[Tuple[str, Optional[str]]]
] = {}
self.external_process_last_updated_ms: Dict[str, int] = {}

self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
Expand Down Expand Up @@ -938,7 +965,10 @@ async def _handle_timeouts(self) -> None:
# that were syncing on that process to see if they need to be timed
# out.
users_to_check.update(
self.external_process_to_current_syncs.pop(process_id, ())
user_id
for user_id, device_id in self.external_process_to_current_syncs.pop(
process_id, ()
)
)
self.external_process_last_updated_ms.pop(process_id)

Expand All @@ -951,11 +981,13 @@ async def _handle_timeouts(self) -> None:

syncing_user_ids = {
user_id
for user_id, count in self.user_to_num_current_syncs.items()
for (user_id, _), count in self._user_device_to_num_current_syncs.items()
if count
}
for user_ids in self.external_process_to_current_syncs.values():
syncing_user_ids.update(user_ids)
for user_id, _ in itertools.chain(
*self.external_process_to_current_syncs.values()
):
syncing_user_ids.add(user_id)
clokep marked this conversation as resolved.
Show resolved Hide resolved

changes = handle_timeouts(
states,
Expand Down Expand Up @@ -1013,8 +1045,8 @@ async def user_syncing(
if not affect_presence or not self._presence_enabled:
return _NullContextManager()

curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
curr_sync = self._user_device_to_num_current_syncs.get((user_id, device_id), 0)
self._user_device_to_num_current_syncs[(user_id, device_id)] = curr_sync + 1

# Note that this causes last_active_ts to be incremented which is not
# what the spec wants.
Expand All @@ -1027,7 +1059,7 @@ async def user_syncing(

async def _end() -> None:
try:
self.user_to_num_current_syncs[user_id] -= 1
self._user_device_to_num_current_syncs[(user_id, device_id)] -= 1

prev_state = await self.current_state_for_user(user_id)
await self._update_states(
Expand All @@ -1049,12 +1081,19 @@ def _user_syncing() -> Generator[None, None, None]:

return _user_syncing()

def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
def get_currently_syncing_users_for_replication(
self,
) -> Iterable[Tuple[str, Optional[str]]]:
# since we are the process handling presence, there is nothing to do here.
return []

async def update_external_syncs_row(
self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
self,
process_id: str,
user_id: str,
device_id: Optional[str],
is_syncing: bool,
sync_time_msec: int,
) -> None:
"""Update the syncing users for an external process as a delta.

Expand All @@ -1063,6 +1102,7 @@ async def update_external_syncs_row(
syncing against. This allows synapse to process updates
as users start and stop syncing against a given process.
user_id: The user who has started or stopped syncing
device_id: The user's device that has started or stopped syncing
is_syncing: Whether or not the user is now syncing
sync_time_msec: Time in ms when the user was last syncing
"""
Expand All @@ -1073,26 +1113,27 @@ async def update_external_syncs_row(
process_id, set()
)

# USER_SYNC is sent when a user starts or stops syncing on a remote
# process. (But only for the initial and last device.)
# USER_SYNC is sent when a user's device starts or stops syncing on
# a remote process. (But only for the initial and last sync for that
# device.)
#
# When a user *starts* syncing it also calls set_state(...) which
# When a device *starts* syncing it also calls set_state(...) which
# will update the state, last_active_ts, and last_user_sync_ts.
# Simply ensure the user is tracked as syncing in this case.
# Simply ensure the user & device is tracked as syncing in this case.
#
# When a user *stops* syncing, update the last_user_sync_ts and mark
# When a device *stops* syncing, update the last_user_sync_ts and mark
# them as no longer syncing. Note this doesn't quite match the
# monolith behaviour, which updates last_user_sync_ts at the end of
# every sync, not just the last in-flight sync.
if is_syncing and user_id not in process_presence:
process_presence.add(user_id)
elif not is_syncing and user_id in process_presence:
if is_syncing and (user_id, device_id) not in process_presence:
process_presence.add((user_id, device_id))
elif not is_syncing and (user_id, device_id) in process_presence:
new_state = prev_state.copy_and_replace(
last_user_sync_ts=sync_time_msec
)
await self._update_states([new_state])

process_presence.discard(user_id)
process_presence.discard((user_id, device_id))

self.external_process_last_updated_ms[process_id] = self.clock.time_msec()

Expand All @@ -1106,7 +1147,9 @@ async def update_external_syncs_clear(self, process_id: str) -> None:
process_presence = self.external_process_to_current_syncs.pop(
process_id, set()
)
prev_states = await self.current_state_for_users(process_presence)
prev_states = await self.current_state_for_users(
{user_id for user_id, device_id in process_presence}
)
time_now_ms = self.clock.time_msec()

await self._update_states(
Expand Down
17 changes: 14 additions & 3 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,27 +267,38 @@ class UserSyncCommand(Command):
NAME = "USER_SYNC"

def __init__(
self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
self,
instance_id: str,
user_id: str,
device_id: Optional[str],
is_syncing: bool,
last_sync_ms: int,
):
self.instance_id = instance_id
self.user_id = user_id
self.device_id = device_id
self.is_syncing = is_syncing
self.last_sync_ms = last_sync_ms

@classmethod
def from_line(cls: Type["UserSyncCommand"], line: str) -> "UserSyncCommand":
instance_id, user_id, state, last_sync_ms = line.split(" ", 3)
device_id: Optional[str]
instance_id, user_id, device_id, state, last_sync_ms = line.split(" ", 4)

if device_id == "None":
device_id = None
Comment on lines +288 to +289
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a user has the exact device ID of "None" then this will not work quite right. Maybe we should use an empty string instead (which I think is an invalid device ID)? Any thoughts welcome!


if state not in ("start", "end"):
raise Exception("Invalid USER_SYNC state %r" % (state,))

return cls(instance_id, user_id, state == "start", int(last_sync_ms))
return cls(instance_id, user_id, device_id, state == "start", int(last_sync_ms))

def to_line(self) -> str:
return " ".join(
(
self.instance_id,
self.user_id,
str(self.device_id),
"start" if self.is_syncing else "end",
str(self.last_sync_ms),
)
Comment on lines 283 to 304
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned previously:

I think changing this will mean that the presence stream will somewhat break until all workers are upgraded or downgraded?

Expand Down
Loading
Loading