Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move client command handling out of TCP protocol #7185

Merged
merged 21 commits into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions synapse/app/admin_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.util.logcontext import LoggingContext
from synapse.util.versionstring import get_version_string
Expand Down Expand Up @@ -79,17 +78,6 @@ def _listen_http(self, listener_config):
def start_listening(self, listeners):
pass

def build_tcp_replication(self):
return AdminCmdReplicationHandler(self)


class AdminCmdReplicationHandler(ReplicationClientHandler):
async def on_rdata(self, stream_name, token, rows):
pass

def get_streams_to_replicate(self):
return {}


@defer.inlineCallbacks
def export_data_command(hs, args):
Expand Down
30 changes: 25 additions & 5 deletions synapse/replication/tcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,31 @@


Structure of the module:
* client.py - the client classes used for workers to connect to master
* handler.py - the classes used to handle sending/receiving commands to
replication
* command.py - the definitions of all the valid commands
* protocol.py - contains bot the client and server protocol implementations,
these should not be used directly
* resource.py - the server classes that accepts and handle client connections
* streams.py - the definitons of all the valid streams
* protocol.py - the TCP protocol classes
* resource.py - handles streaming stream updates to replications
* streams/ - the definitions of all the valid streams


The general interaction of the classes are:

+---------------------+
| ReplicationStreamer |
+---------------------+
|
v
+---------------------------+ +----------------------+
| ReplicationCommandHandler |---->|ReplicationDataHandler|
+---------------------------+ +----------------------+
| ^
v |
+-------------+
| Protocols |
| (TCP/redis) |
+-------------+

Where the ReplicationDataHandler (or subclasses) handles incoming stream
updates.
"""
24 changes: 17 additions & 7 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
"""

import logging
from typing import Dict
from typing import TYPE_CHECKING, Dict

from twisted.internet.protocol import ReconnectingClientFactory

from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol

MYPY = False
if MYPY:
if TYPE_CHECKING:
from synapse.server import HomeServer
from synapse.replication.tcp.handler import ReplicationCommandHandler

logger = logging.getLogger(__name__)

Expand All @@ -34,14 +34,18 @@ class ReplicationClientFactory(ReconnectingClientFactory):
"""Factory for building connections to the master. Will reconnect if the
connection is lost.

Accepts a handler that will be called when new data is available or data
is required.
Accepts a handler that is passed to `ClientReplicationStreamProtocol`.
"""

initialDelay = 0.1
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
maxDelay = 1 # Try at least once every N seconds

def __init__(self, hs: "HomeServer", client_name, command_handler):
def __init__(
self,
hs: "HomeServer",
client_name: str,
command_handler: "ReplicationCommandHandler",
):
self.client_name = client_name
self.command_handler = command_handler
self.server_name = hs.config.server_name
Expand Down Expand Up @@ -73,7 +77,10 @@ def clientConnectionFailed(self, connector, reason):


class ReplicationDataHandler:
"""A replication data handler that calls slave data stores.
"""A replication data handler handles incoming stream updates from replication.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""A replication data handler handles incoming stream updates from replication.
"""Handles incoming stream updates from replication.


This instance notifies the slave data store about updates. Can be subclassed
to handle updates in additional ways.
"""

def __init__(self, store: BaseSlavedStore):
Expand Down Expand Up @@ -112,3 +119,6 @@ def get_streams_to_replicate(self) -> Dict[str, int]:

async def on_position(self, stream_name: str, token: int):
self.store.process_replication_rows(stream_name, token, [])

def on_remote_server_up(self, server: str):
"""Called when get a new REMOTE_SERVER_UP command."""
93 changes: 62 additions & 31 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A replication client for use by synapse workers.
"""

import logging
from typing import Any, Callable, Dict, List, Optional, Set
Expand Down Expand Up @@ -51,37 +49,37 @@ class ReplicationCommandHandler:
"""

def __init__(self, hs):
self.replication_data_handler = hs.get_replication_data_handler()
self.presence_handler = hs.get_presence_handler()
self._replication_data_handler = hs.get_replication_data_handler()
self._presence_handler = hs.get_presence_handler()

# Set of streams that we're currently catching up with.
self.streams_connecting = set() # type: Set[str]
# Set of streams that we've caught up with.
self._streams_connected = set() # type: Set[str]

self.streams = {
self._streams = {
stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
} # type: Dict[str, Stream]

self._position_linearizer = Linearizer("replication_position")

# Map of stream to batched updates. See RdataCommand for info on how
# batching works.
self.pending_batches = {} # type: Dict[str, List[Any]]
self._pending_batches = {} # type: Dict[str, List[Any]]

# The factory used to create connections.
self.factory = None # type: Optional[ReplicationClientFactory]
self._factory = None # type: Optional[ReplicationClientFactory]

# The current connection. None if we are currently (re)connecting
self.connection = None
self._connection = None

def start_replication(self, hs):
"""Helper method to start a replication connection to the remote server
using TCP.
"""
client_name = hs.config.worker_name
self.factory = ReplicationClientFactory(hs, client_name, self)
self._factory = ReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
hs.get_reactor().connectTCP(host, port, self.factory)
hs.get_reactor().connectTCP(host, port, self._factory)

async def on_RDATA(self, cmd: RdataCommand):
stream_name = cmd.stream_name
Expand All @@ -93,13 +91,13 @@ async def on_RDATA(self, cmd: RdataCommand):
logger.exception("Failed to parse RDATA: %r %r", stream_name, cmd.row)
raise

if cmd.token is None or stream_name in self.streams_connecting:
if cmd.token is None or stream_name not in self._streams_connected:
# I.e. this is part of a batch of updates for this stream. Batch
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# until we get an update for the stream with a non None token
self.pending_batches.setdefault(stream_name, []).append(row)
self._pending_batches.setdefault(stream_name, []).append(row)
else:
# Check if this is the last of a batch of updates
rows = self.pending_batches.pop(stream_name, [])
rows = self._pending_batches.pop(stream_name, [])
rows.append(row)
await self.on_rdata(stream_name, cmd.token, rows)

Expand All @@ -113,23 +111,26 @@ async def on_rdata(self, stream_name: str, token: int, rows: list):
Stream.parse_row.
"""
logger.debug("Received rdata %s -> %s", stream_name, token)
await self.replication_data_handler.on_rdata(stream_name, token, rows)
await self._replication_data_handler.on_rdata(stream_name, token, rows)

async def on_POSITION(self, cmd: PositionCommand):
stream = self.streams.get(cmd.stream_name)
stream = self._streams.get(cmd.stream_name)
if not stream:
logger.error("Got POSITION for unknown stream: %s", cmd.stream_name)
return

# We're about to go and catch up with the stream, so mark as connecting
# to stop RDATA being handled at the same time.
self.streams_connecting.add(cmd.stream_name)
# to stop RDATA being handled at the same time by removing stream from
# list of connected streams. We also clear any batched up RDATA from
# before we got the POSITION.
self._streams_connected.discard(cmd.stream_name)
self._pending_batches.clear()

# We protect catching up with a linearizer in case the replication
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replicaiton

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replicaiton

# connection reconnects under us.
with await self._position_linearizer.queue(cmd.stream_name):
# Find where we previously streamed up to.
current_token = self.replication_data_handler.get_streams_to_replicate().get(
current_token = self._replication_data_handler.get_streams_to_replicate().get(
cmd.stream_name
)
if current_token is None:
Expand All @@ -153,32 +154,62 @@ async def on_POSITION(self, cmd: PositionCommand):
)

# We've now caught up to position sent to us, notify handler.
await self.replication_data_handler.on_position(cmd.stream_name, cmd.token)
await self._replication_data_handler.on_position(cmd.stream_name, cmd.token)

self.streams_connecting.discard(cmd.stream_name)
self._streams_connected.add(cmd.stream_name)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

# Handle any RDATA that came in while we were catching up.
rows = self.pending_batches.pop(cmd.stream_name, [])
rows = self._pending_batches.pop(cmd.stream_name, [])
if rows:
await self.on_rdata(cmd.stream_name, rows[-1].token, rows)
# We need to make sure we filter out RDATA rows with a token less
# than what we've caught up to. This is slightly fiddly because of
# "batched" rows which have a `None` token, indicating that they
# have the same token as the next row with a non-None token.
#
# We do this by walking the list backwards, first removing any RDATA
# rows that are part of an uncompleted batch, then taking rows while
# their token is either None or greater than where we've caught up
# to.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, am I overthinking this? Should we just pass on all RDATA commands that we get after the POSITION? I.e., clearing pending_batches when we receive a POSITION?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like that's already happening (https://github.com/matrix-org/synapse/pull/7185/files/5104d1673bb4a3be3bd2a655dfde568fda01226c..534bd868e50cd1fe2efd52d4ec0ec92452ac6a6b#diff-bff709ecab561a0aa9f155333fcc1b0dR127), but I think there's a slight problem here:

  • we receive a POSITION and start a catchup
  • RDATA arrives
  • connection drops and we reconnect
  • we receive another POSITION, clear pending_batches, and start waiting for the linearizer
  • more RDATA arrives and gets added to pending_batches
  • the first catchup completes, and we process all the RDATAs which arrived since the second POSITION despite having not caught up to it.

At this point, this stuff feels very much a separate problem to "Move client command handling out of TCP protocol". I've also realised that the current impl is also racy af, which can't be helping with #7206.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, I'll revert to previous handling and we can think about this separately.

uncompleted_batch = []
unfinished_batch = True
filtered_rows = []
for row in reversed(rows):
if row.token is not None:
unfinished_batch = False
if cmd.token < row.token:
filtered_rows.append(row)
else:
break
elif unfinished_batch:
uncompleted_batch.append(row)
else:
filtered_rows.append(row)

filtered_rows.reverse()
uncompleted_batch.reverse()
if uncompleted_batch:
self._pending_batches[cmd.stream_name] = uncompleted_batch

await self.on_rdata(cmd.stream_name, rows[-1].token, filtered_rows)

async def on_SYNC(self, cmd: SyncCommand):
pass

async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
""""Called when get a new REMOTE_SERVER_UP command."""
self._replication_data_handler.on_remote_server_up(cmd.data)

def get_currently_syncing_users(self):
"""Get the list of currently syncing users (if any). This is called
when a connection has been established and we need to send the
currently syncing users. (Overriden by the synchrotron's only)
currently syncing users.
"""
return self.presence_handler.get_currently_syncing_users()
return self._presence_handler.get_currently_syncing_users()

def update_connection(self, connection):
"""Called when a connection has been established (or lost with None).
"""
self.connection = connection
self._connection = connection

def finished_connecting(self):
"""Called when we have successfully subscribed and caught up to all
Expand All @@ -189,15 +220,15 @@ def finished_connecting(self):
# We don't reset the delay any earlier as otherwise if there is a
# problem during start up we'll end up tight looping connecting to the
# server.
if self.factory:
self.factory.resetDelay()
if self._factory:
self._factory.resetDelay()

def send_command(self, cmd: Command):
"""Send a command to master (when we get establish a connection if we
don't have one already.)
"""
if self.connection:
self.connection.send_command(cmd)
if self._connection:
self._connection.send_command(cmd)
else:
logger.warning("Dropping command as not connected: %r", cmd.NAME)
richvdh marked this conversation as resolved.
Show resolved Hide resolved

Expand Down
Loading