
Add ability to wait for locks and add locks to purge history / room deletion #15791

Merged: 11 commits, Jul 31, 2023
1 change: 1 addition & 0 deletions changelog.d/15791.bugfix
@@ -0,0 +1 @@
Fix bug where purging history and paginating simultaneously could lead to database corruption when using workers.
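
The fix relies on the usual reader/writer lock contract: pagination and event sending take the lock in read mode, so any number of them can run at once, while purging history and deleting rooms take it in write mode, waiting for in-flight readers to finish and blocking new ones. A minimal in-process sketch of that contract, using plain asyncio rather than Synapse's cross-worker locks (all names here are illustrative):

```python
import asyncio


class ToyReadWriteLock:
    """Many readers may hold the lock at once; a writer waits for all
    readers to finish and excludes new readers and other writers."""

    def __init__(self) -> None:
        self._readers = 0
        self._no_readers = asyncio.Event()
        self._no_readers.set()
        self._writer = asyncio.Lock()

    async def acquire_read(self) -> None:
        # Queue behind any active writer, then register as a reader.
        async with self._writer:
            self._readers += 1
            self._no_readers.clear()

    def release_read(self) -> None:
        self._readers -= 1
        if self._readers == 0:
            self._no_readers.set()

    async def acquire_write(self) -> None:
        # Exclude new readers and writers, then wait for in-flight readers.
        await self._writer.acquire()
        await self._no_readers.wait()

    def release_write(self) -> None:
        self._writer.release()


async def paginate(lock: ToyReadWriteLock) -> None:
    await lock.acquire_read()
    try:
        ...  # read events from the room; no purge can run concurrently
    finally:
        lock.release_read()


async def purge_history(lock: ToyReadWriteLock) -> None:
    await lock.acquire_write()
    try:
        ...  # delete events; no paginator is mid-read at this point
    finally:
        lock.release_write()
```

The real worker locks also have to coordinate between separate worker processes, so this in-memory sketch only illustrates the ordering guarantee, not the mechanism Synapse uses to enforce it.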
17 changes: 14 additions & 3 deletions synapse/federation/federation_server.py
@@ -63,6 +63,7 @@
)
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import (
make_deferred_yieldable,
@@ -137,6 +138,7 @@ def __init__(self, hs: "HomeServer"):
self._event_auth_handler = hs.get_event_auth_handler()
self._room_member_handler = hs.get_room_member_handler()
self._e2e_keys_handler = hs.get_e2e_keys_handler()
self._worker_lock_handler = hs.get_worker_locks_handler()

self._state_storage_controller = hs.get_storage_controllers().state

@@ -1236,9 +1238,18 @@ async def _process_incoming_pdus_in_room_inner(
logger.info("handling received PDU in room %s: %s", room_id, event)
try:
with nested_logging_context(event.event_id):
await self._federation_event_handler.on_receive_pdu(
origin, event
)
# We're taking out a lock within a lock, which could
# lead to deadlocks if we're not careful. However, it is
# safe on this occasion as we only ever take a write
# lock when deleting a room, which we would never do
# while holding the `_INBOUND_EVENT_HANDLING_LOCK_NAME`
# lock.
async with self._worker_lock_handler.acquire_read_write_lock(
DELETE_ROOM_LOCK_NAME, room_id, write=False
):
await self._federation_event_handler.on_receive_pdu(
origin, event
)
except FederationError as e:
# XXX: Ideally we'd inform the remote we failed to process
# the event, but we can't return an error in the transaction
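
The inline comment above is about lock ordering: nesting one lock inside another is only deadlock-free if no code path acquires the same two locks in the opposite order. A minimal sketch of the hazard, using plain asyncio locks as stand-ins for the inbound-event-handling lock and the delete-room lock (names are illustrative):

```python
import asyncio

inbound_event_lock = asyncio.Lock()  # outer lock, taken per room
delete_room_lock = asyncio.Lock()    # inner lock, taken per room


async def handle_inbound_pdu() -> None:
    # Safe: every path that needs both locks takes them in this order, so no
    # cycle of "holds A, waits for B" / "holds B, waits for A" can form.
    async with inbound_event_lock:
        async with delete_room_lock:
            ...  # process the event


async def delete_room() -> None:
    # Also safe: deletion takes only the delete-room lock and never asks for
    # the inbound-event lock while holding it.
    async with delete_room_lock:
        ...  # purge the room


async def would_deadlock() -> None:
    # The shape the comment warns against: taking the same two locks in the
    # opposite order can deadlock against handle_inbound_pdu().
    async with delete_room_lock:
        async with inbound_event_lock:
            ...
```

In the diff the inner acquisition is a read lock, which simplifies the argument further: readers never block each other, and the only writer (room deletion) never holds the inbound-event lock.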
38 changes: 37 additions & 1 deletion synapse/handlers/message.py
@@ -53,6 +53,7 @@
from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field
from synapse.events.validator import EventValidator
from synapse.handlers.directory import DirectoryHandler
from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -485,6 +486,7 @@ def __init__(self, hs: "HomeServer"):
self._events_shard_config = self.config.worker.events_shard_config
self._instance_name = hs.get_instance_name()
self._notifier = hs.get_notifier()
self._worker_lock_handler = hs.get_worker_locks_handler()

self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state

@@ -1010,6 +1012,37 @@ async def create_and_send_nonmember_event(
event.internal_metadata.stream_ordering,
)

async with self._worker_lock_handler.acquire_read_write_lock(
DELETE_ROOM_LOCK_NAME, room_id, write=False
):
return await self._create_and_send_nonmember_event(
requester=requester,
event_dict=event_dict,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
state_event_ids=state_event_ids,
ratelimit=ratelimit,
txn_id=txn_id,
ignore_shadow_ban=ignore_shadow_ban,
outlier=outlier,
depth=depth,
)

async def _create_and_send_nonmember_event(

Review comment (Contributor):
duplicate function name?

oh, underscored. Can it be prefixed or suffixed with _locked so we know the difference? :)

self,
requester: Requester,
event_dict: dict,
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
state_event_ids: Optional[List[str]] = None,
ratelimit: bool = True,
txn_id: Optional[str] = None,
ignore_shadow_ban: bool = False,
outlier: bool = False,
depth: Optional[int] = None,
) -> Tuple[EventBase, int]:
room_id = event_dict["room_id"]

# If we don't have any prev event IDs specified then we need to
# check that the host is in the room (as otherwise populating the
# prev events will fail), at which point we may as well check the
@@ -1924,7 +1957,10 @@ async def _send_dummy_events_to_fill_extremities(self) -> None:
)

for room_id in room_ids:
dummy_event_sent = await self._send_dummy_event_for_room(room_id)
async with self._worker_lock_handler.acquire_read_write_lock(
DELETE_ROOM_LOCK_NAME, room_id, write=False
):
dummy_event_sent = await self._send_dummy_event_for_room(room_id)

if not dummy_event_sent:
# Did not find a valid user in the room, so remove from future attempts
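
The review comment above asks for a clearer split between the public entry point and the underscore-prefixed helper. One common shape for that split, sketched with hypothetical names (the real code takes the lock via the worker locks handler's acquire_read_write_lock):

```python
from contextlib import asynccontextmanager
from typing import AsyncIterator, Tuple


class EventCreationSketch:
    @asynccontextmanager
    async def _delete_room_read_lock(self, room_id: str) -> AsyncIterator[None]:
        # Stand-in for acquire_read_write_lock(DELETE_ROOM_LOCK_NAME, room_id,
        # write=False); the real lock is shared across all workers.
        yield

    async def create_and_send_event(
        self, room_id: str, event_dict: dict
    ) -> Tuple[str, int]:
        # Public entry point: its only job is to take the lock and delegate.
        async with self._delete_room_read_lock(room_id):
            return await self._create_and_send_event_locked(room_id, event_dict)

    async def _create_and_send_event_locked(
        self, room_id: str, event_dict: dict
    ) -> Tuple[str, int]:
        # The "_locked" suffix (as the reviewer suggests) signals that the
        # caller already holds the delete-room read lock, so the room cannot
        # be purged while the event is created and persisted.
        return "$example:event", 1  # placeholder event ID and stream ordering
```

The same pattern covers _send_dummy_events_to_fill_extremities above: the read lock wraps each per-room send so a room deletion cannot race the dummy event.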
23 changes: 19 additions & 4 deletions synapse/handlers/pagination.py
@@ -46,6 +46,11 @@
BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3


PURGE_HISTORY_LOCK_NAME = "purge_history_lock"

DELETE_ROOM_LOCK_NAME = "delete_room_lock"


@attr.s(slots=True, auto_attribs=True)
class PurgeStatus:
"""Object tracking the status of a purge request
@@ -142,6 +147,7 @@ def __init__(self, hs: "HomeServer"):
self._server_name = hs.hostname
self._room_shutdown_handler = hs.get_room_shutdown_handler()
self._relations_handler = hs.get_relations_handler()
self._worker_locks = hs.get_worker_locks_handler()

self.pagination_lock = ReadWriteLock()
# IDs of rooms in which there currently an active purge *or delete* operation.
@@ -356,7 +362,9 @@ async def _purge_history(
"""
self._purges_in_progress_by_room.add(room_id)
try:
async with self.pagination_lock.write(room_id):
async with self._worker_locks.acquire_read_write_lock(
PURGE_HISTORY_LOCK_NAME, room_id, write=True
):
await self._storage_controllers.purge_events.purge_history(
room_id, token, delete_local_events
)
@@ -412,7 +420,10 @@ async def purge_room(self, room_id: str, force: bool = False) -> None:
room_id: room to be purged
force: set true to skip checking for joined users.
"""
async with self.pagination_lock.write(room_id):
async with self._worker_locks.acquire_multi_read_write_lock(
[(PURGE_HISTORY_LOCK_NAME, room_id), (DELETE_ROOM_LOCK_NAME, room_id)],
write=True,
):
# first check that we have no users in this room
if not force:
joined = await self.store.is_host_joined(room_id, self._server_name)
@@ -471,7 +482,9 @@ async def get_messages(

room_token = from_token.room_key

async with self.pagination_lock.read(room_id):
async with self._worker_locks.acquire_read_write_lock(
PURGE_HISTORY_LOCK_NAME, room_id, write=False
):
(membership, member_event_id) = (None, None)
if not use_admin_priviledge:
(
@@ -747,7 +760,9 @@ async def _shutdown_and_purge_room(

self._purges_in_progress_by_room.add(room_id)
try:
async with self.pagination_lock.write(room_id):
async with self._worker_locks.acquire_read_write_lock(
PURGE_HISTORY_LOCK_NAME, room_id, write=True
):
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
self._delete_by_id[
delete_id
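
purge_room now takes both locks in write mode in a single call: the purge-history lock shuts out paginators and other purges, and the delete-room lock shuts out event creation, membership changes and inbound federation handling. A rough in-process equivalent, assuming a toy per-(name, key) lock table rather than Synapse's cross-worker implementation:

```python
import asyncio
from contextlib import AsyncExitStack
from typing import Dict, Tuple

PURGE_HISTORY_LOCK_NAME = "purge_history_lock"
DELETE_ROOM_LOCK_NAME = "delete_room_lock"

# Toy in-process lock table keyed by (lock name, room ID).
_locks: Dict[Tuple[str, str], asyncio.Lock] = {}


def _get_lock(name: str, key: str) -> asyncio.Lock:
    return _locks.setdefault((name, key), asyncio.Lock())


async def purge_room(room_id: str) -> None:
    # Holding both locks at once means nothing that takes either of them
    # (pagination, event creation, membership updates, inbound PDU handling)
    # can overlap with the deletion. Requesting them in a fixed order keeps
    # two concurrent deleters from deadlocking each other.
    async with AsyncExitStack() as stack:
        for name in (PURGE_HISTORY_LOCK_NAME, DELETE_ROOM_LOCK_NAME):
            await stack.enter_async_context(_get_lock(name, room_id))
        ...  # check the room is empty, then delete its events
```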
45 changes: 25 additions & 20 deletions synapse/handlers/room_member.py
@@ -39,6 +39,7 @@
from synapse.events.snapshot import EventContext
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.logging import opentracing
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -94,6 +95,7 @@ def __init__(self, hs: "HomeServer"):
self.event_creation_handler = hs.get_event_creation_handler()
self.account_data_handler = hs.get_account_data_handler()
self.event_auth_handler = hs.get_event_auth_handler()
self._worker_lock_handler = hs.get_worker_locks_handler()

self.member_linearizer: Linearizer = Linearizer(name="member")
self.member_as_limiter = Linearizer(max_count=10, name="member_as_limiter")
@@ -638,26 +640,29 @@ async def update_membership(
# by application services), and then by room ID.
async with self.member_as_limiter.queue(as_id):
async with self.member_linearizer.queue(key):
with opentracing.start_active_span("update_membership_locked"):
result = await self.update_membership_locked(
requester,
target,
room_id,
action,
txn_id=txn_id,
remote_room_hosts=remote_room_hosts,
third_party_signed=third_party_signed,
ratelimit=ratelimit,
content=content,
new_room=new_room,
require_consent=require_consent,
outlier=outlier,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
state_event_ids=state_event_ids,
depth=depth,
origin_server_ts=origin_server_ts,
)
async with self._worker_lock_handler.acquire_read_write_lock(
DELETE_ROOM_LOCK_NAME, room_id, write=False
):
with opentracing.start_active_span("update_membership_locked"):
result = await self.update_membership_locked(
requester,
target,
room_id,
action,
txn_id=txn_id,
remote_room_hosts=remote_room_hosts,
third_party_signed=third_party_signed,
ratelimit=ratelimit,
content=content,
new_room=new_room,
require_consent=require_consent,
outlier=outlier,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
state_event_ids=state_event_ids,
depth=depth,
origin_server_ts=origin_server_ts,
)

return result

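
In update_membership the new lock nests inside the existing application-service limiter and member linearizer and wraps only the traced critical section, so it is held just for the duration of the actual membership update. A rough sketch of that layering, with hypothetical stand-ins for the limiter, the linearizer and the lock:

```python
from contextlib import asynccontextmanager
from typing import AsyncIterator


@asynccontextmanager
async def _stand_in(name: str) -> AsyncIterator[None]:
    # Placeholder for the AS rate limiter, the member linearizer and the
    # cross-worker delete-room read lock used by the real handler.
    yield


async def update_membership(as_id: str, room_id: str, user_id: str) -> None:
    # Outermost: cap concurrent membership changes per application service.
    async with _stand_in(f"as_limiter:{as_id}"):
        # Next: serialise changes for this (room, user) pair.
        async with _stand_in(f"member_linearizer:{room_id}:{user_id}"):
            # Innermost: block room deletion only while the membership event
            # is actually being created.
            async with _stand_in(f"delete_room_read_lock:{room_id}"):
                await _update_membership_locked(room_id, user_id)


async def _update_membership_locked(room_id: str, user_id: str) -> None:
    ...  # create and persist the membership event
```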