Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix opentracing and Prometheus metrics for replication requests #10996

Merged
merged 5 commits into from
Oct 12, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10996.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.21.0 that causes opentracing and Prometheus metrics for replication requests to be measured incorrectly.
7 changes: 7 additions & 0 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,13 @@ def err_back(result):
result.addCallbacks(call_back, err_back)

else:
if inspect.iscoroutine(result):
logger.error(
"@trace did not wrap %s correctly! "
"The function is not async but returns a coroutine.",
func.__qualname__,
)
clokep marked this conversation as resolved.
Show resolved Hide resolved

scope.__exit__(None, None, None)

return result
Expand Down
154 changes: 78 additions & 76 deletions synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,85 +182,87 @@ def make_client(cls, hs):
)

@trace(opname="outgoing_replication_request")
@outgoing_gauge.track_inprogress()
async def send_request(*, instance_name="master", **kwargs):
if instance_name == local_instance_name:
raise Exception("Trying to send HTTP request to self")
if instance_name == "master":
host = master_host
port = master_port
elif instance_name in instance_map:
host = instance_map[instance_name].host
port = instance_map[instance_name].port
else:
raise Exception(
"Instance %r not in 'instance_map' config" % (instance_name,)
with outgoing_gauge.track_inprogress():
if instance_name == local_instance_name:
raise Exception("Trying to send HTTP request to self")
if instance_name == "master":
host = master_host
port = master_port
elif instance_name in instance_map:
host = instance_map[instance_name].host
port = instance_map[instance_name].port
else:
raise Exception(
"Instance %r not in 'instance_map' config" % (instance_name,)
)

data = await cls._serialize_payload(**kwargs)

url_args = [
urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS
]

if cls.CACHE:
txn_id = random_string(10)
url_args.append(txn_id)

if cls.METHOD == "POST":
request_func = client.post_json_get_json
elif cls.METHOD == "PUT":
request_func = client.put_json
elif cls.METHOD == "GET":
request_func = client.get_json
else:
# We have already asserted in the constructor that a
# compatible was picked, but lets be paranoid.
raise Exception(
"Unknown METHOD on %s replication endpoint" % (cls.NAME,)
)

uri = "http://%s:%s/_synapse/replication/%s/%s" % (
host,
port,
cls.NAME,
"/".join(url_args),
)

data = await cls._serialize_payload(**kwargs)

url_args = [
urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS
]

if cls.CACHE:
txn_id = random_string(10)
url_args.append(txn_id)

if cls.METHOD == "POST":
request_func = client.post_json_get_json
elif cls.METHOD == "PUT":
request_func = client.put_json
elif cls.METHOD == "GET":
request_func = client.get_json
else:
# We have already asserted in the constructor that a
# compatible was picked, but lets be paranoid.
raise Exception(
"Unknown METHOD on %s replication endpoint" % (cls.NAME,)
)

uri = "http://%s:%s/_synapse/replication/%s/%s" % (
host,
port,
cls.NAME,
"/".join(url_args),
)

try:
# We keep retrying the same request for timeouts. This is so that we
# have a good idea that the request has either succeeded or failed on
# the master, and so whether we should clean up or not.
while True:
headers: Dict[bytes, List[bytes]] = {}
# Add an authorization header, if configured.
if replication_secret:
headers[b"Authorization"] = [b"Bearer " + replication_secret]
opentracing.inject_header_dict(headers, check_destination=False)
try:
result = await request_func(uri, data, headers=headers)
break
except RequestTimedOutError:
if not cls.RETRY_ON_TIMEOUT:
raise

logger.warning("%s request timed out; retrying", cls.NAME)

# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
await clock.sleep(1)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the main process that we should send to the client. (And
# importantly, not stack traces everywhere)
_outgoing_request_counter.labels(cls.NAME, e.code).inc()
raise e.to_synapse_error()
except Exception as e:
_outgoing_request_counter.labels(cls.NAME, "ERR").inc()
raise SynapseError(502, "Failed to talk to main process") from e

_outgoing_request_counter.labels(cls.NAME, 200).inc()
return result
try:
# We keep retrying the same request for timeouts. This is so that we
# have a good idea that the request has either succeeded or failed
# on the master, and so whether we should clean up or not.
while True:
headers: Dict[bytes, List[bytes]] = {}
# Add an authorization header, if configured.
if replication_secret:
headers[b"Authorization"] = [
b"Bearer " + replication_secret
]
opentracing.inject_header_dict(headers, check_destination=False)
try:
result = await request_func(uri, data, headers=headers)
break
except RequestTimedOutError:
if not cls.RETRY_ON_TIMEOUT:
raise

logger.warning("%s request timed out; retrying", cls.NAME)

# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
await clock.sleep(1)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the main process that we should send to the client. (And
# importantly, not stack traces everywhere)
_outgoing_request_counter.labels(cls.NAME, e.code).inc()
raise e.to_synapse_error()
except Exception as e:
_outgoing_request_counter.labels(cls.NAME, "ERR").inc()
raise SynapseError(502, "Failed to talk to main process") from e

_outgoing_request_counter.labels(cls.NAME, 200).inc()
return result

return send_request

Expand Down