Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make MatrixClient asynchronous #145

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 9 additions & 14 deletions matrix_client/api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
# Copyright 2017 Adam Beckmeyer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -553,22 +554,16 @@ def _send(self, method, path, content=None, query_params={}, headers={},
if headers["Content-Type"] == "application/json" and content is not None:
content = json.dumps(content)

response = None
while True:
response = requests.request(
method, endpoint,
params=query_params,
data=content,
headers=headers,
verify=self.validate_cert
)

if response.status_code == 429:
sleep(response.json()['retry_after_ms'] / 1000)
else:
break
response = requests.request(
method, endpoint,
params=query_params,
data=content,
headers=headers,
verify=self.validate_cert
)

if response.status_code < 200 or response.status_code >= 300:
# Error raised with status_code == 429 should be handled separately
raise MatrixRequestError(
code=response.status_code, content=response.text
)
Expand Down
146 changes: 122 additions & 24 deletions matrix_client/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
# Copyright 2017 Adam Beckmeyer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -12,12 +13,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from gevent import monkey; monkey.patch_all()

from .api import MatrixHttpApi
from .errors import MatrixRequestError, MatrixUnexpectedResponse
from .room import Room
from .user import User
from threading import Thread
from time import sleep
from .queue import RequestQueue
import gevent
import gevent.pool
from gevent.event import AsyncResult
from functools import partial
from uuid import uuid4
import logging
import sys
Expand Down Expand Up @@ -59,8 +65,8 @@ def global_callback(incoming_event):

"""

def __init__(self, base_url, token=None, user_id=None,
valid_cert_check=True, sync_filter_limit=20):
def __init__(self, base_url, token=None, user_id=None, valid_cert_check=True,
sync_filter_limit=20, async=False, num_threads=10):
""" Create a new Matrix Client object.

Args:
Expand All @@ -73,13 +79,18 @@ def __init__(self, base_url, token=None, user_id=None,
the token) if supplying a token; otherwise, ignored.
valid_cert_check (bool): Check the homeservers
certificate on connections?
async (bool): Run the client in async mode; if `True`, methods
return `AsyncResult`s instead of blocking on api calls.
num_threads (int): Number of greenlets with which to make
matrix requests. Only evaluated if `async`.

Returns:
MatrixClient

Raises:
MatrixRequestError, ValueError
"""
# Set properties that may be overwritten if async
if token is not None and user_id is None:
raise ValueError("must supply user_id along with token")

Expand All @@ -96,6 +107,22 @@ def __init__(self, base_url, token=None, user_id=None,
self.sync_thread = None
self.should_listen = False

# Both call methods accept two callbacks. First one is called without
# any arguments. Second is called with output of first callback as an arg
if async:
# _async_call pushses callbacks onto `self.queue` and returns an
# AsyncResult promising the output of the second callback
self._call = self._async_call
self.queue = RequestQueue()
self.thread_pool = gevent.pool.Pool(size=num_threads)
while not self.thread_pool.full():
self.thread_pool.add(gevent.spawn(self.queue.call_forever))
else:
# _sync_call immediately calls both callbacks and blocks until complete
self._call = self._sync_call
self.queue = None
self.thread_pool = None

""" Time to wait before attempting a /sync request after failing."""
self.bad_sync_timeout_limit = 60 * 60
self.rooms = {
Expand All @@ -116,9 +143,14 @@ def set_user_id(self, user_id):

def register_as_guest(self):
""" Register a guest account on this HS.

Note: Registration and login methods are always synchronous.

Note: HS must have guest registration enabled.

Returns:
str: Access Token

Raises:
MatrixRequestError
"""
Expand All @@ -128,6 +160,8 @@ def register_as_guest(self):
def register_with_password(self, username, password):
""" Register for a new account on this HS.

Note: Registration and login methods are always synchronous.

Args:
username (str): Account username
password (str): Account password
Expand Down Expand Up @@ -158,6 +192,8 @@ def _post_registration(self, response):
def login_with_password_no_sync(self, username, password):
""" Login to the homeserver.

Note: Registration and login methods are always synchronous.

Args:
username (str): Account username
password (str): Account password
Expand All @@ -182,6 +218,8 @@ def login_with_password_no_sync(self, username, password):
def login_with_password(self, username, password, limit=10):
""" Login to the homeserver.

Note: Registration and login methods are always synchronous.

Args:
username (str): Account username
password (str): Account password
Expand All @@ -203,6 +241,8 @@ def login_with_password(self, username, password, limit=10):

def logout(self):
""" Logout from the homeserver.

Note: Registration and login methods are synchronous.
"""
self.stop_listener_thread()
self.api.logout()
Expand All @@ -217,12 +257,17 @@ def create_room(self, alias=None, is_public=False, invitees=()):

Returns:
Room
or
AsyncResult(Room)

Raises:
MatrixRequestError
"""
response = self.api.create_room(alias, is_public, invitees)
return self._mkroom(response["room_id"])
out = self._call(
partial(self.api.create_room, alias, is_public, invitees),
self._mkroom
)
return out

def join_room(self, room_id_or_alias):
""" Join a room.
Expand All @@ -232,15 +277,17 @@ def join_room(self, room_id_or_alias):

Returns:
Room
or
AsyncResult(Room)

Raises:
MatrixRequestError
"""
response = self.api.join_room(room_id_or_alias)
room_id = (
response["room_id"] if "room_id" in response else room_id_or_alias
out = self._call(
partial(self.api.join_room, room_id_or_alias),
partial(self._mkroom, room_id_or_alias=room_id_or_alias)
)
return self._mkroom(room_id)
return out

def get_rooms(self):
""" Return a dict of {room_id: Room objects} that the user has joined.
Expand Down Expand Up @@ -360,7 +407,7 @@ def listen_forever(self, timeout_ms=30000, exception_handler=None):
if e.code >= 500:
logger.warning("Problem occured serverside. Waiting %i seconds",
bad_sync_timeout)
sleep(bad_sync_timeout)
gevent.sleep(bad_sync_timeout)
bad_sync_timeout = min(bad_sync_timeout * 2,
self.bad_sync_timeout_limit)
else:
Expand All @@ -375,6 +422,9 @@ def listen_forever(self, timeout_ms=30000, exception_handler=None):
def start_listener_thread(self, timeout_ms=30000, exception_handler=None):
""" Start a listener thread to listen for events in the background.

Note that as of right now this thread is responsible for calling
listener callbacks as well as for syncing with the homeserver.

Args:
timeout (int): How long to poll the Home Server for before
retrying.
Expand All @@ -383,12 +433,10 @@ def start_listener_thread(self, timeout_ms=30000, exception_handler=None):
thread.
"""
try:
thread = Thread(target=self.listen_forever,
args=(timeout_ms, exception_handler))
thread.daemon = True
thread = gevent.spawn(self.listen_forever,
timeout_ms, exception_handler)
self.sync_thread = thread
self.should_listen = True
thread.start()
except:
e = sys.exc_info()[0]
logger.error("Error: unable to start thread. %s", str(e))
Expand All @@ -412,23 +460,42 @@ def upload(self, content, content_type):
MatrixUnexpectedResponse: If the homeserver gave a strange response
MatrixRequestError: If the upload failed for some reason.
"""
try:
response = self.api.media_upload(content, content_type)
def _media_upload(self, content, content_type):
"""Wraps `self.api.media_upload` to allow error handling."""
try:
return self.api.media_upload(content, content_type)
except MatrixRequestError as e:
raise MatrixRequestError(
code=e.code,
content="Upload failed: %s" % e
)

def _upload(self, response):
"""Helper function to be used as callback by `self.upload`"""
if "content_uri" in response:
return response["content_uri"]
else:
raise MatrixUnexpectedResponse(
"The upload was successful, but content_uri wasn't found."
)

try:
# If not async, exceptions can be handled and logged
return self._call(
partial(_media_upload, content, content_type),
_upload
)
except MatrixRequestError as e:
raise MatrixRequestError(
code=e.code,
content="Upload failed: %s" % e
)

def _mkroom(self, room_id):
self.rooms[room_id] = Room(self, room_id)
return self.rooms[room_id]
def _mkroom(self, room_id_or_alias=None, response=None):
if response and "room_id" in response:
room_id_or_alias = response["room_id"]
self.rooms[room_id_or_alias] = Room(self, room_id_or_alias)
return self.rooms[room_id_or_alias]

def _process_state_event(self, state_event, current_room):
if "type" not in state_event:
Expand All @@ -447,11 +514,12 @@ def _process_state_event(self, state_event, current_room):
listener['event_type'] is None or
listener['event_type'] == state_event['type']
):
listener['callback'](state_event)
gevent.spawn(listener['callback'], state_event)

def _sync(self, timeout_ms=30000):
# TODO: Deal with presence
# TODO: Deal with left rooms
# TODO: Use gevent pool with queue to call listeners
response = self.api.sync(self.sync_token, timeout_ms, filter=self.sync_filter)
self.sync_token = response["next_batch"]

Expand All @@ -467,7 +535,7 @@ def _sync(self, timeout_ms=30000):

for room_id, sync_room in response['rooms']['join'].items():
if room_id not in self.rooms:
self._mkroom(room_id)
self._mkroom(room_id_or_alias=room_id)
room = self.rooms[room_id]
room.prev_batch = sync_room["timeline"]["prev_batch"]

Expand Down Expand Up @@ -507,8 +575,7 @@ def get_user(self, user_id):
Args:
user_id (str): The matrix user id of a user.
"""

return User(self.api, user_id)
return User(self.api, user_id, self._call)

def remove_room_alias(self, room_alias):
"""Remove mapping of an alias
Expand All @@ -524,3 +591,34 @@ def remove_room_alias(self, room_alias):
return True
except MatrixRequestError:
return False

def _async_call(self, first_callback, final_callback):
"""Call `final_callback` on result of `first_callback` asynchronously

Args:
first_callback(callable): Callable with 0 args to be called first
final_callback(callable): Callable with 1 arg whose result will be
returned. Called with output from first_callback.

Returns:
AsyncResult: Promise for the result of final_callback.
"""
first_result = AsyncResult()
self.queue.matrix_put((first_callback, first_result))
final_result = AsyncResult()
# lambda function will wait for first_result to be fulfilled
self.queue.matrix_put((lambda: final_callback(first_result.get()), final_result))
return final_result

def _sync_call(self, first_callback, final_callback):
"""Call `final_callback` on result of `first_callback` synchronously

Args:
first_callback(callable): Callable with 0 args to be called first
final_callback(callable): Callable with 1 arg whose result will be
returned. Called with output from first_callback.

Returns:
Object: Result of final_callback
"""
return final_callback(first_callback())
Loading