Stop the master relaying USER_SYNC for other workers (matrix-org#7318)
Long story short: if we're handling presence on the current worker, we shouldn't be sending USER_SYNC commands over replication.

In an attempt to figure out what is going on here, I ended up refactoring some bits of the presence handler code, so the first four commits here are non-functional refactors that move this code slightly closer to sanity. (There's still plenty to do here :/.) Suggest reviewing the individual commits.

Fixes (I hope) matrix-org#7257.
richvdh authored and phil-flex committed Jun 16, 2020
1 parent b602d39 commit 300c6ea
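
The gist of the change can be sketched as follows. This is an illustration of the behaviour described in the commit message, not code from the diff below: the class name and the _handles_presence flag are stand-ins, with only update_external_syncs_row and the UserSyncCommand fields taken from Synapse itself.

# Rough sketch, not the code in this commit: when the process that receives a
# USER_SYNC command is itself responsible for presence, it feeds the update
# into its local presence handler instead of relaying the command onwards.
class ReplicationCommandHandlerSketch:
    def __init__(self, hs):
        self._presence_handler = hs.get_presence_handler()
        # Stand-in flag: True when presence is handled on this process
        # (e.g. on the master).
        self._handles_presence = hs.config.use_presence

    async def on_USER_SYNC(self, conn, cmd):
        if self._handles_presence:
            # Apply the worker's sync notification locally ...
            await self._presence_handler.update_external_syncs_row(
                cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
            )
        # ... and, crucially, do not re-send USER_SYNC to other connections:
        # each worker reports its own syncing users directly.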
Showing 10 changed files with 199 additions and 159 deletions.
1 change: 1 addition & 0 deletions changelog.d/7318.misc
@@ -0,0 +1 @@
Move catchup of replication streams logic to worker.
6 changes: 1 addition & 5 deletions docs/tcp_replication.md
@@ -196,7 +196,7 @@ Asks the server for the current position of all streams.

#### USER_SYNC (C)

A user has started or stopped syncing
A user has started or stopped syncing on this process.
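
For orientation (an illustration, not a line from the file being diffed): assuming the four-argument layout this change introduces, the command as sent on the wire looks roughly like

    USER_SYNC <instance_id> <user_id> <start|end> <last_sync_ms>

where the third field records whether the user started or stopped syncing.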

#### CLEAR_USER_SYNC (C)

@@ -216,10 +216,6 @@ Asks the server for the current position of all streams.

Inform the server a cache should be invalidated

#### SYNC (S, C)

Used exclusively in tests

### REMOTE_SERVER_UP (S, C)

Inform other processes that a remote server may have come back online.
2 changes: 2 additions & 0 deletions synapse/api/constants.py
@@ -97,6 +97,8 @@ class EventTypes(object):

Retention = "m.room.retention"

Presence = "m.presence"


class RejectedReason(object):
AUTH_ERROR = "auth_error"
85 changes: 49 additions & 36 deletions synapse/app/generic_worker.py
@@ -17,6 +17,9 @@
import contextlib
import logging
import sys
from typing import Dict, Iterable

from typing_extensions import ContextManager

from twisted.internet import defer, reactor
from twisted.web.resource import NoResource
@@ -38,14 +41,14 @@
from synapse.config.logger import setup_logging
from synapse.federation import send_queue
from synapse.federation.transport.server import TransportLayerServer
from synapse.handlers.presence import PresenceHandler, get_interested_parties
from synapse.handlers.presence import BasePresenceHandler, get_interested_parties
from synapse.http.server import JsonResource
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -225,23 +228,32 @@ async def on_POST(self, request, device_id):
return 200, {"one_time_key_counts": result}


class _NullContextManager(ContextManager[None]):
"""A context manager which does nothing."""

def __exit__(self, exc_type, exc_val, exc_tb):
pass


UPDATE_SYNCING_USERS_MS = 10 * 1000


class GenericWorkerPresence(object):
class GenericWorkerPresence(BasePresenceHandler):
def __init__(self, hs):
super().__init__(hs)
self.hs = hs
self.is_mine_id = hs.is_mine_id
self.http_client = hs.get_simple_http_client()
self.store = hs.get_datastore()
self.user_to_num_current_syncs = {}
self.clock = hs.get_clock()

self._presence_enabled = hs.config.use_presence

# The number of ongoing syncs on this process, by user id.
# Empty if _presence_enabled is false.
self._user_to_num_current_syncs = {} # type: Dict[str, int]

self.notifier = hs.get_notifier()
self.instance_id = hs.get_instance_id()

active_presence = self.store.take_presence_startup_info()
self.user_to_current_state = {state.user_id: state for state in active_presence}

# user_id -> last_sync_ms. Lists the users that have stopped syncing
# but we haven't notified the master of that yet
self.users_going_offline = {}
@@ -259,13 +271,13 @@ def __init__(self, hs):
)

def _on_shutdown(self):
if self.hs.config.use_presence:
if self._presence_enabled:
self.hs.get_tcp_replication().send_command(
ClearUserSyncsCommand(self.instance_id)
)

def send_user_sync(self, user_id, is_syncing, last_sync_ms):
if self.hs.config.use_presence:
if self._presence_enabled:
self.hs.get_tcp_replication().send_user_sync(
self.instance_id, user_id, is_syncing, last_sync_ms
)
@@ -307,28 +319,33 @@ def set_state(self, user, state, ignore_status_msg=False):
# TODO Hows this supposed to work?
return defer.succeed(None)

get_states = __func__(PresenceHandler.get_states)
get_state = __func__(PresenceHandler.get_state)
current_state_for_users = __func__(PresenceHandler.current_state_for_users)
async def user_syncing(
self, user_id: str, affect_presence: bool
) -> ContextManager[None]:
"""Record that a user is syncing.
Called by the sync and events servlets to record that a user has connected to
this worker and is waiting for some events.
"""
if not affect_presence or not self._presence_enabled:
return _NullContextManager()

def user_syncing(self, user_id, affect_presence):
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
self._user_to_num_current_syncs[user_id] = curr_sync + 1

# If we went from no in flight sync to some, notify replication
if self.user_to_num_current_syncs[user_id] == 1:
self.mark_as_coming_online(user_id)
# If we went from no in flight sync to some, notify replication
if self._user_to_num_current_syncs[user_id] == 1:
self.mark_as_coming_online(user_id)

def _end():
# We check that the user_id is in user_to_num_current_syncs because
# user_to_num_current_syncs may have been cleared if we are
# shutting down.
if affect_presence and user_id in self.user_to_num_current_syncs:
self.user_to_num_current_syncs[user_id] -= 1
if user_id in self._user_to_num_current_syncs:
self._user_to_num_current_syncs[user_id] -= 1

# If we went from one in flight sync to non, notify replication
if self.user_to_num_current_syncs[user_id] == 0:
if self._user_to_num_current_syncs[user_id] == 0:
self.mark_as_going_offline(user_id)

@contextlib.contextmanager
@@ -338,7 +355,7 @@ def _user_syncing():
finally:
_end()

return defer.succeed(_user_syncing())
return _user_syncing()

@defer.inlineCallbacks
def notify_from_replication(self, states, stream_id):
@@ -373,15 +390,12 @@ def process_replication_rows(self, token, rows):
stream_id = token
yield self.notify_from_replication(states, stream_id)

def get_currently_syncing_users(self):
if self.hs.config.use_presence:
return [
user_id
for user_id, count in self.user_to_num_current_syncs.items()
if count > 0
]
else:
return set()
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
return [
user_id
for user_id, count in self._user_to_num_current_syncs.items()
if count > 0
]


class GenericWorkerTyping(object):
@@ -625,8 +639,7 @@ def __init__(self, hs):

self.store = hs.get_datastore()
self.typing_handler = hs.get_typing_handler()
# NB this is a SynchrotronPresence, not a normal PresenceHandler
self.presence_handler = hs.get_presence_handler()
self.presence_handler = hs.get_presence_handler() # type: GenericWorkerPresence
self.notifier = hs.get_notifier()

self.notify_pushers = hs.config.start_pushers
20 changes: 12 additions & 8 deletions synapse/handlers/events.py
@@ -19,6 +19,7 @@
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.utils import log_function
from synapse.types import UserID
from synapse.visibility import filter_events_for_client
@@ -97,6 +98,8 @@ async def get_stream(
explicit_room_id=room_id,
)

time_now = self.clock.time_msec()

# When the user joins a new room, or another user joins a currently
# joined room, we need to send down presence for those users.
to_add = []
@@ -112,19 +115,20 @@ async def get_stream(
users = await self.state.get_current_users_in_room(
event.room_id
)
states = await presence_handler.get_states(users, as_event=True)
to_add.extend(states)
else:
users = [event.state_key]

ev = await presence_handler.get_state(
UserID.from_string(event.state_key), as_event=True
)
to_add.append(ev)
states = await presence_handler.get_states(users)
to_add.extend(
{
"type": EventTypes.Presence,
"content": format_user_presence_state(state, time_now),
}
for state in states
)

events.extend(to_add)

time_now = self.clock.time_msec()

chunks = await self._event_serializer.serialize_events(
events,
time_now,
10 changes: 8 additions & 2 deletions synapse/handlers/initial_sync.py
@@ -381,10 +381,16 @@ async def get_presence():
return []

states = await presence_handler.get_states(
[m.user_id for m in room_members], as_event=True
[m.user_id for m in room_members]
)

return states
return [
{
"type": EventTypes.Presence,
"content": format_user_presence_state(s, time_now),
}
for s in states
]

async def get_receipts():
receipts = await self.store.get_linearized_receipts_for_room(
(Diffs for the remaining four changed files are not shown here.)
