Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Make historical events discoverable from backfill for servers without…
Browse files Browse the repository at this point in the history
… any scrollback history (MSC2716) (#10245)

* Make historical messages available to federated servers

Part of MSC2716: matrix-org/matrix-spec-proposals#2716

Follow-up to #9247

* Debug message not available on federation

* Add base starting insertion point when no chunk ID is provided

* Fix messages from multiple senders in historical chunk

Follow-up to #9247

Part of MSC2716: matrix-org/matrix-spec-proposals#2716

---

Previously, Synapse would throw a 403,
`Cannot force another user to join.`,
because we were trying to use `?user_id` from a single virtual user
which did not match with messages from other users in the chunk.

* Remove debug lines

* Messing with selecting insertion event extremeties

* Move db schema change to new version

* Add more better comments

* Make a fake requester with just what we need

See #10276 (comment)

* Store insertion events in table

* Make base insertion event float off on its own

See #10250 (comment)

Conflicts:
	synapse/rest/client/v1/room.py

* Validate that the app service can actually control the given user

See #10276 (comment)

Conflicts:
	synapse/rest/client/v1/room.py

* Add some better comments on what we're trying to check for

* Continue debugging

* Share validation logic

* Add inserted historical messages to /backfill response

* Remove debug sql queries

* Some marker event implemntation trials

* Clean up PR

* Rename insertion_event_id to just event_id

* Add some better sql comments

* More accurate description

* Add changelog

* Make it clear what MSC the change is part of

* Add more detail on which insertion event came through

* Address review and improve sql queries

* Only use event_id as unique constraint

* Fix test case where insertion event is already in the normal DAG

* Remove debug changes

* Switch to chunk events so we can auth via power_levels

Previously, we were using `content.chunk_id` to connect one
chunk to another. But these events can be from any `sender`
and we can't tell who should be able to send historical events.
We know we only want the application service to do it but these
events have the sender of a real historical message, not the
application service user ID as the sender. Other federated homeservers
also have no indicator which senders are an application service on
the originating homeserver.

So we want to auth all of the MSC2716 events via power_levels
and have them be sent by the application service with proper
PL levels in the room.

* Switch to chunk events for federation

* Add unstable room version to support new historical PL

* Fix federated events being rejected for no state_groups

Add fix from #10439
until it merges.

* Only connect base insertion event to prev_event_ids

Per discussion with @erikjohnston,
https://matrix.to/#/!UytJQHLQYfvYWsGrGY:jki.re/$12bTUiObDFdHLAYtT7E-BvYRp3k_xv8w0dUQHibasJk?via=jki.re&via=matrix.org

* Make it possible to get the room_version with txn

* Allow but ignore historical events in unsupported room version

See #10245 (comment)

We can't reject historical events on unsupported room versions because homeservers without knowledge of MSC2716 or the new room version don't reject historical events either.

Since we can't rely on the auth check here to stop historical events on unsupported room versions, I've added some additional checks in the processing/persisting code (`synapse/storage/databases/main/events.py` ->  `_handle_insertion_event` and `_handle_chunk_event`). I've had to do some refactoring so there is method to fetch the room version by `txn`.

* Move to unique index syntax

See #10245 (comment)

* High-level document how the insertion->chunk lookup works

* Remove create_event fallback for room_versions

See https://github.com/matrix-org/synapse/pull/10245/files#r677641879

* Use updated method name
  • Loading branch information
MadLittleMods committed Jul 28, 2021
1 parent 8c201c9 commit d0b294a
Show file tree
Hide file tree
Showing 12 changed files with 338 additions and 26 deletions.
1 change: 1 addition & 0 deletions changelog.d/10245.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make historical events discoverable from backfill for servers without any scrollback history (part of MSC2716).
3 changes: 0 additions & 3 deletions synapse/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,6 @@ class EventContentFields:
MSC2716_CHUNK_ID = "org.matrix.msc2716.chunk_id"
# For "marker" events
MSC2716_MARKER_INSERTION = "org.matrix.msc2716.marker.insertion"
MSC2716_MARKER_INSERTION_PREV_EVENTS = (
"org.matrix.msc2716.marker.insertion_prev_events"
)


class RoomTypes:
Expand Down
27 changes: 27 additions & 0 deletions synapse/api/room_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ class RoomVersion:
# MSC2403: Allows join_rules to be set to 'knock', changes auth rules to allow sending
# m.room.membership event with membership 'knock'.
msc2403_knocking = attr.ib(type=bool)
# MSC2716: Adds m.room.power_levels -> content.historical field to control
# whether "insertion", "chunk", "marker" events can be sent
msc2716_historical = attr.ib(type=bool)


class RoomVersions:
Expand All @@ -88,6 +91,7 @@ class RoomVersions:
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
)
V2 = RoomVersion(
"2",
Expand All @@ -101,6 +105,7 @@ class RoomVersions:
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
)
V3 = RoomVersion(
"3",
Expand All @@ -114,6 +119,7 @@ class RoomVersions:
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
)
V4 = RoomVersion(
"4",
Expand All @@ -127,6 +133,7 @@ class RoomVersions:
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
)
V5 = RoomVersion(
"5",
Expand All @@ -140,6 +147,7 @@ class RoomVersions:
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
)
V6 = RoomVersion(
"6",
Expand All @@ -153,6 +161,7 @@ class RoomVersions:
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
)
MSC2176 = RoomVersion(
"org.matrix.msc2176",
Expand All @@ -166,6 +175,7 @@ class RoomVersions:
msc2176_redaction_rules=True,
msc3083_join_rules=False,
msc2403_knocking=False,
msc2716_historical=False,
)
MSC3083 = RoomVersion(
"org.matrix.msc3083.v2",
Expand All @@ -179,6 +189,7 @@ class RoomVersions:
msc2176_redaction_rules=False,
msc3083_join_rules=True,
msc2403_knocking=False,
msc2716_historical=False,
)
V7 = RoomVersion(
"7",
Expand All @@ -192,6 +203,21 @@ class RoomVersions:
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc2403_knocking=True,
msc2716_historical=False,
)
MSC2716 = RoomVersion(
"org.matrix.msc2716",
RoomDisposition.STABLE,
EventFormatVersions.V3,
StateResolutionVersions.V2,
enforce_key_validity=True,
special_case_aliases_auth=False,
strict_canonicaljson=True,
limit_notifications_power_levels=True,
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc2403_knocking=True,
msc2716_historical=True,
)


Expand All @@ -207,6 +233,7 @@ class RoomVersions:
RoomVersions.MSC2176,
RoomVersions.MSC3083,
RoomVersions.V7,
RoomVersions.MSC2716,
)
}

Expand Down
38 changes: 38 additions & 0 deletions synapse/event_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,13 @@ def check(
if event.type == EventTypes.Redaction:
check_redaction(room_version_obj, event, auth_events)

if (
event.type == EventTypes.MSC2716_INSERTION
or event.type == EventTypes.MSC2716_CHUNK
or event.type == EventTypes.MSC2716_MARKER
):
check_historical(room_version_obj, event, auth_events)

logger.debug("Allowing! %s", event)


Expand Down Expand Up @@ -539,6 +546,37 @@ def check_redaction(
raise AuthError(403, "You don't have permission to redact events")


def check_historical(
room_version_obj: RoomVersion,
event: EventBase,
auth_events: StateMap[EventBase],
) -> None:
"""Check whether the event sender is allowed to send historical related
events like "insertion", "chunk", and "marker".
Returns:
None
Raises:
AuthError if the event sender is not allowed to send historical related events
("insertion", "chunk", and "marker").
"""
# Ignore the auth checks in room versions that do not support historical
# events
if not room_version_obj.msc2716_historical:
return

user_level = get_user_power_level(event.user_id, auth_events)

historical_level = get_named_level(auth_events, "historical", 100)

if user_level < historical_level:
raise AuthError(
403,
'You don\'t have permission to send send historical related events ("insertion", "chunk", and "marker")',
)


def _check_power_levels(
room_version_obj: RoomVersion,
event: EventBase,
Expand Down
3 changes: 3 additions & 0 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ def add_fields(*fields):
if room_version.msc2176_redaction_rules:
add_fields("invite")

if room_version.msc2716_historical:
add_fields("historical")

elif event_type == EventTypes.Aliases and room_version.special_case_aliases_auth:
add_fields("aliases")
elif event_type == EventTypes.RoomHistoryVisibility:
Expand Down
6 changes: 4 additions & 2 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2748,9 +2748,11 @@ async def _update_auth_events_and_context_for_auth(
event.event_id,
e.event_id,
)
context = await self.state_handler.compute_event_context(e)
missing_auth_event_context = (
await self.state_handler.compute_event_context(e)
)
await self._auth_and_persist_event(
origin, e, context, auth_events=auth
origin, e, missing_auth_event_context, auth_events=auth
)

if e.event_id in event_auth_events:
Expand Down
1 change: 1 addition & 0 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,7 @@ async def send(etype: str, content: JsonDict, **kwargs) -> int:
"kick": 50,
"redact": 50,
"invite": 50,
"historical": 100,
}

if config["original_invitees_have_ops"]:
Expand Down
7 changes: 6 additions & 1 deletion synapse/rest/client/v1/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,6 @@ async def on_POST(self, request, room_id):

events_to_create = body["events"]

prev_event_ids = prev_events_from_query
inherited_depth = await self._inherit_depth_from_prev_ids(
prev_events_from_query
)
Expand All @@ -516,6 +515,10 @@ async def on_POST(self, request, room_id):
chunk_id_to_connect_to = chunk_id_from_query
base_insertion_event = None
if chunk_id_from_query:
# All but the first base insertion event should point at a fake
# event, which causes the HS to ask for the state at the start of
# the chunk later.
prev_event_ids = [fake_prev_event_id]
# TODO: Verify the chunk_id_from_query corresponds to an insertion event
pass
# Otherwise, create an insertion event to act as a starting point.
Expand All @@ -526,6 +529,8 @@ async def on_POST(self, request, room_id):
# an insertion event), in which case we just create a new insertion event
# that can then get pointed to by a "marker" event later.
else:
prev_event_ids = prev_events_from_query

base_insertion_event_dict = self._create_insertion_event_dict(
sender=requester.user.to_string(),
room_id=room_id,
Expand Down
88 changes: 79 additions & 9 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -936,15 +936,46 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
# We want to make sure that we do a breadth-first, "depth" ordered
# search.

query = (
"SELECT depth, prev_event_id FROM event_edges"
" INNER JOIN events"
" ON prev_event_id = events.event_id"
" WHERE event_edges.event_id = ?"
" AND event_edges.is_state = ?"
" LIMIT ?"
)
# Look for the prev_event_id connected to the given event_id
query = """
SELECT depth, prev_event_id FROM event_edges
/* Get the depth of the prev_event_id from the events table */
INNER JOIN events
ON prev_event_id = events.event_id
/* Find an event which matches the given event_id */
WHERE event_edges.event_id = ?
AND event_edges.is_state = ?
LIMIT ?
"""

# Look for the "insertion" events connected to the given event_id
connected_insertion_event_query = """
SELECT e.depth, i.event_id FROM insertion_event_edges AS i
/* Get the depth of the insertion event from the events table */
INNER JOIN events AS e USING (event_id)
/* Find an insertion event which points via prev_events to the given event_id */
WHERE i.insertion_prev_event_id = ?
LIMIT ?
"""

# Find any chunk connections of a given insertion event
chunk_connection_query = """
SELECT e.depth, c.event_id FROM insertion_events AS i
/* Find the chunk that connects to the given insertion event */
INNER JOIN chunk_events AS c
ON i.next_chunk_id = c.chunk_id
/* Get the depth of the chunk start event from the events table */
INNER JOIN events AS e USING (event_id)
/* Find an insertion event which matches the given event_id */
WHERE i.event_id = ?
LIMIT ?
"""

# In a PriorityQueue, the lowest valued entries are retrieved first.
# We're using depth as the priority in the queue.
# Depth is lowest at the oldest-in-time message and highest and
# newest-in-time message. We add events to the queue with a negative depth so that
# we process the newest-in-time messages first going backwards in time.
queue = PriorityQueue()

for event_id in event_list:
Expand All @@ -970,9 +1001,48 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):

event_results.add(event_id)

# Try and find any potential historical chunks of message history.
#
# First we look for an insertion event connected to the current
# event (by prev_event). If we find any, we need to go and try to
# find any chunk events connected to the insertion event (by
# chunk_id). If we find any, we'll add them to the queue and
# navigate up the DAG like normal in the next iteration of the loop.
txn.execute(
connected_insertion_event_query, (event_id, limit - len(event_results))
)
connected_insertion_event_id_results = txn.fetchall()
logger.debug(
"_get_backfill_events: connected_insertion_event_query %s",
connected_insertion_event_id_results,
)
for row in connected_insertion_event_id_results:
connected_insertion_event_depth = row[0]
connected_insertion_event = row[1]
queue.put((-connected_insertion_event_depth, connected_insertion_event))

# Find any chunk connections for the given insertion event
txn.execute(
chunk_connection_query,
(connected_insertion_event, limit - len(event_results)),
)
chunk_start_event_id_results = txn.fetchall()
logger.debug(
"_get_backfill_events: chunk_start_event_id_results %s",
chunk_start_event_id_results,
)
for row in chunk_start_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))

# Navigate up the DAG by prev_event
txn.execute(query, (event_id, False, limit - len(event_results)))
prev_event_id_results = txn.fetchall()
logger.debug(
"_get_backfill_events: prev_event_ids %s", prev_event_id_results
)

for row in txn:
for row in prev_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))

Expand Down
Loading

0 comments on commit d0b294a

Please sign in to comment.