This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit 921a3f8

Fix not sending events over federation when using sharded event persisters (#8536)

* Fix outbound federation with multiple event persisters.

We incorrectly notified federation senders that the minimum persisted
stream position had advanced when we got an `RDATA` from an event
persister.

Notifying federation senders already happens correctly in the
notifier, so we just delete the offending line.

* Change some interfaces to use RoomStreamToken.

By enforcing use of `RoomStreamToken`s we make it less likely that
callers pass in arbitrary ints picked up from some unrelated stream.
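
As a concrete illustration of that second point, here is a minimal sketch of the idea. It is a sketch only: Synapse's real RoomStreamToken, defined in synapse/types.py, carries more machinery (parsing, validation, serialisation) than shown here.

    # Illustrative sketch; see synapse/types.py for the real class.
    from typing import Optional

    import attr

    @attr.s(slots=True, frozen=True)
    class RoomStreamToken:
        # Topological ordering, only set for tokens pointing into a room's
        # history; "live" tokens use None.
        topological = attr.ib(type=Optional[int])
        # The live stream ordering; with sharded event persisters this is
        # the minimum position that every persister has reached.
        stream = attr.ib(type=int)

    def notify_new_events(max_token: RoomStreamToken) -> None:
        # A type checker now rejects notify_new_events(1234), so an int
        # from an unrelated stream can no longer be passed in by accident.
        current_id = max_token.stream
        print("poking senders up to", current_id)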
erikjohnston committed Oct 14, 2020
1 parent 3ee97a2 commit 921a3f8
Showing 10 changed files with 51 additions and 21 deletions.
1 change: 1 addition & 0 deletions changelog.d/8536.bugfix
@@ -0,0 +1 @@
+Fix not sending events over federation when using sharded event writers.
4 changes: 0 additions & 4 deletions synapse/app/generic_worker.py
@@ -790,10 +790,6 @@ async def process_replication_rows(self, stream_name, token, rows):
             send_queue.process_rows_for_federation(self.federation_sender, rows)
             await self.update_token(token)
 
-        # We also need to poke the federation sender when new events happen
-        elif stream_name == "events":
-            self.federation_sender.notify_new_events(token)
-
         # ... and when new receipts happen
         elif stream_name == ReceiptsStream.NAME:
             await self._on_new_receipts(rows)
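
The deleted branch is the heart of the bug: the token in an "events" RDATA comes from a single event persister, and with several persisters writing concurrently it says nothing about the positions the other writers have reached. The toy model below (all names invented for illustration) shows why only the minimum across all persisters is safe to advertise.

    # Toy model of the sharded-persister problem; all names are invented.
    per_instance_positions = {"persister-1": 98, "persister-2": 98}

    def notify_federation_sender(position: int) -> None:
        print("federation sender may send everything up to", position)

    def on_rdata(instance_name: str, token: int) -> None:
        per_instance_positions[instance_name] = token
        # Buggy version: notify_federation_sender(token). If persister-1
        # jumps to 105 while persister-2 is still writing 99..104, those
        # events would be skipped over and never sent.
        # Safe version: advance only to the minimum position that every
        # instance has reached.
        notify_federation_sender(min(per_instance_positions.values()))

    on_rdata("persister-1", 105)  # prints 98, not 105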
2 changes: 1 addition & 1 deletion synapse/federation/send_queue.py
@@ -188,7 +188,7 @@ def _clear_queue_before_pos(self, position_to_delete):
         for key in keys[:i]:
             del self.edus[key]
 
-    def notify_new_events(self, current_id):
+    def notify_new_events(self, max_token):
         """As per FederationSender"""
         # We don't need to replicate this as it gets sent down a different
         # stream.
9 changes: 7 additions & 2 deletions synapse/federation/sender/__init__.py
@@ -40,7 +40,7 @@
     events_processed_counter,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import ReadReceipt
+from synapse.types import ReadReceipt, RoomStreamToken
 from synapse.util.metrics import Measure, measure_func
 
 logger = logging.getLogger(__name__)
@@ -154,10 +154,15 @@ def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
             self._per_destination_queues[destination] = queue
         return queue
 
-    def notify_new_events(self, current_id: int) -> None:
+    def notify_new_events(self, max_token: RoomStreamToken) -> None:
         """This gets called when we have some new events we might want to
         send out to other servers.
         """
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        current_id = max_token.stream
+
         self._last_poked_id = max(current_id, self._last_poked_id)
 
         if self._is_processing:
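
The comment introduced here recurs in every consumer below, so the shared pattern is worth spelling out once: each consumer collapses the token to its integer stream field and keeps a monotonic high-water mark. The sketch below is illustrative, not Synapse code (though the import path is real); it stays consistent precisely because every consumer drops the vector-clock component, never just some of them.

    from synapse.types import RoomStreamToken

    class StreamConsumerSketch:
        """Mirrors the pattern shared by the patched senders and pushers."""

        def __init__(self) -> None:
            self.max_stream_ordering = 0

        def on_new_notifications(self, max_token: RoomStreamToken) -> None:
            # Use the minimum stream ordering and ignore the vector clock
            # component; safe only if *every* consumer does the same.
            max_stream_ordering = max_token.stream
            self.max_stream_ordering = max(
                max_stream_ordering, self.max_stream_ordering
            )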
11 changes: 7 additions & 4 deletions synapse/handlers/appservice.py
@@ -27,6 +27,7 @@
     event_processing_loop_room_count,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import RoomStreamToken
 from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
@@ -47,15 +48,17 @@ def __init__(self, hs):
         self.current_max = 0
         self.is_processing = False
 
-    async def notify_interested_services(self, current_id):
+    async def notify_interested_services(self, max_token: RoomStreamToken):
         """Notifies (pushes) all application services interested in this event.
 
         Pushing is done asynchronously, so this method won't block for any
         prolonged length of time.
-
-        Args:
-            current_id(int): The current maximum ID.
         """
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        current_id = max_token.stream
+
         services = self.store.get_app_services()
         if not services or not self.notify_appservices:
             return
6 changes: 3 additions & 3 deletions synapse/notifier.py
@@ -319,19 +319,19 @@ def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken):
         )
 
         if self.federation_sender:
-            self.federation_sender.notify_new_events(max_room_stream_token.stream)
+            self.federation_sender.notify_new_events(max_room_stream_token)
 
     async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
         try:
             await self.appservice_handler.notify_interested_services(
-                max_room_stream_token.stream
+                max_room_stream_token
             )
         except Exception:
             logger.exception("Error notifying application services of event")
 
     async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
         try:
-            await self._pusher_pool.on_new_notifications(max_room_stream_token.stream)
+            await self._pusher_pool.on_new_notifications(max_room_stream_token)
         except Exception:
             logger.exception("Error pusher pool of event")
8 changes: 7 additions & 1 deletion synapse/push/emailpusher.py
@@ -18,6 +18,7 @@
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import RoomStreamToken
 
 logger = logging.getLogger(__name__)
 
@@ -91,7 +92,12 @@ def on_stop(self):
                 pass
             self.timed_call = None
 
-    def on_new_notifications(self, max_stream_ordering):
+    def on_new_notifications(self, max_token: RoomStreamToken):
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        max_stream_ordering = max_token.stream
+
         if self.max_stream_ordering:
             self.max_stream_ordering = max(
                 max_stream_ordering, self.max_stream_ordering
8 changes: 7 additions & 1 deletion synapse/push/httppusher.py
@@ -23,6 +23,7 @@
 from synapse.logging import opentracing
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push import PusherConfigException
+from synapse.types import RoomStreamToken
 
 from . import push_rule_evaluator, push_tools
 
@@ -114,7 +115,12 @@ def on_started(self, should_check_for_notifs):
         if should_check_for_notifs:
             self._start_processing()
 
-    def on_new_notifications(self, max_stream_ordering):
+    def on_new_notifications(self, max_token: RoomStreamToken):
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        max_stream_ordering = max_token.stream
+
         self.max_stream_ordering = max(
             max_stream_ordering, self.max_stream_ordering or 0
         )
10 changes: 8 additions & 2 deletions synapse/push/pusherpool.py
@@ -24,6 +24,7 @@
 from synapse.push.emailpusher import EmailPusher
 from synapse.push.httppusher import HttpPusher
 from synapse.push.pusher import PusherFactory
+from synapse.types import RoomStreamToken
 from synapse.util.async_helpers import concurrently_execute
 
 if TYPE_CHECKING:
@@ -186,11 +187,16 @@ async def remove_pushers_by_access_token(self, user_id, access_tokens):
             )
             await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
 
-    async def on_new_notifications(self, max_stream_id: int):
+    async def on_new_notifications(self, max_token: RoomStreamToken):
         if not self.pushers:
             # nothing to do here.
            return
 
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        max_stream_id = max_token.stream
+
         if max_stream_id < self._last_room_stream_id_seen:
             # Nothing to do
             return
@@ -214,7 +220,7 @@ async def on_new_notifications(self, max_stream_id: int):
 
                 if u in self.pushers:
                     for p in self.pushers[u].values():
-                        p.on_new_notifications(max_stream_id)
+                        p.on_new_notifications(max_token)
 
         except Exception:
             logger.exception("Exception in pusher on_new_notifications")
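
Note the asymmetry in this file: the pool uses the plain integer stream position for its own short-circuit checks, but forwards the full RoomStreamToken to each individual pusher, and each pusher then applies the same collapse-to-stream rule itself (see the consumer sketch above).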
13 changes: 10 additions & 3 deletions tests/handlers/test_appservice.py
@@ -18,6 +18,7 @@
 from twisted.internet import defer
 
 from synapse.handlers.appservice import ApplicationServicesHandler
+from synapse.types import RoomStreamToken
 
 from tests.test_utils import make_awaitable
 from tests.utils import MockClock
@@ -61,7 +62,9 @@ def test_notify_interested_services(self):
             defer.succeed((0, [event])),
             defer.succeed((0, [])),
         ]
-        yield defer.ensureDeferred(self.handler.notify_interested_services(0))
+        yield defer.ensureDeferred(
+            self.handler.notify_interested_services(RoomStreamToken(None, 0))
+        )
         self.mock_scheduler.submit_event_for_as.assert_called_once_with(
             interested_service, event
         )
@@ -80,7 +83,9 @@ def test_query_user_exists_unknown_user(self):
             defer.succeed((0, [event])),
             defer.succeed((0, [])),
         ]
-        yield defer.ensureDeferred(self.handler.notify_interested_services(0))
+        yield defer.ensureDeferred(
+            self.handler.notify_interested_services(RoomStreamToken(None, 0))
+        )
         self.mock_as_api.query_user.assert_called_once_with(services[0], user_id)
 
     @defer.inlineCallbacks
@@ -97,7 +102,9 @@ def test_query_user_exists_known_user(self):
             defer.succeed((0, [event])),
             defer.succeed((0, [])),
         ]
-        yield defer.ensureDeferred(self.handler.notify_interested_services(0))
+        yield defer.ensureDeferred(
+            self.handler.notify_interested_services(RoomStreamToken(None, 0))
+        )
         self.assertFalse(
             self.mock_as_api.query_user.called,
             "query_user called when it shouldn't have been.",
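
For readers unfamiliar with the constructor used in these tests: as the sketch near the top suggests, the two positional arguments of RoomStreamToken are the topological component (None for a live token) and the stream ordering.

    from synapse.types import RoomStreamToken

    # A live token at stream position 0, matching what the updated tests
    # now pass where a bare 0 used to be accepted.
    token = RoomStreamToken(None, 0)
    assert token.stream == 0
    assert token.topological is None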
