Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Replace uses of simple_insert_many with simple_insert_many_values. #11742

Merged
merged 6 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11742.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Minor efficiency improvements when inserting many values into the database.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I guess the efficiency savings here are:

  • the interpreter doesn't have to have to have PyObject* pointers repeatedly pointing to the same set of keys (column names)
    • not sure if it'd repeatedly allocate for copies of the same string literals?
  • the interpreter doesn't have to spend time calculating hashes, resizing the hash table, ... and any other fun involved in maintaining a dictionary
  • we don't have to spend time doing sorting in the code removed in 36919b1, and we save a function call too

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was mostly thinking of the allocation of all the dictionaries, but yes to all of that!

I also copied this changelog from the previous PR.

44 changes: 18 additions & 26 deletions synapse/rest/admin/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,41 +123,33 @@ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
job_name = body["job_name"]

if job_name == "populate_stats_process_rooms":
jobs = [
{
"update_name": "populate_stats_process_rooms",
"progress_json": "{}",
},
]
jobs = [("populate_stats_process_rooms", "{}", "")]
elif job_name == "regenerate_directory":
jobs = [
{
"update_name": "populate_user_directory_createtables",
"progress_json": "{}",
"depends_on": "",
},
{
"update_name": "populate_user_directory_process_rooms",
"progress_json": "{}",
"depends_on": "populate_user_directory_createtables",
},
{
"update_name": "populate_user_directory_process_users",
"progress_json": "{}",
"depends_on": "populate_user_directory_process_rooms",
},
{
"update_name": "populate_user_directory_cleanup",
"progress_json": "{}",
"depends_on": "populate_user_directory_process_users",
},
("populate_user_directory_createtables", "{}", ""),
(
"populate_user_directory_process_rooms",
"{}",
"populate_user_directory_createtables",
),
(
"populate_user_directory_process_users",
"{}",
"populate_user_directory_process_rooms",
),
(
"populate_user_directory_cleanup",
"{}",
"populate_user_directory_process_users",
),
]
else:
raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name")

try:
await self._store.db_pool.simple_insert_many(
table="background_updates",
keys=("update_name", "progress_json", "depends_on"),
values=jobs,
desc=f"admin_api_run_{job_name}",
)
Expand Down
54 changes: 2 additions & 52 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -934,56 +934,6 @@ def simple_insert_txn(
txn.execute(sql, vals)

async def simple_insert_many(
self, table: str, values: List[Dict[str, Any]], desc: str
) -> None:
"""Executes an INSERT query on the named table.

The input is given as a list of dicts, with one dict per row.
Generally simple_insert_many_values should be preferred for new code.

Args:
table: string giving the table name
values: dict of new column names and values for them
desc: description of the transaction, for logging and metrics
"""
await self.runInteraction(desc, self.simple_insert_many_txn, table, values)

@staticmethod
def simple_insert_many_txn(
txn: LoggingTransaction, table: str, values: List[Dict[str, Any]]
) -> None:
"""Executes an INSERT query on the named table.

The input is given as a list of dicts, with one dict per row.
Generally simple_insert_many_values_txn should be preferred for new code.

Args:
txn: The transaction to use.
table: string giving the table name
values: dict of new column names and values for them
"""
if not values:
return

# This is a *slight* abomination to get a list of tuples of key names
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some value of "slight"!

# and a list of tuples of value names.
#
# i.e. [{"a": 1, "b": 2}, {"c": 3, "d": 4}]
# => [("a", "b",), ("c", "d",)] and [(1, 2,), (3, 4,)]
#
# The sort is to ensure that we don't rely on dictionary iteration
# order.
keys, vals = zip(
*(zip(*(sorted(i.items(), key=lambda kv: kv[0]))) for i in values if i)
)

for k in keys:
if k != keys[0]:
raise RuntimeError("All items must have the same keys")

return DatabasePool.simple_insert_many_values_txn(txn, table, keys[0], vals)

async def simple_insert_many_values(
self,
table: str,
keys: Collection[str],
Expand All @@ -1002,11 +952,11 @@ async def simple_insert_many_values(
desc: description of the transaction, for logging and metrics
"""
await self.runInteraction(
desc, self.simple_insert_many_values_txn, table, keys, values
desc, self.simple_insert_many_txn, table, keys, values
)

@staticmethod
def simple_insert_many_values_txn(
def simple_insert_many_txn(
txn: LoggingTransaction,
table: str,
keys: Collection[str],
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,9 +536,9 @@ def _add_account_data_for_user(
self.db_pool.simple_insert_many_txn(
txn,
table="ignored_users",
keys=("ignorer_user_id", "ignored_user_id"),
values=[
{"ignorer_user_id": user_id, "ignored_user_id": u}
for u in currently_ignored_users - previously_ignored_users
(user_id, u) for u in currently_ignored_users - previously_ignored_users
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a handful (maybe 1/3rd?) of callers which could benefit from passing in some constant values for some keys, but I figured that could be left to another round of optimization, if we wanted to complicate the API. (I'm unsure if there's real benefit to it...)

],
)

Expand Down
30 changes: 16 additions & 14 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,14 +432,21 @@ def add_messages_txn(txn, now_ms, stream_id):
self.db_pool.simple_insert_many_txn(
txn,
table="device_federation_outbox",
keys=(
"destination",
"stream_id",
"queued_ts",
"messages_json",
"instance_name",
),
values=[
{
"destination": destination,
"stream_id": stream_id,
"queued_ts": now_ms,
"messages_json": json_encoder.encode(edu),
"instance_name": self._instance_name,
}
(
destination,
stream_id,
now_ms,
json_encoder.encode(edu),
self._instance_name,
)
for destination, edu in remote_messages_by_destination.items()
],
)
Expand Down Expand Up @@ -571,14 +578,9 @@ def _add_messages_to_local_device_inbox_txn(
self.db_pool.simple_insert_many_txn(
txn,
table="device_inbox",
keys=("user_id", "device_id", "stream_id", "message_json", "instance_name"),
values=[
{
"user_id": user_id,
"device_id": device_id,
"stream_id": stream_id,
"message_json": message_json,
"instance_name": self._instance_name,
}
(user_id, device_id, stream_id, message_json, self._instance_name)
for user_id, messages_by_device in local_by_user_then_device.items()
for device_id, message_json in messages_by_device.items()
],
Expand Down
37 changes: 22 additions & 15 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -1386,12 +1386,9 @@ def _update_remote_device_list_cache_txn(
self.db_pool.simple_insert_many_txn(
txn,
table="device_lists_remote_cache",
keys=("user_id", "device_id", "content"),
values=[
{
"user_id": user_id,
"device_id": content["device_id"],
"content": json_encoder.encode(content),
}
(user_id, content["device_id"], json_encoder.encode(content))
for content in devices
],
)
Expand Down Expand Up @@ -1479,8 +1476,9 @@ def _add_device_change_to_stream_txn(
self.db_pool.simple_insert_many_txn(
txn,
table="device_lists_stream",
keys=("stream_id", "user_id", "device_id"),
values=[
{"stream_id": stream_id, "user_id": user_id, "device_id": device_id}
(stream_id, user_id, device_id)
for stream_id, device_id in zip(stream_ids, device_ids)
],
)
Expand All @@ -1507,18 +1505,27 @@ def _add_device_outbound_poke_to_stream_txn(
self.db_pool.simple_insert_many_txn(
txn,
table="device_lists_outbound_pokes",
keys=(
"destination",
"stream_id",
"user_id",
"device_id",
"sent",
"ts",
"opentracing_context",
),
values=[
{
"destination": destination,
"stream_id": next(next_stream_id),
"user_id": user_id,
"device_id": device_id,
"sent": False,
"ts": now,
"opentracing_context": json_encoder.encode(context)
(
destination,
next(next_stream_id),
user_id,
device_id,
False,
now,
json_encoder.encode(context)
if whitelisted_homeserver(destination)
else "{}",
}
)
for destination in hosts
for device_id in device_ids
],
Expand Down
6 changes: 2 additions & 4 deletions synapse/storage/databases/main/directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,8 @@ def alias_txn(txn: LoggingTransaction) -> None:
self.db_pool.simple_insert_many_txn(
txn,
table="room_alias_servers",
values=[
{"room_alias": room_alias.to_string(), "server": server}
for server in servers
],
keys=("room_alias", "server"),
values=[(room_alias.to_string(), server) for server in servers],
)

self._invalidate_cache_and_stream(
Expand Down
34 changes: 23 additions & 11 deletions synapse/storage/databases/main/e2e_room_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,16 @@ async def add_e2e_room_keys(
values = []
for (room_id, session_id, room_key) in room_keys:
values.append(
{
"user_id": user_id,
"version": version_int,
"room_id": room_id,
"session_id": session_id,
"first_message_index": room_key["first_message_index"],
"forwarded_count": room_key["forwarded_count"],
"is_verified": room_key["is_verified"],
"session_data": json_encoder.encode(room_key["session_data"]),
}
(
user_id,
version_int,
room_id,
session_id,
room_key["first_message_index"],
room_key["forwarded_count"],
room_key["is_verified"],
json_encoder.encode(room_key["session_data"]),
)
)
log_kv(
{
Expand All @@ -131,7 +131,19 @@ async def add_e2e_room_keys(
)

await self.db_pool.simple_insert_many(
table="e2e_room_keys", values=values, desc="add_e2e_room_keys"
table="e2e_room_keys",
keys=(
"user_id",
"version",
"room_id",
"session_id",
"first_message_index",
"forwarded_count",
"is_verified",
"session_data",
),
values=values,
desc="add_e2e_room_keys",
)

@trace
Expand Down
42 changes: 25 additions & 17 deletions synapse/storage/databases/main/end_to_end_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,15 +387,16 @@ def _add_e2e_one_time_keys(txn: LoggingTransaction) -> None:
self.db_pool.simple_insert_many_txn(
txn,
table="e2e_one_time_keys_json",
keys=(
"user_id",
"device_id",
"algorithm",
"key_id",
"ts_added_ms",
"key_json",
),
values=[
{
"user_id": user_id,
"device_id": device_id,
"algorithm": algorithm,
"key_id": key_id,
"ts_added_ms": time_now,
"key_json": json_bytes,
}
(user_id, device_id, algorithm, key_id, time_now, json_bytes)
for algorithm, key_id, json_bytes in new_keys
],
)
Expand Down Expand Up @@ -1186,15 +1187,22 @@ async def store_e2e_cross_signing_signatures(
"""
await self.db_pool.simple_insert_many(
"e2e_cross_signing_signatures",
[
{
"user_id": user_id,
"key_id": item.signing_key_id,
"target_user_id": item.target_user_id,
"target_device_id": item.target_device_id,
"signature": item.signature,
}
keys=(
"user_id",
"key_id",
"target_user_id",
"target_device_id",
"signature",
),
values=[
(
user_id,
item.signing_key_id,
item.target_user_id,
item.target_device_id,
item.signature,
)
for item in signatures
],
"add_e2e_signing_key",
desc="add_e2e_signing_key",
)
Loading