From 65d3160ef9510d8c2deb8817219c2286ccadfe9a Mon Sep 17 00:00:00 2001 From: Adam Beckmeyer Date: Mon, 4 Sep 2017 21:50:16 -0400 Subject: [PATCH 1/3] WIP --- matrix_client/api.py | 23 +-- matrix_client/client.py | 136 ++++++++++++---- matrix_client/queue.py | 70 ++++++++ matrix_client/room.py | 352 +++++++++++++++++++++++++++------------- matrix_client/user.py | 57 ++++++- 5 files changed, 472 insertions(+), 166 deletions(-) create mode 100644 matrix_client/queue.py diff --git a/matrix_client/api.py b/matrix_client/api.py index c5f53df6..a9b46493 100644 --- a/matrix_client/api.py +++ b/matrix_client/api.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015 OpenMarket Ltd +# Copyright 2017 Adam Beckmeyer # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -553,22 +554,16 @@ def _send(self, method, path, content=None, query_params={}, headers={}, if headers["Content-Type"] == "application/json" and content is not None: content = json.dumps(content) - response = None - while True: - response = requests.request( - method, endpoint, - params=query_params, - data=content, - headers=headers, - verify=self.validate_cert - ) - - if response.status_code == 429: - sleep(response.json()['retry_after_ms'] / 1000) - else: - break + response = requests.request( + method, endpoint, + params=query_params, + data=content, + headers=headers, + verify=self.validate_cert + ) if response.status_code < 200 or response.status_code >= 300: + # Error raised with status_code == 429 should be handled separately raise MatrixRequestError( code=response.status_code, content=response.text ) diff --git a/matrix_client/client.py b/matrix_client/client.py index 9e7e6c05..c84d9175 100644 --- a/matrix_client/client.py +++ b/matrix_client/client.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015 OpenMarket Ltd +# Copyright 2017 Adam Beckmeyer # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,12 +13,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from gevent import monkey; monkey.patch_all() + from .api import MatrixHttpApi from .errors import MatrixRequestError, MatrixUnexpectedResponse from .room import Room from .user import User -from threading import Thread -from time import sleep +from .queue import RequestQueue +import gevent +import gevent.pool +from gevent.event import AsyncResult +from functools import partial from uuid import uuid4 import logging import sys @@ -59,8 +65,8 @@ def global_callback(incoming_event): """ - def __init__(self, base_url, token=None, user_id=None, - valid_cert_check=True, sync_filter_limit=20): + def __init__(self, base_url, token=None, user_id=None, valid_cert_check=True, + sync_filter_limit=20, async=False, num_threads=10): """ Create a new Matrix Client object. Args: @@ -73,6 +79,10 @@ def __init__(self, base_url, token=None, user_id=None, the token) if supplying a token; otherwise, ignored. valid_cert_check (bool): Check the homeservers certificate on connections? + async (bool): Run the client in async mode; if `True`, methods + return `AsyncResult`s instead of blocking on api calls. + num_threads (int): Number of greenlets with which to make + matrix requests. Only evaluated if `async`. Returns: MatrixClient @@ -80,6 +90,7 @@ def __init__(self, base_url, token=None, user_id=None, Raises: MatrixRequestError, ValueError """ + # Set properties that may be overwritten if async if token is not None and user_id is None: raise ValueError("must supply user_id along with token") @@ -96,6 +107,22 @@ def __init__(self, base_url, token=None, user_id=None, self.sync_thread = None self.should_listen = False + # Both call methods accept two callbacks. First one is called without + # any arguments. Second is called with output of first callback as an arg + if async: + # _async_call pushses callbacks onto `self.queue` and returns an + # AsyncResult promising the output of the second callback + self._call = self._async_call + self.queue = RequestQueue() + self.thread_pool = gevent.pool.Pool(size=num_threads) + while not self.thread_pool.full(): + self.thread_pool.add(gevent.spawn(self.queue.call_forever)) + else: + # _sync_call immediately calls both callbacks and blocks until complete + self._call = self._sync_call + self.queue = None + self.thread_pool = None + """ Time to wait before attempting a /sync request after failing.""" self.bad_sync_timeout_limit = 60 * 60 self.rooms = { @@ -116,9 +143,14 @@ def set_user_id(self, user_id): def register_as_guest(self): """ Register a guest account on this HS. + + Note: Registration and login methods are always synchronous. + Note: HS must have guest registration enabled. + Returns: str: Access Token + Raises: MatrixRequestError """ @@ -128,6 +160,8 @@ def register_as_guest(self): def register_with_password(self, username, password): """ Register for a new account on this HS. + Note: Registration and login methods are always synchronous. + Args: username (str): Account username password (str): Account password @@ -158,6 +192,8 @@ def _post_registration(self, response): def login_with_password_no_sync(self, username, password): """ Login to the homeserver. + Note: Registration and login methods are always synchronous. + Args: username (str): Account username password (str): Account password @@ -182,6 +218,8 @@ def login_with_password_no_sync(self, username, password): def login_with_password(self, username, password, limit=10): """ Login to the homeserver. + Note: Registration and login methods are always synchronous. + Args: username (str): Account username password (str): Account password @@ -203,6 +241,8 @@ def login_with_password(self, username, password, limit=10): def logout(self): """ Logout from the homeserver. + + Note: Registration and login methods are synchronous. """ self.stop_listener_thread() self.api.logout() @@ -217,12 +257,17 @@ def create_room(self, alias=None, is_public=False, invitees=()): Returns: Room + or + AsyncResult(Room) Raises: MatrixRequestError """ - response = self.api.create_room(alias, is_public, invitees) - return self._mkroom(response["room_id"]) + out = self._call( + partial(self.api.create_room, alias, is_public, invitees), + self._mkroom + ) + return out def join_room(self, room_id_or_alias): """ Join a room. @@ -232,15 +277,17 @@ def join_room(self, room_id_or_alias): Returns: Room + or + AsyncResult(Room) Raises: MatrixRequestError """ - response = self.api.join_room(room_id_or_alias) - room_id = ( - response["room_id"] if "room_id" in response else room_id_or_alias + out = self._call( + partial(self.api.join_room, room_id_or_alias), + partial(self._mkroom, room_id_or_alias=room_id_or_alias) ) - return self._mkroom(room_id) + return out def get_rooms(self): """ Return a dict of {room_id: Room objects} that the user has joined. @@ -360,7 +407,7 @@ def listen_forever(self, timeout_ms=30000, exception_handler=None): if e.code >= 500: logger.warning("Problem occured serverside. Waiting %i seconds", bad_sync_timeout) - sleep(bad_sync_timeout) + gevent.sleep(bad_sync_timeout) bad_sync_timeout = min(bad_sync_timeout * 2, self.bad_sync_timeout_limit) else: @@ -375,6 +422,9 @@ def listen_forever(self, timeout_ms=30000, exception_handler=None): def start_listener_thread(self, timeout_ms=30000, exception_handler=None): """ Start a listener thread to listen for events in the background. + Note that as of right now this thread is responsible for calling + listener callbacks as well as for syncing with the homeserver. + Args: timeout (int): How long to poll the Home Server for before retrying. @@ -383,12 +433,10 @@ def start_listener_thread(self, timeout_ms=30000, exception_handler=None): thread. """ try: - thread = Thread(target=self.listen_forever, - args=(timeout_ms, exception_handler)) - thread.daemon = True + thread = gevent.spawn(self.listen_forever, + timeout_ms, exception_handler) self.sync_thread = thread self.should_listen = True - thread.start() except: e = sys.exc_info()[0] logger.error("Error: unable to start thread. %s", str(e)) @@ -413,21 +461,40 @@ def upload(self, content, content_type): MatrixRequestError: If the upload failed for some reason. """ try: - response = self.api.media_upload(content, content_type) - if "content_uri" in response: - return response["content_uri"] - else: - raise MatrixUnexpectedResponse( - "The upload was successful, but content_uri wasn't found." - ) + # If not async, exceptions can be handled and logged + return self._call( + partial(self._media_upload, content, content_type), + self._upload + ) + except MatrixRequestError as e: + raise MatrixRequestError( + code=e.code, + content="Upload failed: %s" % e + ) + + def _media_upload(self, content, content_type): + """Wraps `self.api.media_upload` to allow error handling.""" + try: + return self.api.media_upload(content, content_type) except MatrixRequestError as e: raise MatrixRequestError( code=e.code, content="Upload failed: %s" % e ) - def _mkroom(self, room_id): - self.rooms[room_id] = Room(self, room_id) + def _upload(self, response): + """Helper function to be used as callback by `self.upload`""" + if "content_uri" in response: + return response["content_uri"] + else: + raise MatrixUnexpectedResponse( + "The upload was successful, but content_uri wasn't found." + ) + + def _mkroom(self, response=None, room_id_or_alias=None): + if response and "room_id" in response: + room_id_or_alias = response["room_id"] + self.rooms[room_id_or_alias] = Room(self, room_id) return self.rooms[room_id] def _process_state_event(self, state_event, current_room): @@ -447,11 +514,12 @@ def _process_state_event(self, state_event, current_room): listener['event_type'] is None or listener['event_type'] == state_event['type'] ): - listener['callback'](state_event) + gevent.spawn(listener['callback'], state_event) def _sync(self, timeout_ms=30000): # TODO: Deal with presence # TODO: Deal with left rooms + # TODO: Use gevent pool with queue to call listeners response = self.api.sync(self.sync_token, timeout_ms, filter=self.sync_filter) self.sync_token = response["next_batch"] @@ -467,7 +535,7 @@ def _sync(self, timeout_ms=30000): for room_id, sync_room in response['rooms']['join'].items(): if room_id not in self.rooms: - self._mkroom(room_id) + self._mkroom(room_id_or_alias=room_id) room = self.rooms[room_id] room.prev_batch = sync_room["timeline"]["prev_batch"] @@ -507,8 +575,7 @@ def get_user(self, user_id): Args: user_id (str): The matrix user id of a user. """ - - return User(self.api, user_id) + return User(self.api, user_id, self._call) def remove_room_alias(self, room_alias): """Remove mapping of an alias @@ -524,3 +591,16 @@ def remove_room_alias(self, room_alias): return True except MatrixRequestError: return False + + def _async_call(self, first_callback, final_callback): + """Call `final_callback` on result of `first_callback` asynchronously""" + first_result = AsyncResult() + self.queue.put((first_callback, first_result)) + final_result = AsyncResult() + # lambda function will wait for first_result to be fulfilled + self.queue.put(lambda: final_callback(first_result.get()), final_result) + return final_result + + def _sync_call(self, first_callback, final_callback): + """Call `final_callback` on result of `first_callback` synchronously""" + return final_callback(first_callback()) diff --git a/matrix_client/queue.py b/matrix_client/queue.py new file mode 100644 index 00000000..3206eecf --- /dev/null +++ b/matrix_client/queue.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Adam Beckmeyer +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import gevent.queue +import time +import json +from .errors import MatrixRequestError + + +class RequestQueue(gevent.queue.Queue): + """Queue for callbacks calling the Matrix api. + + `RequestQueue` is a FIFO queue to be consumed by gevent threads calling the + `call` method. All objects put on the queue should be a tuple of len 2. + First object should be a callable, and second object should be a + gevent.event.AsyncResult. + + Other than the `call` and `call_forever` methods, it shares its api with + `gevent.queue.Queue`. + + Usage: + matrix = MatrixHttpApi("https://matrix.org", token="foobar") + a = AsyncResult() + queue = ResponseQueue() + queue.put(matrix.sync, a) + queue.call() + print(a.get()) + """ + + def call(self): + """Calls two callback tuple returned by self.get(). + + If instructed by server, will retry callback. Exponential backoff has + not yet been implemented. + """ + # If queue empty, thread blocks here + callback, future = self.get() + retry = True + while retry: + try: + output = callback() + future.set(output) + except Exception as e: + # Only handle exceptions if MatrixRequestError and status_code == 429 + if (type(e) == MatrixRequestError) and (e.code == 429): + time.sleep(json.loads(e.content)["retry_after_ms"] / 1000) + else: + # TODO: log exceptions + # TODO: allow specification of exception handler + future.set_exception(e) + else: + # If api_call doesn't raise exception, we don't need to retry + retry = False + + def call_forever(self): + """Calls self.call forever.""" + while True: + self.call() diff --git a/matrix_client/room.py b/matrix_client/room.py index b60090a6..3ec8b94c 100644 --- a/matrix_client/room.py +++ b/matrix_client/room.py @@ -1,3 +1,19 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# Copyright 2017 Adam Beckmeyer +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import re from uuid import uuid4 @@ -5,12 +21,12 @@ class Room(object): - """ The Room class can be used to call room specific functions + """The Room class can be used to call room specific functions after joining a room from the Client. """ def __init__(self, client, room_id): - """ Create a blank Room object. + """Create a blank Room object. NOTE: This should ideally be called from within the Client. NOTE: This does not verify the room with the Home Server. @@ -23,6 +39,7 @@ def __init__(self, client, room_id): self.room_id = room_id self.client = client + self._call = client._call self.listeners = [] self.state_listeners = [] self.ephemeral_listeners = [] @@ -37,7 +54,14 @@ def set_user_profile(self, displayname=None, avatar_url=None, reason="Changing room profile information"): - member = self.client.api.get_membership(self.room_id, self.client.user_id) + # TODO: docstring + self._call( + partial(self.client.api.get_membership, self.room_id, self.client.user_id), + partial(self._set_membership, displayname, avatar_url, reason) + ) + + def _set_membership(self, displayname, avatar_url, reason, member): + """Wraps `api.set_membership` for use with `set_user_profile`""" if member["membership"] != "join": raise Exception("Can't set profile if you have not joined the room.") if displayname is None: @@ -55,14 +79,18 @@ def set_user_profile(self, ) def send_text(self, text): - """ Send a plain text message to the room. + """Send a plain text message to the room. Args: text (str): The message to send """ - return self.client.api.send_message(self.room_id, text) + return self._call( + partial(self.client.api.send_message, self.room_id, text), + lambda x: x + ) def get_html_content(self, html, body=None, msgtype="m.text"): + # TODO: docstring return { "body": body if body else re.sub('<[^<]+?>', '', html), "msgtype": msgtype, @@ -77,37 +105,59 @@ def send_html(self, html, body=None, msgtype="m.text"): html (str): The html formatted message to be sent. body (str): The body of the message to be sent (unformatted). """ - return self.client.api.send_message_event( - self.room_id, "m.room.message", self.get_html_content(html, body, msgtype)) + return self._call( + partial(self.client.api.send_message_event, + self.room_id, "m.room.message", + self.get_html_content(html, body, msgtype)), + lambda x: x + ) def set_account_data(self, type, account_data): - return self.client.api.set_room_account_data( - self.client.user_id, self.room_id, type, account_data) + # TODO: docstring + return self._call( + # TODO: api.set_room_account_data doesn't exist? + partial(self.client.api.set_room_account_data, + self.client.user_id, self.room_id, type, account_data), + lambda x: x + ) def get_tags(self): - return self.client.api.get_user_tags(self.client.user_id, self.room_id) + # TODO: docstring + return self._call( + partial(self.client.api.get_user_tags, + self.client.user_id, self.room_id), + lambda x: x + ) def remove_tag(self, tag): - return self.client.api.remove_user_tag( - self.client.user_id, self.room_id, tag + # TODO: docstring + return self._call( + partial(self.client.api.remove_user_tag, + self.client.user_id, self.room_id, tag), + lambda x: x ) def add_tag(self, tag, order=None, content=None): - return self.client.api.add_user_tag( - self.client.user_id, self.room_id, - tag, order, content + # TODO: docstring + return self._call( + partial(self.client.api.add_user_tag, + self.client.user_id, self.room_id, tag, order, content), + lambda x: x ) def send_emote(self, text): - """ Send a emote (/me style) message to the room. + """Send a emote (/me style) message to the room. Args: text (str): The message to send """ - return self.client.api.send_emote(self.room_id, text) + return self._call( + partial(self.client.api.send_emote, self.room_id, text), + lambda x: x + ) def send_file(self, url, name, **fileinfo): - """ Send a pre-uploaded file to the room. + """Send a pre-uploaded file to the room. See http://matrix.org/docs/spec/r0.2.0/client_server.html#m-file for fileinfo @@ -116,19 +166,23 @@ def send_file(self, url, name, **fileinfo): name (str): The filename of the image. fileinfo (): Extra information about the file """ - - return self.client.api.send_content( - self.room_id, url, name, "m.file", - extra_information=fileinfo + return self._call( + partial(self.client.api.send_content, + self.room_id, url, name, "m.file", extra_information=fileinfo), + lambda x: x ) def send_notice(self, text): - return self.client.api.send_notice(self.room_id, text) + # TODO: docstring + return self._call( + partial(self.client.api.send_notice, self.room_id, text), + lambda x: x + ) # See http://matrix.org/docs/spec/r0.0.1/client_server.html#m-image for the # imageinfo args. def send_image(self, url, name, **imageinfo): - """ Send a pre-uploaded image to the room. + """Send a pre-uploaded image to the room. See http://matrix.org/docs/spec/r0.0.1/client_server.html#m-image for imageinfo @@ -137,9 +191,11 @@ def send_image(self, url, name, **imageinfo): name (str): The filename of the image. imageinfo (): Extra information about the image. """ - return self.client.api.send_content( - self.room_id, url, name, "m.image", - extra_information=imageinfo + return self._call( + partial(self.client.api.send_content, + self.room_id, url, name, "m.image", + extra_information=imageinfo), + lambda x: x ) def send_location(self, geo_uri, name, thumb_url=None, **thumb_info): @@ -153,13 +209,17 @@ def send_location(self, geo_uri, name, thumb_url=None, **thumb_info): thumb_url (str): URL to the thumbnail of the location. thumb_info (): Metadata about the thumbnail, type ImageInfo. """ - return self.client.api.send_location(self.room_id, geo_uri, name, - thumb_url, thumb_info) + return self._call( + partial(self.client.api.send_location, + self.room_id, geo_uri, name, thumb_url, thumb_info), + lambda x: x + ) # See http://matrix.org/docs/spec/client_server/r0.2.0.html#m-video for the # videoinfo args. def send_video(self, url, name, **videoinfo): - """ Send a pre-uploaded video to the room. + """Send a pre-uploaded video to the room. + See http://matrix.org/docs/spec/client_server/r0.2.0.html#m-video for videoinfo @@ -168,13 +228,17 @@ def send_video(self, url, name, **videoinfo): name (str): The filename of the video. videoinfo (): Extra information about the video. """ - return self.client.api.send_content(self.room_id, url, name, "m.video", - extra_information=videoinfo) + return self._call( + partial(self.client.api.send_content, + self.room_id, url, name, "m.video", extra_information=videoinfo), + lambda x: x + ) # See http://matrix.org/docs/spec/client_server/r0.2.0.html#m-audio for the # audioinfo args. def send_audio(self, url, name, **audioinfo): - """ Send a pre-uploaded audio to the room. + """Send a pre-uploaded audio to the room. + See http://matrix.org/docs/spec/client_server/r0.2.0.html#m-audio for audioinfo @@ -183,11 +247,14 @@ def send_audio(self, url, name, **audioinfo): name (str): The filename of the audio. audioinfo (): Extra information about the audio. """ - return self.client.api.send_content(self.room_id, url, name, "m.audio", - extra_information=audioinfo) + return self._call( + partial(self.client.api.send_content, + self.room_id, url, name, "m.audio", extra_information=audioinfo), + lambda x: x + ) def add_listener(self, callback, event_type=None): - """ Add a callback handler for events going to this room. + """Add a callback handler for events going to this room. Args: callback (func(room, event)): Callback called when an event arrives. @@ -273,27 +340,37 @@ def _put_ephemeral_event(self, event): listener['callback'](self, event) def get_events(self): - """ Get the most recent events for this room. + """Get the most recent events for this room. Returns: events """ return self.events + def _handle_api_errors(self, api_callback): + """Returns `True` if api_callback doesn't raise `MatrixRequestError`""" + try: + api_callback + return True + except MatrixRequestError: + return False + def invite_user(self, user_id): - """ Invite a user to this room + """Invite a user to this room Args: user_id (str): The matrix user id of a user. Returns: boolean: The invitation was sent. + or + AsyncResult(bool) """ - try: - self.client.api.invite_user(self.room_id, user_id) - return True - except MatrixRequestError: - return False + return self._call( + partial(self._handle_api_errors, + partial(self.client.api.invite_user, self.room_id, user_id)), + lambda x: x + ) def kick_user(self, user_id, reason=""): """ Kick a user from this room @@ -303,12 +380,15 @@ def kick_user(self, user_id, reason=""): Returns: boolean: The user was kicked. + or + AsyncResult(bool) """ - try: - self.client.api.kick_user(self.room_id, user_id) - return True - except MatrixRequestError: - return False + return self._call( + partial(self._handle_api_errors, + partial(self.client.api.kick_user, self.room_id, user_id, + reason=reason)), + lambda x: x + ) def ban_user(self, user_id, reason): """ Ban a user from this room @@ -319,12 +399,15 @@ def ban_user(self, user_id, reason): Returns: boolean: The user was banned. + or + AsyncResult(bool) """ - try: - self.client.api.ban_user(self.room_id, user_id, reason) - return True - except MatrixRequestError: - return False + return self._call( + partial(self._handle_api_errors, + partial(self.client.api.ban_user, self.room_id, user_id, + reason=reason)), + lambda x: x + ) def unban_user(self, user_id): """Unban a user from this room @@ -334,19 +417,26 @@ def unban_user(self, user_id): Returns: boolean: The user was unbanned. + or + AsyncResult(bool) """ - try: - self.client.api.unban_user(self.room_id, user_id) - return True - except MatrixRequestError: - return False + return self._call( + partial(self._handle_api_errors, + partial(self.client.api.unban_user, self.room_id, user_id)), + lambda x: x + ) def leave(self): """ Leave the room. Returns: boolean: Leaving the room was successful. + or + AsyncResult(bool) """ + return self._call(self._leave_handler, lambda x: x) + + def _leave_helper(self): try: self.client.api.leave_room(self.room_id) del self.client.rooms[self.room_id] @@ -359,16 +449,21 @@ def update_room_name(self): Returns: boolean: True if the room name changed, False if not + or + AsyncResult(bool) """ - try: - response = self.client.api.get_room_name(self.room_id) + def _helper(response): if "name" in response and response["name"] != self.name: self.name = response["name"] return True else: return False - except MatrixRequestError: - return False + + return self._call( + partial(self._handle_api_errors, + partial(self.client.api.get_room_name, self.room_id)), + _helper + ) def set_room_name(self, name): """ Set room name @@ -376,13 +471,16 @@ def set_room_name(self, name): Returns: boolean: True if the name changed, False if not + or + AsyncResult(bool) """ - try: - self.client.api.set_room_name(self.room_id, name) + def _set_name(name): self.name = name - return True - except MatrixRequestError: - return False + return self._call( + partial(self._handle_api_errors, + partial(self.client.api.set_room_name, self.room_id, name)), + lambda _: _set_name(name) + ) def send_state_event(self, event_type, content, state_key): """ Send a state event to the room. @@ -392,11 +490,13 @@ def send_state_event(self, event_type, content, state_key): content (): An object with the content of the message. state_key (str, optional): A unique key to identify the state. """ - return self.client.api.send_state_event( - self.room_id, - event_type, - content, - state_key + return self._call( + partial(self.client.api.send_state_event, + self.room_id, + event_type, + content, + state_key), + lambda x: x ) def update_room_topic(self): @@ -404,16 +504,21 @@ def update_room_topic(self): Returns: boolean: True if the topic changed, False if not + or + AsyncResult(bool) """ - try: - response = self.client.api.get_room_topic(self.room_id) - if "topic" in response and response["topic"] != self.topic: - self.topic = response["topic"] - return True - else: + def _helper(): + try: + response = self.client.api.get_room_topic(self.room_id) + if "topic" in response and response["topic"] != self.topic: + self.topic = response["topic"] + return True + else: + return False + except MatrixRequestError: return False - except MatrixRequestError: - return False + + return self._call(_helper, lambda x: x) def set_room_topic(self, topic): """ Set room topic @@ -421,31 +526,39 @@ def set_room_topic(self, topic): Returns: boolean: True if the topic changed, False if not + or + AsyncResult(bool) """ - try: - self.client.api.set_room_topic(self.room_id, topic) + def _set_room_topic(topic): self.topic = topic - return True - except MatrixRequestError: - return False + return self._call( + partial(self._handle_api_errors, + partial(self.client.api.set_room_topic, self.room_id, topic)), + lambda _: set_room_topic(topic) + ) def update_aliases(self): """ Get aliases information from room state Returns: boolean: True if the aliases changed, False if not + or + AsyncResult(bool) """ - try: - response = self.client.api.get_room_state(self.room_id) - for chunk in response: - if "content" in chunk and "aliases" in chunk["content"]: - if chunk["content"]["aliases"] != self.aliases: - self.aliases = chunk["content"]["aliases"] - return True - else: - return False - except MatrixRequestError: - return False + def _helper(): + try: + response = self.client.api.get_room_state(self.room_id) + for chunk in response: + if "content" in chunk and "aliases" in chunk["content"]: + if chunk["content"]["aliases"] != self.aliases: + self.aliases = chunk["content"]["aliases"] + return True + else: + return False + except MatrixRequestError: + return False + + return self._call(_helper, lambda x: x) def add_room_alias(self, room_alias): """Add an alias to the room @@ -455,27 +568,32 @@ def add_room_alias(self, room_alias): Returns: bool: True if the alias was added, False otherwise. + or + AsyncResult(bool) """ - try: - self.client.api.set_room_alias(self.room_id, room_alias) - return True - except MatrixRequestError: - return False + return self._call( + partial(self._handle_api_errors, + partial(self.client.api.set_room_alias, self.room_id, room_alias)), + lambda x: x + ) def get_joined_members(self): """Query joined members of this room. Returns: {user_id: {"displayname": str or None}}: Dictionary of joined members. + or + AsyncResult(dict): Same structure as above. """ - response = self.client.api.get_room_members(self.room_id) - rtn = { - event["state_key"]: { - "displayname": event["content"].get("displayname"), - } for event in response["chunk"] if event["content"]["membership"] == "join" - } + def _helper(response): + return { + event["state_key"]: { + "displayname": event["content"].get("displayname"), + } for event in response["chunk"] if event["content"]["membership"] == "join" + } - return rtn + return self._call(partial(self.client.api.get_room_members, self.room_id), + _helper) def backfill_previous_messages(self, reverse=False, limit=10): """Backfill handling of previous messages. @@ -485,13 +603,17 @@ def backfill_previous_messages(self, reverse=False, limit=10): order (old to new), otherwise the order will be reversed (new to old). limit (int): Number of messages to go back. """ - res = self.client.api.get_room_messages(self.room_id, self.prev_batch, - direction="b", limit=limit) - events = res["chunk"] - if not reverse: - events = reversed(events) - for event in events: - self._put_event(event) + def _helper(res): + events = res["chunk"] + if not reverse: + events = reversed(events) + for event in events: + self._put_event(event) + + self._call(partial(self.client.api.get_room_messages, + self.room_id, self.prev_batch, direction="b", limit=limit), + _helper + ) @property def prev_batch(self): diff --git a/matrix_client/user.py b/matrix_client/user.py index 14e82935..9834cc8b 100644 --- a/matrix_client/user.py +++ b/matrix_client/user.py @@ -1,7 +1,25 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# Copyright 2017 Adam Beckmeyer +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from functools import partial + class User(object): """ The User class can be used to call user specific functions. """ - def __init__(self, api, user_id): + def __init__(self, api, user_id, caller): if not user_id.startswith("@"): raise ValueError("UserIDs start with @") @@ -10,6 +28,8 @@ def __init__(self, api, user_id): self.user_id = user_id self.api = api + # Caller may be synchronous or async depending on MatrixClient creating User + self._call = caller def get_display_name(self): """ Get this users display name. @@ -17,12 +37,23 @@ def get_display_name(self): Returns: str: Display Name + or + AsyncResult(str) """ - return self.api.get_display_name(self.user_id) + # TODO: shouldn't this method cache the user's display name? + return self._call( + partial(self.api.get_display_name, self.user_id), + # `api.get_display_name` already processes json for some reason + lambda x: x + ) def get_friendly_name(self): - display_name = self.api.get_display_name(self.user_id) - return display_name if display_name is not None else self.user_id + # TODO: docstring + return self._call( + partial(self.api.get_display_name, self.user_id), + # user_id is best identifier lacking display_name + lambda d: d if d is not None else self.user_id + ) def set_display_name(self, display_name): """ Set this users display name. @@ -30,12 +61,17 @@ def set_display_name(self, display_name): Args: display_name (str): Display Name """ - return self.api.set_display_name(self.user_id, display_name) + self._call( + partial(self.api.set_display_name, self.user_id, display_name), + lambda x: None + ) def get_avatar_url(self): - mxcurl = self.api.get_avatar_url(self.user_id) - url = self.api.get_download_url(mxcurl) - return url + # TODO: docstring + return self._call( + partial(self.api.get_avatar_url, self.user_id), + self.api.get_download_url + ) def set_avatar_url(self, avatar_url): """ Set this users avatar. @@ -43,4 +79,7 @@ def set_avatar_url(self, avatar_url): Args: avatar_url (str): mxc url from previously uploaded """ - return self.api.set_avatar_url(self.user_id, avatar_url) + self._call( + partial(self.api.set_avatar_url, self.user_id, avatar_url), + lambda x: None + ) From 45484433a5c6dac493a4e9b316ff3a172d5e7d8c Mon Sep 17 00:00:00 2001 From: Adam Beckmeyer Date: Sun, 17 Sep 2017 19:35:54 -0400 Subject: [PATCH 2/3] Fix various issues with initial PR Reorg of these changes into a set of commits that actually makes sense is still coming; don't worry. --- matrix_client/client.py | 32 +++++++++++++++++++++++++------- matrix_client/queue.py | 10 ++++++++++ matrix_client/room.py | 3 ++- setup.py | 3 ++- 4 files changed, 39 insertions(+), 9 deletions(-) diff --git a/matrix_client/client.py b/matrix_client/client.py index c84d9175..baeeacc1 100644 --- a/matrix_client/client.py +++ b/matrix_client/client.py @@ -491,11 +491,11 @@ def _upload(self, response): "The upload was successful, but content_uri wasn't found." ) - def _mkroom(self, response=None, room_id_or_alias=None): + def _mkroom(self, room_id_or_alias=None, response=None): if response and "room_id" in response: room_id_or_alias = response["room_id"] - self.rooms[room_id_or_alias] = Room(self, room_id) - return self.rooms[room_id] + self.rooms[room_id_or_alias] = Room(self, room_id_or_alias) + return self.rooms[room_id_or_alias] def _process_state_event(self, state_event, current_room): if "type" not in state_event: @@ -593,14 +593,32 @@ def remove_room_alias(self, room_alias): return False def _async_call(self, first_callback, final_callback): - """Call `final_callback` on result of `first_callback` asynchronously""" + """Call `final_callback` on result of `first_callback` asynchronously + + Args: + first_callback(callable): Callable with 0 args to be called first + final_callback(callable): Callable with 1 arg whose result will be + returned. Called with output from first_callback. + + Returns: + AsyncResult: Promise for the result of final_callback. + """ first_result = AsyncResult() - self.queue.put((first_callback, first_result)) + self.queue.matrix_put((first_callback, first_result)) final_result = AsyncResult() # lambda function will wait for first_result to be fulfilled - self.queue.put(lambda: final_callback(first_result.get()), final_result) + self.queue.matrix_put((lambda: final_callback(first_result.get()), final_result)) return final_result def _sync_call(self, first_callback, final_callback): - """Call `final_callback` on result of `first_callback` synchronously""" + """Call `final_callback` on result of `first_callback` synchronously + + Args: + first_callback(callable): Callable with 0 args to be called first + final_callback(callable): Callable with 1 arg whose result will be + returned. Called with output from first_callback. + + Returns: + Object: Result of final_callback + """ return final_callback(first_callback()) diff --git a/matrix_client/queue.py b/matrix_client/queue.py index 3206eecf..d0f5c17e 100644 --- a/matrix_client/queue.py +++ b/matrix_client/queue.py @@ -14,6 +14,7 @@ # limitations under the License. import gevent.queue +from gevent.event import AsyncResult import time import json from .errors import MatrixRequestError @@ -68,3 +69,12 @@ def call_forever(self): """Calls self.call forever.""" while True: self.call() + + def matrix_put(self, item, *args, **kwargs): + """Calls `self.put` after validating type of item.""" + if (type(item) == tuple and len(item) == 2 and + callable(item[0]) and type(item[1]) == AsyncResult): + + return self.put(item, *args, **kwargs) + else: + raise TypeError("Received %s when expecting (callable, AsyncResult)" % str(item)) diff --git a/matrix_client/room.py b/matrix_client/room.py index 3ec8b94c..d0a41af4 100644 --- a/matrix_client/room.py +++ b/matrix_client/room.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from functools import partial import re from uuid import uuid4 @@ -534,7 +535,7 @@ def _set_room_topic(topic): return self._call( partial(self._handle_api_errors, partial(self.client.api.set_room_topic, self.room_id, topic)), - lambda _: set_room_topic(topic) + lambda _: _set_room_topic(topic) ) def update_aliases(self): diff --git a/setup.py b/setup.py index a3bea171..a69d632b 100644 --- a/setup.py +++ b/setup.py @@ -44,7 +44,8 @@ def exec_file(names): ], keywords='chat sdk matrix matrix.org', install_requires=[ - 'requests' + 'requests', + 'gevent', ], extras_require={ 'test': ['tox', 'pytest', 'flake8', 'responses'], From 57e11d34b6d80950cbd4b2e85cd4d7e7cf982786 Mon Sep 17 00:00:00 2001 From: Adam Beckmeyer Date: Sun, 15 Oct 2017 19:04:27 -0400 Subject: [PATCH 3/3] Reformat some stuff --- matrix_client/client.py | 42 ++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/matrix_client/client.py b/matrix_client/client.py index baeeacc1..b54ce7df 100644 --- a/matrix_client/client.py +++ b/matrix_client/client.py @@ -460,11 +460,30 @@ def upload(self, content, content_type): MatrixUnexpectedResponse: If the homeserver gave a strange response MatrixRequestError: If the upload failed for some reason. """ + def _media_upload(self, content, content_type): + """Wraps `self.api.media_upload` to allow error handling.""" + try: + return self.api.media_upload(content, content_type) + except MatrixRequestError as e: + raise MatrixRequestError( + code=e.code, + content="Upload failed: %s" % e + ) + + def _upload(self, response): + """Helper function to be used as callback by `self.upload`""" + if "content_uri" in response: + return response["content_uri"] + else: + raise MatrixUnexpectedResponse( + "The upload was successful, but content_uri wasn't found." + ) + try: # If not async, exceptions can be handled and logged return self._call( - partial(self._media_upload, content, content_type), - self._upload + partial(_media_upload, content, content_type), + _upload ) except MatrixRequestError as e: raise MatrixRequestError( @@ -472,25 +491,6 @@ def upload(self, content, content_type): content="Upload failed: %s" % e ) - def _media_upload(self, content, content_type): - """Wraps `self.api.media_upload` to allow error handling.""" - try: - return self.api.media_upload(content, content_type) - except MatrixRequestError as e: - raise MatrixRequestError( - code=e.code, - content="Upload failed: %s" % e - ) - - def _upload(self, response): - """Helper function to be used as callback by `self.upload`""" - if "content_uri" in response: - return response["content_uri"] - else: - raise MatrixUnexpectedResponse( - "The upload was successful, but content_uri wasn't found." - ) - def _mkroom(self, room_id_or_alias=None, response=None): if response and "room_id" in response: room_id_or_alias = response["room_id"]