Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Start fewer opentracing spans #8640

Merged
merged 7 commits into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8640.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce number of OpenTracing spans started.
50 changes: 43 additions & 7 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.

import logging
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Union

from prometheus_client import Counter

Expand All @@ -30,7 +30,10 @@
event_processing_loop_counter,
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.types import Collection, JsonDict, RoomStreamToken, UserID
from synapse.util.metrics import Measure

Expand All @@ -53,7 +56,7 @@ def __init__(self, hs):
self.current_max = 0
self.is_processing = False

async def notify_interested_services(self, max_token: RoomStreamToken):
def notify_interested_services(self, max_token: RoomStreamToken):
"""Notifies (pushes) all application services interested in this event.

Pushing is done asynchronously, so this method won't block for any
Expand All @@ -72,6 +75,12 @@ async def notify_interested_services(self, max_token: RoomStreamToken):
if self.is_processing:
return

# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
self._notify_interested_services(max_token)

@wrap_as_background_process("notify_interested_services")
async def _notify_interested_services(self, max_token: RoomStreamToken):
with Measure(self.clock, "notify_interested_services"):
self.is_processing = True
try:
Expand Down Expand Up @@ -166,8 +175,11 @@ async def handle_room_events(events):
finally:
self.is_processing = False

async def notify_interested_services_ephemeral(
self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [],
def notify_interested_services_ephemeral(
self,
stream_key: str,
new_token: Optional[int],
users: Collection[Union[str, UserID]] = [],
):
"""This is called by the notifier in the background
when an ephemeral event is handled by the homeserver.
Expand All @@ -183,13 +195,34 @@ async def notify_interested_services_ephemeral(
new_token: The latest stream token
users: The user(s) involved with the event.
"""
if not self.notify_appservices:
return

if stream_key not in ("typing_key", "receipt_key", "presence_key"):
return

services = [
service
for service in self.store.get_app_services()
if service.supports_ephemeral
]
if not services or not self.notify_appservices:
if not services:
return

# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
self._notify_interested_services_ephemeral(
services, stream_key, new_token, users
)

@wrap_as_background_process("notify_interested_services_ephemeral")
async def _notify_interested_services_ephemeral(
self,
services: List[ApplicationService],
stream_key: str,
new_token: Optional[int],
users: Collection[Union[str, UserID]],
):
logger.info("Checking interested services for %s" % (stream_key))
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
Expand Down Expand Up @@ -237,14 +270,17 @@ async def _handle_receipts(self, service: ApplicationService):
return receipts

async def _handle_presence(
self, service: ApplicationService, users: Collection[UserID]
self, service: ApplicationService, users: Collection[Union[str, UserID]]
):
events = [] # type: List[JsonDict]
presence_source = self.event_sources.sources["presence"]
from_key = await self.store.get_type_stream_id_for_appservice(
service, "presence"
)
for user in users:
if isinstance(user, str):
user = UserID.from_string(user)
Comment on lines +281 to +282
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this just a separate bug?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the commit message:

In practice this wasn't an issue as we always pass UserID objects in that particular code path.


interested = await service.is_interested_in_presence(user, self.store)
if not interested:
continue
Expand Down
10 changes: 5 additions & 5 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def ensure_active_span_inner_2(*args, **kwargs):


@contextlib.contextmanager
def _noop_context_manager(*args, **kwargs):
def noop_context_manager(*args, **kwargs):
"""Does exactly what it says on the tin"""
yield

Expand Down Expand Up @@ -413,7 +413,7 @@ def start_active_span(
"""

if opentracing is None:
return _noop_context_manager()
return noop_context_manager()

return opentracing.tracer.start_active_span(
operation_name,
Expand All @@ -428,7 +428,7 @@ def start_active_span(

def start_active_span_follows_from(operation_name, contexts):
if opentracing is None:
return _noop_context_manager()
return noop_context_manager()

references = [opentracing.follows_from(context) for context in contexts]
scope = start_active_span(operation_name, references=references)
Expand Down Expand Up @@ -459,7 +459,7 @@ def start_active_span_from_request(
# Also, twisted uses byte arrays while opentracing expects strings.

if opentracing is None:
return _noop_context_manager()
return noop_context_manager()

header_dict = {
k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
Expand Down Expand Up @@ -497,7 +497,7 @@ def start_active_span_from_edu(
"""

if opentracing is None:
return _noop_context_manager()
return noop_context_manager()

carrier = json_decoder.decode(edu_content.get("context", "{}")).get(
"opentracing", {}
Expand Down
12 changes: 9 additions & 3 deletions synapse/metrics/background_process_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from twisted.internet import defer

from synapse.logging.context import LoggingContext, PreserveLoggingContext
from synapse.logging.opentracing import start_active_span
from synapse.logging.opentracing import noop_context_manager, start_active_span

if TYPE_CHECKING:
import resource
Expand Down Expand Up @@ -167,7 +167,7 @@ def update_metrics(self):
)


def run_as_background_process(desc: str, func, *args, **kwargs):
def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwargs):
"""Run the given function in its own logcontext, with resource metrics

This should be used to wrap processes which are fired off to run in the
Expand All @@ -181,6 +181,9 @@ def run_as_background_process(desc: str, func, *args, **kwargs):
Args:
desc: a description for this background process type
func: a function, which may return a Deferred or a coroutine
bg_start_span: Whether to start an opentracing span. Defaults to True.
Should only be disabled for processes that will not log to or tag
a span.
args: positional args for func
kwargs: keyword args for func

Expand All @@ -199,7 +202,10 @@ async def run():
with BackgroundProcessLoggingContext(desc) as context:
context.request = "%s-%i" % (desc, count)
try:
with start_active_span(desc, tags={"request_id": context.request}):
ctx = noop_context_manager()
if bg_start_span:
ctx = start_active_span(desc, tags={"request_id": context.request})
with ctx:
result = func(*args, **kwargs)

if inspect.isawaitable(result):
Expand Down
34 changes: 11 additions & 23 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.streams.config import PaginationConfig
from synapse.types import (
Collection,
Expand Down Expand Up @@ -310,44 +309,37 @@ def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken):
"""

# poke any interested application service.
run_as_background_process(
"_notify_app_services", self._notify_app_services, max_room_stream_token
)

run_as_background_process(
"_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_token
)
self._notify_app_services(max_room_stream_token)
self._notify_pusher_pool(max_room_stream_token)

if self.federation_sender:
self.federation_sender.notify_new_events(max_room_stream_token)

async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
try:
await self.appservice_handler.notify_interested_services(
max_room_stream_token
)
self.appservice_handler.notify_interested_services(max_room_stream_token)
except Exception:
logger.exception("Error notifying application services of event")

async def _notify_app_services_ephemeral(
def _notify_app_services_ephemeral(
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
users: Collection[UserID] = [],
users: Collection[Union[str, UserID]] = [],
):
try:
stream_token = None
if isinstance(new_token, int):
stream_token = new_token
await self.appservice_handler.notify_interested_services_ephemeral(
self.appservice_handler.notify_interested_services_ephemeral(
stream_key, stream_token, users
)
except Exception:
logger.exception("Error notifying application services of event")

async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try:
await self._pusher_pool.on_new_notifications(max_room_stream_token)
self._pusher_pool.on_new_notifications(max_room_stream_token)
except Exception:
logger.exception("Error pusher pool of event")

Expand Down Expand Up @@ -384,12 +376,8 @@ def on_new_event(
self.notify_replication()

# Notify appservices
run_as_background_process(
"_notify_app_services_ephemeral",
self._notify_app_services_ephemeral,
stream_key,
new_token,
users,
self._notify_app_services_ephemeral(
stream_key, new_token, users,
)

def on_new_replication_data(self) -> None:
Expand Down
18 changes: 16 additions & 2 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

from prometheus_client import Gauge

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.push import PusherConfigException
from synapse.push.emailpusher import EmailPusher
from synapse.push.httppusher import HttpPusher
Expand Down Expand Up @@ -187,7 +190,7 @@ async def remove_pushers_by_access_token(self, user_id, access_tokens):
)
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])

async def on_new_notifications(self, max_token: RoomStreamToken):
def on_new_notifications(self, max_token: RoomStreamToken):
if not self.pushers:
# nothing to do here.
return
Expand All @@ -201,6 +204,17 @@ async def on_new_notifications(self, max_token: RoomStreamToken):
# Nothing to do
return

# We only start a new background process if necessary rather than
# optimistically (to cut down on overhead).
self._on_new_notifications(max_token)

@wrap_as_background_process("on_new_notifications")
async def _on_new_notifications(self, max_token: RoomStreamToken):
# We just use the minimum stream ordering and ignore the vector clock
# component. This is safe to do as long as we *always* ignore the vector
# clock components.
max_stream_id = max_token.stream

prev_stream_id = self._last_room_stream_id_seen
self._last_room_stream_id_seen = max_stream_id

Expand Down
4 changes: 3 additions & 1 deletion synapse/replication/tcp/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ def send_command(self, cmd: Command):
Args:
cmd (Command)
"""
run_as_background_process("send-cmd", self._async_send_command, cmd)
run_as_background_process(
"send-cmd", self._async_send_command, cmd, bg_start_span=False
)

async def _async_send_command(self, cmd: Command):
"""Encode a replication command and send it over our outbound connection"""
Expand Down