Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add types to synapse.util. #10601

Merged
merged 41 commits into from
Sep 10, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
ff8f94d
fairly mechanical changes
reivilibre Aug 13, 2021
6deee28
stranger changes (REVIEW)
reivilibre Aug 13, 2021
3c2b4dd
Newsfile & mypy.ini
reivilibre Aug 13, 2021
1d0b435
Put the switch back in to the 'more magic' position
reivilibre Aug 16, 2021
22df193
Fix up some more types
reivilibre Aug 16, 2021
e36db3f
Update annotations in util
reivilibre Aug 18, 2021
db57064
Fix fallout (related annotations and assertions around codebase)
reivilibre Aug 18, 2021
cd15b4b
Merge remote-tracking branch 'origin/develop' into rei/types1
reivilibre Aug 18, 2021
348f9ff
antilint
reivilibre Aug 18, 2021
d081c83
add type parameters for Deferreds
reivilibre Aug 18, 2021
76c3b6b
Fix circular import of HomeServer
reivilibre Aug 23, 2021
30ffee4
Quote deferreds in method signatures
reivilibre Aug 23, 2021
10bd84f
Annotate more types
reivilibre Sep 1, 2021
0c26b7f
Use attrs class and fix ignored fields [WANTS REVIEW]
reivilibre Sep 1, 2021
715bfdc
Ignore import issues [WANTS REVIEW]
reivilibre Sep 1, 2021
1e4632f
Annotate more types
reivilibre Sep 1, 2021
05cc10c
Annotate more types
reivilibre Sep 2, 2021
1c6704c
Annotate types and ignore Twisted issues [WANTS REVIEW]
reivilibre Sep 2, 2021
c384373
Add IReactorThreads as parent of ISynapseReactor
reivilibre Sep 2, 2021
884a8b6
Annotate more types
reivilibre Sep 2, 2021
a22f4c0
Add type annotation fixes to fix CI
reivilibre Sep 2, 2021
029bf34
Merge remote-tracking branch 'origin/develop' into rei/types1
reivilibre Sep 2, 2021
9444ca1
Resolve type issue that arose from merge
reivilibre Sep 2, 2021
a0aef0b
Back out of generics due to python-attrs/attrs#313
reivilibre Sep 2, 2021
289df40
Quote return types with Deferreds
reivilibre Sep 3, 2021
8e719ed
Fix use of None as default
reivilibre Sep 6, 2021
34e327d
Use a cast to work around Mocks not working with isinstance
reivilibre Sep 6, 2021
cd9a68d
Fix up parameters which were previously silently ignored
reivilibre Sep 6, 2021
b4cded1
Apply suggestions
reivilibre Sep 8, 2021
6f7fac0
Use `cast` to IReactorTime [WANTS REVIEW]
reivilibre Sep 8, 2021
d4afbca
Add types and casts to `__exit__` [REVIEW]
reivilibre Sep 8, 2021
f5cee54
Fix adherence to Jinja2's interface [REVIEW]
reivilibre Sep 8, 2021
12cfb9a
Annotate `WheelTimer`, notably `bucket_size`
reivilibre Sep 8, 2021
e69a3d6
Update Newsfile
reivilibre Sep 8, 2021
9f301ae
Note that code was lifted from CPython
reivilibre Sep 8, 2021
e6618d7
Add more type annotations
reivilibre Sep 8, 2021
b1b4f1b
Enable stricter checking on applicable modules
reivilibre Sep 8, 2021
ea4f7e0
Merge remote-tracking branch 'origin/develop' into rei/types1
reivilibre Sep 8, 2021
8871674
Correct types used in `__exit__`
reivilibre Sep 8, 2021
20d63a0
Fix up manhole types after merge [REVIEW, SEE DESC]
reivilibre Sep 8, 2021
19a602e
Avoid using evil typecasts
reivilibre Sep 10, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10601.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add type annotations to complete the synapse.util package.
12 changes: 1 addition & 11 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,7 @@ files =
synapse/storage/util,
synapse/streams,
synapse/types.py,
synapse/util/async_helpers.py,
synapse/util/caches,
synapse/util/daemonize.py,
synapse/util/hash.py,
synapse/util/iterutils.py,
synapse/util/linked_list.py,
synapse/util/metrics.py,
synapse/util/macaroons.py,
synapse/util/module_loader.py,
synapse/util/msisdn.py,
synapse/util/stringutils.py,
synapse/util,
synapse/visibility.py,
tests/replication,
tests/test_event_auth.py,
Expand Down
8 changes: 4 additions & 4 deletions synapse/api/ratelimiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __init__(
# * How many times an action has occurred since a point in time
# * The point in time
# * The rate_hz of this particular entry. This can vary per request
self.actions: OrderedDict[Hashable, Tuple[float, int, float]] = OrderedDict()
self.actions: OrderedDict[Hashable, Tuple[float, float, float]] = OrderedDict()

async def can_do_action(
self,
Expand All @@ -56,7 +56,7 @@ async def can_do_action(
burst_count: Optional[int] = None,
update: bool = True,
n_actions: int = 1,
_time_now_s: Optional[int] = None,
_time_now_s: Optional[float] = None,
) -> Tuple[bool, float]:
"""Can the entity (e.g. user or IP address) perform the action?

Expand Down Expand Up @@ -160,7 +160,7 @@ async def can_do_action(

return allowed, time_allowed

def _prune_message_counts(self, time_now_s: int):
def _prune_message_counts(self, time_now_s: float):
"""Remove message count entries that have not exceeded their defined
rate_hz limit

Expand Down Expand Up @@ -188,7 +188,7 @@ async def ratelimit(
burst_count: Optional[int] = None,
update: bool = True,
n_actions: int = 1,
_time_now_s: Optional[int] = None,
_time_now_s: Optional[float] = None,
):
"""Checks if an action can be performed. If not, raises a LimitExceededError

Expand Down
8 changes: 6 additions & 2 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from typing_extensions import Literal

from twisted.internet import defer
from twisted.internet.interfaces import IDelayedCall

import synapse.metrics
from synapse.api.presence import UserPresenceState
Expand Down Expand Up @@ -284,7 +285,9 @@ def __init__(self, hs: "HomeServer"):
)

# wake up destinations that have outstanding PDUs to be caught up
self._catchup_after_startup_timer = self.clock.call_later(
self._catchup_after_startup_timer: Optional[
IDelayedCall
] = self.clock.call_later(
CATCH_UP_STARTUP_DELAY_SEC,
run_as_background_process,
"wake_destinations_needing_catchup",
Expand Down Expand Up @@ -406,7 +409,7 @@ async def handle_event(event: EventBase) -> None:

now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id)

assert ts is not None
synapse.metrics.event_processing_lag_by_event.labels(
"federation_sender"
).observe((now - ts) / 1000)
Expand Down Expand Up @@ -435,6 +438,7 @@ async def handle_room_events(events: Iterable[EventBase]) -> None:
if events:
now = self.clock.time_msec()
ts = await self.store.get_received_ts(events[-1].event_id)
assert ts is not None

synapse.metrics.event_processing_lag.labels(
"federation_sender"
Expand Down
1 change: 1 addition & 0 deletions synapse/handlers/account_validity.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ async def renew_account_for_user(
"""
now = self.clock.time_msec()
if expiration_ts is None:
assert self._account_validity_period is not None
expiration_ts = now + self._account_validity_period

await self.store.set_account_validity_for_user(
Expand Down
3 changes: 3 additions & 0 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ async def start_scheduler():

now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id)
assert ts is not None

synapse.metrics.event_processing_lag_by_event.labels(
"appservice_sender"
).observe((now - ts) / 1000)
Expand Down Expand Up @@ -166,6 +168,7 @@ async def handle_room_events(events):
if events:
now = self.clock.time_msec()
ts = await self.store.get_received_ts(events[-1].event_id)
assert ts is not None

synapse.metrics.event_processing_lag.labels(
"appservice_sender"
Expand Down
5 changes: 3 additions & 2 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from contextlib import contextmanager
from typing import (
TYPE_CHECKING,
Any,
Callable,
Collection,
Dict,
Expand Down Expand Up @@ -610,7 +611,7 @@ def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.hs = hs
self.server_name = hs.hostname
self.wheel_timer = WheelTimer()
self.wheel_timer: WheelTimer[str] = WheelTimer()
self.notifier = hs.get_notifier()
self._presence_enabled = hs.config.use_presence

Expand Down Expand Up @@ -919,7 +920,7 @@ async def bump_presence_active_time(self, user: UserID) -> None:

prev_state = await self.current_state_for_user(user_id)

new_fields = {"last_active_ts": self.clock.time_msec()}
new_fields: Dict[str, Any] = {"last_active_ts": self.clock.time_msec()}
if prev_state.state == PresenceState.UNAVAILABLE:
new_fields["state"] = PresenceState.ONLINE

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def __init__(self, hs: "HomeServer"):
self._room_typing: Dict[str, Set[str]] = {}

self._member_last_federation_poke: Dict[RoomMember, int] = {}
self.wheel_timer = WheelTimer(bucket_size=5000)
self.wheel_timer: WheelTimer[RoomMember] = WheelTimer(bucket_size=5000)
self._latest_room_serial = 0

self.clock.looping_call(self._handle_timeouts, 5000)
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/databases/main/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,7 @@ def set_expiration_date_for_user_txn(self, txn, user_id, use_delta=False):
delta equal to 10% of the validity period.
"""
now_ms = self._clock.time_msec()
assert self._account_validity_period is not None
expiration_ts = now_ms + self._account_validity_period

if use_delta:
Expand Down
22 changes: 12 additions & 10 deletions synapse/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
import json
import logging
import re
from typing import Pattern
from typing import Any, Dict, Pattern

import attr
from frozendict import frozendict

from twisted.internet import defer, task
from twisted.internet.interfaces import IDelayedCall
from twisted.internet.task import LoopingCall

from synapse.logging import context

Expand All @@ -30,12 +32,12 @@
_WILDCARD_RUN = re.compile(r"([\?\*]+)")


def _reject_invalid_json(val):
def _reject_invalid_json(val) -> None:
"""Do not allow Infinity, -Infinity, or NaN values in JSON."""
raise ValueError("Invalid JSON value: '%s'" % val)


def _handle_frozendict(obj):
def _handle_frozendict(obj: Any) -> Dict[Any, Any]:
"""Helper for json_encoder. Makes frozendicts serializable by returning
the underlying dict
"""
Expand Down Expand Up @@ -78,22 +80,22 @@ class Clock:
_reactor = attr.ib()

@defer.inlineCallbacks
def sleep(self, seconds):
d = defer.Deferred()
def sleep(self, seconds: float):
d: defer.Deferred[float] = defer.Deferred()
with context.PreserveLoggingContext():
self._reactor.callLater(seconds, d.callback, seconds)
res = yield d
return res

def time(self):
def time(self) -> float:
"""Returns the current system time in seconds since epoch."""
return self._reactor.seconds()

def time_msec(self):
def time_msec(self) -> int:
"""Returns the current system time in milliseconds since epoch."""
return int(self.time() * 1000)

def looping_call(self, f, msec, *args, **kwargs):
def looping_call(self, f, msec, *args, **kwargs) -> LoopingCall:
reivilibre marked this conversation as resolved.
Show resolved Hide resolved
"""Call a function repeatedly.

Waits `msec` initially before calling `f` for the first time.
Expand All @@ -113,7 +115,7 @@ def looping_call(self, f, msec, *args, **kwargs):
d.addErrback(log_failure, "Looping call died", consumeErrors=False)
return call

def call_later(self, delay, callback, *args, **kwargs):
def call_later(self, delay, callback, *args, **kwargs) -> IDelayedCall:
"""Call something later

Note that the function will be called with no logcontext, so if it is anything
Expand All @@ -133,7 +135,7 @@ def wrapped_callback(*args, **kwargs):
with context.PreserveLoggingContext():
return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs)

def cancel_call_later(self, timer, ignore_errs=False):
def cancel_call_later(self, timer: IDelayedCall, ignore_errs=False) -> None:
reivilibre marked this conversation as resolved.
Show resolved Hide resolved
try:
timer.cancel()
except Exception:
Expand Down
2 changes: 1 addition & 1 deletion synapse/util/batching_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ async def add_to_queue(self, value: V, key: Hashable = ()) -> R:

# First we create a defer and add it and the value to the list of
# pending items.
d = defer.Deferred()
d: defer.Deferred[R] = defer.Deferred()
self._next_values.setdefault(key, []).append((value, d))

# If we're not currently processing the key fire off a background
Expand Down
21 changes: 11 additions & 10 deletions synapse/util/distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Any, Callable, Dict, List

from twisted.internet import defer

Expand All @@ -38,10 +39,10 @@ class Distributor:
"""

def __init__(self):
self.signals = {}
self.pre_registration = {}
self.signals: Dict[str, Signal] = {}
self.pre_registration: Dict[str, List[Callable]] = {}

def declare(self, name):
def declare(self, name: str) -> None:
if name in self.signals:
raise KeyError("%r already has a signal named %s" % (self, name))

Expand All @@ -52,7 +53,7 @@ def declare(self, name):
for observer in self.pre_registration[name]:
signal.observe(observer)

def observe(self, name, observer):
def observe(self, name: str, observer: Callable) -> None:
if name in self.signals:
self.signals[name].observe(observer)
else:
Expand All @@ -62,7 +63,7 @@ def observe(self, name, observer):
self.pre_registration[name] = []
self.pre_registration[name].append(observer)

def fire(self, name, *args, **kwargs):
def fire(self, name: str, *args, **kwargs) -> None:
"""Dispatches the given signal to the registered observers.

Runs the observers as a background process. Does not return a deferred.
Expand All @@ -83,18 +84,18 @@ class Signal:
method into all of the observers.
"""

def __init__(self, name):
self.name = name
self.observers = []
def __init__(self, name: str):
self.name: str = name
self.observers: List[Callable] = []

def observe(self, observer):
def observe(self, observer: Callable) -> None:
"""Adds a new callable to the observer list which will be invoked by
the 'fire' method.

Each observer callable may return a Deferred."""
self.observers.append(observer)

def fire(self, *args, **kwargs):
def fire(self, *args, **kwargs) -> "defer.Deferred[List[Any]]":
"""Invokes every callable in the observer list, passing in the args and
kwargs. Exceptions thrown by observers are logged but ignored. It is
not an error to fire a signal with no observers.
Expand Down
16 changes: 10 additions & 6 deletions synapse/util/file_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import queue
from typing import Optional

from twisted.internet import threads

Expand Down Expand Up @@ -51,15 +52,15 @@ def __init__(self, file_obj, reactor):

# Queue of slices of bytes to be written. When producer calls
# unregister a final None is sent.
self._bytes_queue = queue.Queue()
self._bytes_queue: queue.Queue[Optional[bytes]] = queue.Queue()

# Deferred that is resolved when finished writing
self._finished_deferred = None

# If the _writer thread throws an exception it gets stored here.
self._write_exception = None

def registerProducer(self, producer, streaming):
def registerProducer(self, producer, streaming) -> None:
"""Part of IConsumer interface

Args:
Expand All @@ -81,17 +82,19 @@ def registerProducer(self, producer, streaming):
if not streaming:
self._producer.resumeProducing()

def unregisterProducer(self):
def unregisterProducer(self) -> None:
"""Part of IProducer interface"""
self._producer = None
assert self._finished_deferred is not None
if not self._finished_deferred.called:
self._bytes_queue.put_nowait(None)

def write(self, bytes):
def write(self, bytes) -> None:
"""Part of IProducer interface"""
if self._write_exception:
raise self._write_exception

assert self._finished_deferred is not None
if self._finished_deferred.called:
raise Exception("consumer has closed")

Expand All @@ -101,9 +104,10 @@ def write(self, bytes):
# then we pause the producer.
if self.streaming and self._bytes_queue.qsize() >= self._PAUSE_ON_QUEUE_SIZE:
self._paused_producer = True
assert self._producer is not None
self._producer.pauseProducing()

def _writer(self):
def _writer(self) -> None:
"""This is run in a background thread to write to the file."""
try:
while self._producer or not self._bytes_queue.empty():
Expand Down Expand Up @@ -134,7 +138,7 @@ def wait(self):
"""Returns a deferred that resolves when finished writing to file"""
return make_deferred_yieldable(self._finished_deferred)

def _resume_paused_producer(self):
def _resume_paused_producer(self) -> None:
"""Gets called if we should resume producing after being paused"""
if self._paused_producer and self._producer:
self._paused_producer = False
Expand Down
Loading