Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Improve logging when processing incoming transactions #9596

Merged
merged 3 commits into from
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9596.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve logging when processing incoming transactions.
61 changes: 34 additions & 27 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,34 +335,41 @@ async def _handle_pdus_in_txn(
# impose a limit to avoid going too crazy with ram/cpu.

async def process_pdus_for_room(room_id: str):
logger.debug("Processing PDUs for %s", room_id)
try:
await self.check_server_matches_acl(origin_host, room_id)
except AuthError as e:
logger.warning("Ignoring PDUs for room %s from banned server", room_id)
for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
pdu_results[event_id] = e.error_dict()
return
with nested_logging_context(room_id):
logger.debug("Processing PDUs for %s", room_id)

for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
with pdu_process_time.time():
with nested_logging_context(event_id):
try:
await self._handle_received_pdu(origin, pdu)
pdu_results[event_id] = {}
except FederationError as e:
logger.warning("Error handling PDU %s: %s", event_id, e)
pdu_results[event_id] = {"error": str(e)}
except Exception as e:
f = failure.Failure()
pdu_results[event_id] = {"error": str(e)}
logger.error(
"Failed to handle PDU %s",
event_id,
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore
)
try:
await self.check_server_matches_acl(origin_host, room_id)
except AuthError as e:
logger.warning(
"Ignoring PDUs for room %s from banned server", room_id
)
for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
pdu_results[event_id] = e.error_dict()
return

for pdu in pdus_by_room[room_id]:
pdu_results[pdu.event_id] = await process_pdu(pdu)

async def process_pdu(pdu: EventBase) -> JsonDict:
    """Handle one received PDU and build its per-event result entry.

    The PDU is passed to ``self._handle_received_pdu`` while inside both
    ``pdu_process_time.time()`` and ``nested_logging_context(<event id>)``.

    Returns:
        ``{}`` when handling completes without raising, otherwise
        ``{"error": <str of the exception>}``. No exception derived from
        ``Exception`` escapes this function.
    """
    pdu_id = pdu.event_id
    with pdu_process_time.time(), nested_logging_context(pdu_id):
        try:
            await self._handle_received_pdu(origin, pdu)
        except FederationError as fed_err:
            # Failures reported as FederationError are logged at warning
            # level, without a traceback.
            logger.warning("Error handling PDU %s: %s", pdu_id, fed_err)
            return {"error": str(fed_err)}
        except Exception as exc:
            # Snapshot the in-flight exception via a Failure so that its
            # type, value and traceback can be handed to the logger
            # explicitly.
            captured = failure.Failure()
            logger.error(
                "Failed to handle PDU %s",
                pdu_id,
                exc_info=(captured.type, captured.value, captured.getTracebackObject()),  # type: ignore
            )
            return {"error": str(exc)}
        return {}

await concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
Expand Down
62 changes: 16 additions & 46 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:
or pdu.internal_metadata.is_outlier()
)
if already_seen:
logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
logger.debug("Already seen pdu")
return

# do some initial sanity-checking of the event. In particular, make
Expand All @@ -210,18 +210,14 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:
try:
self._sanity_check_event(pdu)
except SynapseError as err:
logger.warning(
"[%s %s] Received event failed sanity checks", room_id, event_id
)
logger.warning("Received event failed sanity checks")
raise FederationError("ERROR", err.code, err.msg, affected=pdu.event_id)

# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if room_id in self.room_queues:
logger.info(
"[%s %s] Queuing PDU from %s for now: join in progress",
room_id,
event_id,
"Queuing PDU from %s for now: join in progress",
origin,
)
self.room_queues[room_id].append((pdu, origin))
Expand All @@ -236,9 +232,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:
is_in_room = await self.auth.check_host_in_room(room_id, self.server_name)
if not is_in_room:
logger.info(
"[%s %s] Ignoring PDU from %s as we're not in the room",
room_id,
event_id,
"Ignoring PDU from %s as we're not in the room",
origin,
)
return None
Expand All @@ -250,7 +244,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:
# We only backfill backwards to the min depth.
min_depth = await self.get_min_depth_for_context(pdu.room_id)

logger.debug("[%s %s] min_depth: %d", room_id, event_id, min_depth)
logger.debug("min_depth: %d", min_depth)

prevs = set(pdu.prev_event_ids())
seen = await self.store.have_events_in_timeline(prevs)
Expand All @@ -267,17 +261,13 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:
# If we're missing stuff, ensure we only fetch stuff one
# at a time.
logger.info(
"[%s %s] Acquiring room lock to fetch %d missing prev_events: %s",
room_id,
event_id,
"Acquiring room lock to fetch %d missing prev_events: %s",
len(missing_prevs),
shortstr(missing_prevs),
)
with (await self._room_pdu_linearizer.queue(pdu.room_id)):
logger.info(
"[%s %s] Acquired room lock to fetch %d missing prev_events",
room_id,
event_id,
"Acquired room lock to fetch %d missing prev_events",
len(missing_prevs),
)

Expand All @@ -297,9 +287,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:

if not prevs - seen:
logger.info(
"[%s %s] Found all missing prev_events",
room_id,
event_id,
"Found all missing prev_events",
)

if prevs - seen:
Expand Down Expand Up @@ -329,9 +317,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:

if sent_to_us_directly:
logger.warning(
"[%s %s] Rejecting: failed to fetch %d prev events: %s",
room_id,
event_id,
"Rejecting: failed to fetch %d prev events: %s",
len(prevs - seen),
shortstr(prevs - seen),
)
Expand Down Expand Up @@ -414,10 +400,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:
state = [event_map[e] for e in state_map.values()]
except Exception:
logger.warning(
"[%s %s] Error attempting to resolve state at missing "
"prev_events",
room_id,
event_id,
"Error attempting to resolve state at missing " "prev_events",
exc_info=True,
)
raise FederationError(
Expand Down Expand Up @@ -454,9 +437,7 @@ async def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
latest |= seen

logger.info(
"[%s %s]: Requesting missing events between %s and %s",
room_id,
event_id,
"Requesting missing events between %s and %s",
shortstr(latest),
event_id,
)
Expand Down Expand Up @@ -523,15 +504,11 @@ async def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
# We failed to get the missing events, but since we need to handle
# the case of `get_missing_events` not returning the necessary
# events anyway, it is safe to simply log the error and continue.
logger.warning(
"[%s %s]: Failed to get prev_events: %s", room_id, event_id, e
)
logger.warning("Failed to get prev_events: %s", e)
return

logger.info(
"[%s %s]: Got %d prev_events: %s",
room_id,
event_id,
"Got %d prev_events: %s",
len(missing_events),
shortstr(missing_events),
)
Expand All @@ -542,9 +519,7 @@ async def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):

for ev in missing_events:
logger.info(
"[%s %s] Handling received prev_event %s",
room_id,
event_id,
"Handling received prev_event %s",
ev.event_id,
)
with nested_logging_context(ev.event_id):
Expand All @@ -553,9 +528,7 @@ async def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
except FederationError as e:
if e.code == 403:
logger.warning(
"[%s %s] Received prev_event %s failed history check.",
room_id,
event_id,
"Received prev_event %s failed history check.",
ev.event_id,
)
else:
Expand Down Expand Up @@ -707,10 +680,7 @@ async def _process_received_pdu(
(ie, we are missing one or more prev_events), the resolved state at the
event
"""
room_id = event.room_id
event_id = event.event_id

logger.debug("[%s %s] Processing event: %s", room_id, event_id, event)
logger.debug("Processing event: %s", event)

try:
await self._handle_new_event(origin, event, state=state)
Expand Down