Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix providing a RoomStreamToken instance to _notify_app_services_ephemeral #11137

Merged
merged 9 commits into from
Nov 2, 2021
1 change: 1 addition & 0 deletions changelog.d/11137.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Partially address occasional bursts of old read receipt and presence data being sent to application services that have opted in to receiving them.
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
16 changes: 11 additions & 5 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ async def handle_room_events(events: Iterable[EventBase]) -> None:
def notify_interested_services_ephemeral(
self,
stream_key: str,
new_token: Optional[int],
new_token: Union[int, RoomStreamToken],
users: Optional[Collection[Union[str, UserID]]] = None,
) -> None:
"""
Expand All @@ -203,7 +203,7 @@ def notify_interested_services_ephemeral(
Appservices will only receive ephemeral events that fall within their
registered user and room namespaces.

new_token: The latest stream token.
new_token: The stream token of the event.
users: The users that should be informed of the new event, if any.
"""
if not self.notify_appservices:
Expand All @@ -212,6 +212,13 @@ def notify_interested_services_ephemeral(
if stream_key not in ("typing_key", "receipt_key", "presence_key"):
return

# Convert new_token from a RoomStreamToken to an int if necessary.
# We just use the minimum stream ordering and ignore the vector clock
# component. This is safe to do as long as we *always* ignore the vector
# clock components.
if isinstance(new_token, RoomStreamToken):
new_token = new_token.stream
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved

services = [
service
for service in self.store.get_app_services()
Expand All @@ -231,14 +238,13 @@ async def _notify_interested_services_ephemeral(
self,
services: List[ApplicationService],
stream_key: str,
new_token: Optional[int],
new_token: int,
users: Collection[Union[str, UserID]],
) -> None:
logger.debug("Checking interested services for %s" % (stream_key))
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
# Only handle typing if we have the latest token
richvdh marked this conversation as resolved.
Show resolved Hide resolved
if stream_key == "typing_key" and new_token is not None:
if stream_key == "typing_key":
# Note that we don't persist the token (via set_type_stream_id_for_appservice)
# for typing_key due to performance reasons and due to their highly
# ephemeral nature.
Expand Down
36 changes: 8 additions & 28 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,29 +383,6 @@ def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
except Exception:
logger.exception("Error notifying application services of event")

def _notify_app_services_ephemeral(
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
users: Optional[Collection[Union[str, UserID]]] = None,
) -> None:
"""Notify application services of ephemeral event activity.

Args:
stream_key: The stream the event came from.
new_token: The value of the new stream token.
users: The users that should be informed of the new event, if any.
"""
try:
stream_token = None
if isinstance(new_token, int):
stream_token = new_token
self.appservice_handler.notify_interested_services_ephemeral(
stream_key, stream_token, users or []
)
except Exception:
logger.exception("Error notifying application services of event")

def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try:
self._pusher_pool.on_new_notifications(max_room_stream_token)
Expand Down Expand Up @@ -468,11 +445,14 @@ def on_new_event(
self.notify_replication()

# Notify appservices
self._notify_app_services_ephemeral(
stream_key,
new_token,
users,
)
try:
self.appservice_handler.notify_interested_services_ephemeral(
stream_key,
new_token,
users,
)
except Exception:
logger.exception("Error notifying application services of event")

def on_new_replication_data(self) -> None:
"""Used to inform replication listeners that something has happened
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ async def add_user_signature_change_to_streams(
user_ids: the users who were signed

Returns:
THe new stream ID.
The new stream ID.
"""

async with self._device_list_id_gen.get_next() as stream_id:
Expand Down Expand Up @@ -1315,7 +1315,7 @@ def _update_remote_device_list_cache_txn(

async def add_device_change_to_streams(
self, user_id: str, device_ids: Collection[str], hosts: List[str]
):
) -> int:
"""Persist that a user's devices have been updated, and which hosts
(if any) should be poked.
"""
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def __init__(
prefilled_cache=presence_cache_prefill,
)

async def update_presence(self, presence_states):
async def update_presence(self, presence_states) -> Tuple[int, int]:
assert self._can_persist_presence

stream_ordering_manager = self._presence_id_gen.get_next_mult(
Expand Down