Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Prepatory work for batching events to send #13487

Merged
merged 8 commits into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13487.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up creation of DM rooms.
175 changes: 115 additions & 60 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
MutableStateMap,
Requester,
RoomAlias,
StateMap,
StreamToken,
UserID,
create_requester,
Expand Down Expand Up @@ -567,9 +568,17 @@ async def create_event(
outlier: bool = False,
historical: bool = False,
depth: Optional[int] = None,
state_map: Optional[StateMap[str]] = None,
for_batch: bool = False,
current_state_group: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
"""
Given a dict from a client, create a new event.
Given a dict from a client, create a new event. If bool for_batch is true, will
create an event using the prev_event_ids, and will create an event context for
the event using the parameters state_map and current_state_group, thus these parameters
must be provided in this case if for_batch is True. The subsequently created event
and context are suitable for being batched up and bulk persisted to the database
with other similarly created events.

Creates an FrozenEvent object, filling out auth_events, prev_events,
etc.
Expand Down Expand Up @@ -612,16 +621,27 @@ async def create_event(
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.

historical: Indicates whether the message is being inserted
back in time around some existing events. This is used to skip
a few checks and mark the event as backfilled.

depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.

state_map: A state map of previously created events, used only when creating events
for batch persisting

for_batch: whether the event is being created for batch persisting to the db

current_state_group: the current state group, used only for creating events for
batch persisting

Raises:
ResourceLimitError if server is blocked to some resource being
exceeded

Returns:
Tuple of created event, Context
"""
Expand Down Expand Up @@ -693,6 +713,9 @@ async def create_event(
auth_event_ids=auth_event_ids,
state_event_ids=state_event_ids,
depth=depth,
state_map=state_map,
for_batch=for_batch,
current_state_group=current_state_group,
)

# In an ideal world we wouldn't need the second part of this condition. However,
Expand All @@ -707,10 +730,14 @@ async def create_event(
# federation as well as those created locally. As of room v3, aliases events
# can be created by users that are not in the room, therefore we have to
# tolerate them in event_auth.check().
prev_state_ids = await context.get_prev_state_ids(
StateFilter.from_types([(EventTypes.Member, None)])
)
prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
if for_batch:
assert state_map is not None
prev_event_id = state_map.get((EventTypes.Member, event.sender))
else:
prev_state_ids = await context.get_prev_state_ids(
StateFilter.from_types([(EventTypes.Member, None)])
)
prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
prev_event = (
await self.store.get_event(prev_event_id, allow_none=True)
if prev_event_id
Expand Down Expand Up @@ -1009,8 +1036,16 @@ async def create_new_client_event(
auth_event_ids: Optional[List[str]] = None,
state_event_ids: Optional[List[str]] = None,
depth: Optional[int] = None,
state_map: Optional[StateMap[str]] = None,
for_batch: bool = False,
current_state_group: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client
"""Create a new event for a local client. If bool for_batch is true, will
create an event using the prev_event_ids, and will create an event context for
the event using the parameters state_map and current_state_group, thus these parameters
must be provided in this case if for_batch is True. The subsequently created event
and context are suitable for being batched up and bulk persisted to the database
with other similarly created events.

Args:
builder:
Expand Down Expand Up @@ -1043,6 +1078,14 @@ async def create_new_client_event(
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.

state_map: A state map of previously created events, used only when creating events
for batch persisting

for_batch: whether the event is being created for batch persisting to the db

current_state_group: the current state group, used only for creating events for
batch persisting

Returns:
Tuple of created event, context
"""
Expand Down Expand Up @@ -1095,64 +1138,76 @@ async def create_new_client_event(
builder.type == EventTypes.Create or prev_event_ids
), "Attempting to create a non-m.room.create event with no prev_events"

event = await builder.build(
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
depth=depth,
)
if for_batch:
assert prev_event_ids is not None
assert state_map is not None
assert current_state_group is not None
auth_ids = self._event_auth_handler.compute_auth_events(builder, state_map)
event = await builder.build(
prev_event_ids=prev_event_ids, auth_event_ids=auth_ids, depth=depth
)
context = await self.state.compute_event_context_for_batched(
event, state_map, current_state_group
)
else:
event = await builder.build(
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
depth=depth,
)

# Pass on the outlier property from the builder to the event
# after it is created
if builder.internal_metadata.outlier:
event.internal_metadata.outlier = True
context = EventContext.for_outlier(self._storage_controllers)
elif (
event.type == EventTypes.MSC2716_INSERTION
and state_event_ids
and builder.internal_metadata.is_historical()
):
# Add explicit state to the insertion event so it has state to derive
# from even though it's floating with no `prev_events`. The rest of
# the batch can derive from this state and state_group.
#
# TODO(faster_joins): figure out how this works, and make sure that the
# old state is complete.
# https://github.com/matrix-org/synapse/issues/13003
metadata = await self.store.get_metadata_for_events(state_event_ids)

state_map_for_event: MutableStateMap[str] = {}
for state_id in state_event_ids:
data = metadata.get(state_id)
if data is None:
# We're trying to persist a new historical batch of events
# with the given state, e.g. via
# `RoomBatchSendEventRestServlet`. The state can be inferred
# by Synapse or set directly by the client.
#
# Either way, we should have persisted all the state before
# getting here.
raise Exception(
f"State event {state_id} not found in DB,"
" Synapse should have persisted it before using it."
)
# Pass on the outlier property from the builder to the event
# after it is created
if builder.internal_metadata.outlier:
event.internal_metadata.outlier = True
context = EventContext.for_outlier(self._storage_controllers)
elif (
event.type == EventTypes.MSC2716_INSERTION
and state_event_ids
and builder.internal_metadata.is_historical()
):
# Add explicit state to the insertion event so it has state to derive
# from even though it's floating with no `prev_events`. The rest of
# the batch can derive from this state and state_group.
#
# TODO(faster_joins): figure out how this works, and make sure that the
# old state is complete.
# https://github.com/matrix-org/synapse/issues/13003
metadata = await self.store.get_metadata_for_events(state_event_ids)

state_map_for_event: MutableStateMap[str] = {}
for state_id in state_event_ids:
data = metadata.get(state_id)
if data is None:
# We're trying to persist a new historical batch of events
# with the given state, e.g. via
# `RoomBatchSendEventRestServlet`. The state can be inferred
# by Synapse or set directly by the client.
#
# Either way, we should have persisted all the state before
# getting here.
raise Exception(
f"State event {state_id} not found in DB,"
" Synapse should have persisted it before using it."
)

if data.state_key is None:
raise Exception(
f"Trying to set non-state event {state_id} as state"
)
if data.state_key is None:
raise Exception(
f"Trying to set non-state event {state_id} as state"
)

state_map_for_event[(data.event_type, data.state_key)] = state_id
state_map_for_event[(data.event_type, data.state_key)] = state_id

context = await self.state.compute_event_context(
event,
state_ids_before_event=state_map_for_event,
# TODO(faster_joins): check how MSC2716 works and whether we can have
# partial state here
# https://github.com/matrix-org/synapse/issues/13003
partial_state=False,
)
else:
context = await self.state.compute_event_context(event)
context = await self.state.compute_event_context(
event,
state_ids_before_event=state_map_for_event,
# TODO(faster_joins): check how MSC2716 works and whether we can have
# partial state here
# https://github.com/matrix-org/synapse/issues/13003
partial_state=False,
)
else:
context = await self.state.compute_event_context(event)

if requester:
context.app_service = requester.app_service
Expand Down
Loading