Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

_auth_and_persist_outliers: drop events we have already seen #11994

Merged
merged 2 commits into from
Feb 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11994.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move common deduplication code down into `_auth_and_persist_outliers`.
44 changes: 20 additions & 24 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,6 @@ async def process_remote_join(
Raises:
SynapseError if the response is in some way invalid.
"""
event_map = {e.event_id: e for e in itertools.chain(auth_events, state)}

create_event = None
for e in auth_events:
if (e.type, e.state_key) == (EventTypes.Create, ""):
Expand All @@ -439,11 +437,6 @@ async def process_remote_join(
if room_version.identifier != room_version_id:
raise SynapseError(400, "Room version mismatch")

# filter out any events we have already seen
seen_remotes = await self._store.have_seen_events(room_id, event_map.keys())
for s in seen_remotes:
event_map.pop(s, None)

# persist the auth chain and state events.
#
# any invalid events here will be marked as rejected, and we'll carry on.
Expand All @@ -455,7 +448,9 @@ async def process_remote_join(
# signatures right now doesn't mean that we will *never* be able to, so it
# is premature to reject them.
#
await self._auth_and_persist_outliers(room_id, event_map.values())
await self._auth_and_persist_outliers(
room_id, itertools.chain(auth_events, state)
)

# and now persist the join event itself.
logger.info("Peristing join-via-remote %s", event)
Expand Down Expand Up @@ -1245,6 +1240,16 @@ async def _auth_and_persist_outliers(
"""
event_map = {event.event_id: event for event in events}

# filter out any events we have already seen. This might happen because
# the events were eagerly pushed to us (e.g., during a room join), or because
# another thread has raced against us since we decided to request the event.
#
# This is just an optimisation, so it doesn't need to be watertight - the event
# persister does another round of deduplication.
seen_remotes = await self._store.have_seen_events(room_id, event_map.keys())
for s in seen_remotes:
event_map.pop(s, None)

# XXX: it might be possible to kick this process off in parallel with fetching
# the events.
while event_map:
Expand Down Expand Up @@ -1717,31 +1722,22 @@ async def _get_remote_auth_chain_for_event(
event_id: the event for which we are lacking auth events
"""
try:
remote_event_map = {
e.event_id: e
for e in await self._federation_client.get_event_auth(
destination, room_id, event_id
)
}
remote_events = await self._federation_client.get_event_auth(
destination, room_id, event_id
)

except RequestSendFailed as e1:
# The other side isn't around or doesn't implement the
# endpoint, so let's just bail out.
logger.info("Failed to get event auth from remote: %s", e1)
return

logger.info("/event_auth returned %i events", len(remote_event_map))
logger.info("/event_auth returned %i events", len(remote_events))

# `event` may be returned, but we should not yet process it.
remote_event_map.pop(event_id, None)

# nor should we reprocess any events we have already seen.
seen_remotes = await self._store.have_seen_events(
room_id, remote_event_map.keys()
)
for s in seen_remotes:
remote_event_map.pop(s, None)
remote_auth_events = (e for e in remote_events if e.event_id != event_id)

await self._auth_and_persist_outliers(room_id, remote_event_map.values())
await self._auth_and_persist_outliers(room_id, remote_auth_events)

async def _update_context_for_auth_events(
self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase]
Expand Down