Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rooms.bump_stamp to Sliding Sync /sync for easier client-side sorting #17395

Merged
merged 16 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17395.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Sort by and add `rooms.bump_stamp` for easier client-side sorting in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
4 changes: 4 additions & 0 deletions synapse/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,13 @@ class EventTypes:
SpaceParent: Final = "m.space.parent"

Reaction: Final = "m.reaction"
Sticker: Final = "m.sticker"
LiveLocationShareStart: Final = "m.beacon_info"

CallInvite: Final = "m.call.invite"

PollStart: Final = "m.poll.start"


class ToDeviceEventTypes:
RoomKeyRequest: Final = "m.room_key_request"
Expand Down
123 changes: 89 additions & 34 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@
logger = logging.getLogger(__name__)


# The event types that we should consider when sorting the rooms in the sync response.
DEFAULT_BUMP_EVENT_TYPES = {
EventTypes.Message,
EventTypes.Encrypted,
EventTypes.Sticker,
EventTypes.CallInvite,
EventTypes.PollStart,
EventTypes.LiveLocationShareStart,
}


def filter_membership_for_sync(
*, membership: str, user_id: str, sender: Optional[str]
) -> bool:
Expand Down Expand Up @@ -112,6 +123,7 @@ class _RoomMembershipForUser:
range
"""

room_id: str
event_id: Optional[str]
event_pos: PersistedEventPosition
membership: str
Expand All @@ -122,6 +134,12 @@ def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
return attr.evolve(self, **kwds)


@attr.s(slots=True, frozen=True, auto_attribs=True)
class _RelevantRoomEntry:
room_sync_config: RoomSyncConfig
room_membership_for_user: _RoomMembershipForUser


class SlidingSyncHandler:
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
Expand Down Expand Up @@ -242,7 +260,7 @@ async def current_sync_for_user(

# Assemble sliding window lists
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
relevant_room_map: Dict[str, RoomSyncConfig] = {}
relevant_room_map: Dict[str, _RelevantRoomEntry] = {}
if sync_config.lists:
# Get all of the room IDs that the user should be able to see in the sync
# response
Expand All @@ -260,64 +278,78 @@ async def current_sync_for_user(
sync_config.user, sync_room_map, list_config.filters, to_token
)

sorted_room_info = await self.sort_rooms(
sorted_sync_rooms = await self.sort_rooms(
filtered_sync_room_map, to_token
)

ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
if list_config.ranges:
for range in list_config.ranges:
sliced_room_ids = [
room_id
# Both sides of range are inclusive
for room_id, _ in sorted_room_info[range[0] : range[1] + 1]
]
# Both sides of range are inclusive
sliced_sync_rooms = sorted_sync_rooms[range[0] : range[1] + 1]

ops.append(
SlidingSyncResult.SlidingWindowList.Operation(
op=OperationType.SYNC,
range=range,
room_ids=sliced_room_ids,
room_ids=[
room_membership.room_id
for room_membership in sliced_sync_rooms
],
)
)

# Take the superset of the `RoomSyncConfig` for each room
for room_id in sliced_room_ids:
if relevant_room_map.get(room_id) is not None:
for room_membership_for_user in sliced_sync_rooms:
relevant_room_entry = relevant_room_map.get(
room_membership_for_user.room_id
)
if relevant_room_entry is not None:
existing_room_sync_config = (
relevant_room_entry.room_sync_config
)

# Take the highest timeline limit
if (
relevant_room_map[room_id].timeline_limit
existing_room_sync_config.timeline_limit
< list_config.timeline_limit
):
relevant_room_map[room_id].timeline_limit = (
existing_room_sync_config.timeline_limit = (
list_config.timeline_limit
)

# Union the required state
relevant_room_map[room_id].required_state.update(
existing_room_sync_config.required_state.update(
list_config.required_state
)
else:
relevant_room_map[room_id] = RoomSyncConfig(
timeline_limit=list_config.timeline_limit,
required_state=set(list_config.required_state),
relevant_room_map[room_membership_for_user.room_id] = (
_RelevantRoomEntry(
room_sync_config=RoomSyncConfig(
timeline_limit=list_config.timeline_limit,
required_state=set(
list_config.required_state
),
),
room_membership_for_user=room_membership_for_user,
)
)

lists[list_key] = SlidingSyncResult.SlidingWindowList(
count=len(sorted_room_info),
count=len(sorted_sync_rooms),
ops=ops,
)

# TODO: if (sync_config.room_subscriptions):

# Fetch room data
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
for room_id, room_sync_config in relevant_room_map.items():
for room_id, relevant_room_entry in relevant_room_map.items():
room_sync_result = await self.get_room_sync_data(
user=sync_config.user,
room_id=room_id,
room_sync_config=room_sync_config,
rooms_membership_for_user_at_to_token=sync_room_map[room_id],
room_sync_config=relevant_room_entry.room_sync_config,
room_membership_for_user_at_to_token=relevant_room_entry.room_membership_for_user,
from_token=from_token,
to_token=to_token,
)
Expand Down Expand Up @@ -389,6 +421,7 @@ async def get_sync_room_ids_for_user(
# (below) because they are potentially from the current snapshot time
# instead from the time of the `to_token`.
room_for_user.room_id: _RoomMembershipForUser(
room_id=room_for_user.room_id,
event_id=room_for_user.event_id,
event_pos=room_for_user.event_pos,
membership=room_for_user.membership,
Expand Down Expand Up @@ -489,6 +522,7 @@ async def get_sync_room_ids_for_user(
is not None
):
sync_room_id_set[room_id] = _RoomMembershipForUser(
room_id=room_id,
event_id=first_membership_change_after_to_token.prev_event_id,
event_pos=first_membership_change_after_to_token.prev_event_pos,
membership=first_membership_change_after_to_token.prev_membership,
Expand Down Expand Up @@ -583,6 +617,7 @@ async def get_sync_room_ids_for_user(
# is their own leave event
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
filtered_sync_room_id_set[room_id] = _RoomMembershipForUser(
room_id=room_id,
event_id=last_membership_change_in_from_to_range.event_id,
event_pos=last_membership_change_in_from_to_range.event_pos,
membership=last_membership_change_in_from_to_range.membership,
Expand Down Expand Up @@ -768,7 +803,7 @@ async def sort_rooms(
self,
sync_room_map: Dict[str, _RoomMembershipForUser],
to_token: StreamToken,
) -> List[Tuple[str, _RoomMembershipForUser]]:
) -> List[_RoomMembershipForUser]:
"""
Sort by `stream_ordering` of the last event that the user should see in the
room. `stream_ordering` is unique so we get a stable sort.
Expand Down Expand Up @@ -806,12 +841,17 @@ async def sort_rooms(
else:
# Otherwise, if the user has left/been invited/knocked/been banned from
# a room, they shouldn't see anything past that point.
#
# FIXME: It's possible that people should see beyond this point in
# invited/knocked cases if for example the room has
# `invite`/`world_readable` history visibility, see
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
last_activity_in_room_map[room_id] = room_for_user.event_pos.stream

return sorted(
sync_room_map.items(),
sync_room_map.values(),
# Sort by the last activity (stream_ordering) in the room
key=lambda room_info: last_activity_in_room_map[room_info[0]],
key=lambda room_info: last_activity_in_room_map[room_info.room_id],
# We want descending order
reverse=True,
)
Expand All @@ -821,7 +861,7 @@ async def get_room_sync_data(
user: UserID,
room_id: str,
room_sync_config: RoomSyncConfig,
rooms_membership_for_user_at_to_token: _RoomMembershipForUser,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
from_token: Optional[StreamToken],
to_token: StreamToken,
) -> SlidingSyncResult.RoomResult:
Expand All @@ -835,7 +875,7 @@ async def get_room_sync_data(
room_id: The room ID to fetch data for
room_sync_config: Config for what data we should fetch for a room in the
sync response.
rooms_membership_for_user_at_to_token: Membership information for the user
room_membership_for_user_at_to_token: Membership information for the user
in the room at the time of `to_token`.
from_token: The point in the stream to sync from.
to_token: The point in the stream to sync up to.
Expand All @@ -855,7 +895,7 @@ async def get_room_sync_data(
if (
room_sync_config.timeline_limit > 0
# No timeline for invite/knock rooms (just `stripped_state`)
and rooms_membership_for_user_at_to_token.membership
and room_membership_for_user_at_to_token.membership
not in (Membership.INVITE, Membership.KNOCK)
):
limited = False
Expand All @@ -868,12 +908,12 @@ async def get_room_sync_data(
# We're going to paginate backwards from the `to_token`
from_bound = to_token.room_key
# People shouldn't see past their leave/ban event
if rooms_membership_for_user_at_to_token.membership in (
if room_membership_for_user_at_to_token.membership in (
Membership.LEAVE,
Membership.BAN,
):
from_bound = (
rooms_membership_for_user_at_to_token.event_pos.to_room_stream_token()
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
)

# Determine whether we should limit the timeline to the token range.
Expand All @@ -888,7 +928,7 @@ async def get_room_sync_data(
to_bound = (
from_token.room_key
if from_token is not None
and not rooms_membership_for_user_at_to_token.newly_joined
and not room_membership_for_user_at_to_token.newly_joined
else None
)

Expand Down Expand Up @@ -925,7 +965,7 @@ async def get_room_sync_data(
self.storage_controllers,
user.to_string(),
timeline_events,
is_peeking=rooms_membership_for_user_at_to_token.membership
is_peeking=room_membership_for_user_at_to_token.membership
!= Membership.JOIN,
filter_send_to_client=True,
)
Expand Down Expand Up @@ -980,16 +1020,16 @@ async def get_room_sync_data(
# Figure out any stripped state events for invite/knocks. This allows the
# potential joiner to identify the room.
stripped_state: List[JsonDict] = []
if rooms_membership_for_user_at_to_token.membership in (
if room_membership_for_user_at_to_token.membership in (
Membership.INVITE,
Membership.KNOCK,
):
# This should never happen. If someone is invited/knocked on room, then
# there should be an event for it.
assert rooms_membership_for_user_at_to_token.event_id is not None
assert room_membership_for_user_at_to_token.event_id is not None

invite_or_knock_event = await self.store.get_event(
rooms_membership_for_user_at_to_token.event_id
room_membership_for_user_at_to_token.event_id
)

stripped_state = []
Expand All @@ -1005,11 +1045,25 @@ async def get_room_sync_data(
stripped_state.append(strip_event(invite_or_knock_event))

# TODO: Handle state resets. For example, if we see
# `rooms_membership_for_user_at_to_token.membership = Membership.LEAVE` but
# `room_membership_for_user_at_to_token.membership = Membership.LEAVE` but
# `required_state` doesn't include it, we should indicate to the client that a
# state reset happened. Perhaps we should indicate this by setting `initial:
# True` and empty `required_state`.

# Figure out the last bump event in the room
last_bump_event_result = (
await self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
)
)

# By default, just choose the membership event position
bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
# But if we found a bump event, use that instead
if last_bump_event_result:
_, bump_event_pos = last_bump_event_result
bump_stamp = bump_event_pos.stream

return SlidingSyncResult.RoomResult(
# TODO: Dummy value
name=None,
Expand All @@ -1031,6 +1085,7 @@ async def get_room_sync_data(
stripped_state=stripped_state,
prev_batch=prev_batch_token,
limited=limited,
bump_stamp=bump_stamp,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm slightly dubious about giving back an opaque number rather than a timestamp, but we can iterate later.

# TODO: Dummy values
joined_count=0,
invited_count=0,
Expand Down
1 change: 1 addition & 0 deletions synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,7 @@ async def encode_rooms(
serialized_rooms: Dict[str, JsonDict] = {}
for room_id, room_result in rooms.items():
serialized_rooms[room_id] = {
"bump_stamp": room_result.bump_stamp,
"joined_count": room_result.joined_count,
"invited_count": room_result.invited_count,
"notification_count": room_result.notification_count,
Expand Down
Loading
Loading