From eb8563563ddb799209927314192458df1b2d4c27 Mon Sep 17 00:00:00 2001 From: sbahling Date: Mon, 17 Oct 2022 18:30:43 +0200 Subject: [PATCH 1/2] Add awaited send_message functions for curio and trio implemenations In order to send a message over an awaited server socket (i.e. when you need external servers to reply to your listening server socket) the sock.sendto call needs to also be awaited. This commit adds async/await versions of the send_message and send_bundle functions that are called by the OSCCurioServer and OSCTrioServer send_message methods. --- oscpy/client.py | 49 ++++++++++++++++++++++++++++++++++++ oscpy/server/curio_server.py | 47 ++++++++++++++++++++++++++++++++++ oscpy/server/trio_server.py | 47 ++++++++++++++++++++++++++++++++++ tests/test_server.py | 41 +++++++++++++++++++++++------- 4 files changed, 175 insertions(+), 9 deletions(-) diff --git a/oscpy/client.py b/oscpy/client.py index d1c1856..f1fd347 100644 --- a/oscpy/client.py +++ b/oscpy/client.py @@ -150,3 +150,52 @@ def send_bundle(self, messages, timetag=None, safer=False): ) self.stats += stats return stats + + +# async versions for curio and trio implemenations +# sock.sendto on an awaited server socket needs to be awaited as well + +async def async_send_message( + osc_address, values, ip_address, port, sock=SOCK, safer=False, + encoding='', encoding_errors='strict' +): + """Send an osc message to a socket address async version. + + See `send_message` for usage information + """ + if platform != 'win32' and sock.family == socket.AF_UNIX: + address = ip_address + else: + address = (ip_address, port) + + message, stats = format_message( + osc_address, values, encoding=encoding, + encoding_errors=encoding_errors + ) + + await sock.sendto(message, address) + if safer: + sleep(10e-9) + + return stats + + +async def async_send_bundle( + messages, ip_address, port, timetag=None, sock=None, safer=False, + encoding='', encoding_errors='strict' +): + """Send a bundle built from the `messages` iterable. + + See `send_bundle` for usage information. + """ + if not sock: + sock = SOCK + bundle, stats = format_bundle( + messages, timetag=timetag, encoding=encoding, + encoding_errors=encoding_errors + ) + await sock.sendto(bundle, (ip_address, port)) + if safer: + sleep(10e-9) + + return stats diff --git a/oscpy/server/curio_server.py b/oscpy/server/curio_server.py index 1a6e4b3..b228755 100644 --- a/oscpy/server/curio_server.py +++ b/oscpy/server/curio_server.py @@ -5,6 +5,7 @@ from curio import TaskGroup, socket from oscpy.server import OSCBaseServer, UDP_MAX_SIZE +from oscpy.client import async_send_bundle, async_send_message logging.basicConfig() logger = logging.getLogger(__name__) @@ -23,6 +24,52 @@ def get_socket(family, addr): sock.bind(addr) return sock + async def send_message(self, + osc_address, values, ip_address, port, sock=None, safer=False, + encoding='', encoding_errors='strict' + ): + if not sock and self.default_socket: + sock = self.default_socket + elif not sock: + raise RuntimeError('no default socket yet and no socket provided') + stats = await async_send_message( + osc_address, + values, + ip_address, + port, + sock=sock, + safer=safer, + encoding=self.encoding, + encoding_errors=self.encoding_errors + ) + self.stats_sent += stats + return stats + + async def send_bundle( + self, messages, ip_address, port, timetag=None, sock=None, safer=False + ): + """Shortcut to the client's `send_bundle` method. + + Use the `default_socket` of the server by default. + See `client.send_bundle` for more info about the parameters. + """ + if not sock and self.default_socket: + sock = self.default_socket + elif not sock: + raise RuntimeError('no default socket yet and no socket provided') + + stats = await async_send_bundle( + messages, + ip_address, + port, + sock=sock, + safer=safer, + encoding=self.encoding, + encoding_errors=self.encoding_errors + ) + self.stats_sent += stats + return stats + async def _listen(self, sock): async with TaskGroup(wait=all) as g: self.task_groups[sock] = g diff --git a/oscpy/server/trio_server.py b/oscpy/server/trio_server.py index d21eeb3..5a3a33f 100644 --- a/oscpy/server/trio_server.py +++ b/oscpy/server/trio_server.py @@ -6,6 +6,7 @@ from trio import socket, open_nursery, move_on_after from oscpy.server import OSCBaseServer, UDP_MAX_SIZE +from oscpy.client import async_send_bundle, async_send_message logging.basicConfig() logger = logging.getLogger(__name__) @@ -24,6 +25,52 @@ async def get_socket(family, addr): await sock.bind(addr) return sock + async def send_message(self, + osc_address, values, ip_address, port, sock=None, safer=False, + encoding='', encoding_errors='strict' + ): + if not sock and self.default_socket: + sock = self.default_socket + elif not sock: + raise RuntimeError('no default socket yet and no socket provided') + stats = await async_send_message( + osc_address, + values, + ip_address, + port, + sock=sock, + safer=safer, + encoding=self.encoding, + encoding_errors=self.encoding_errors + ) + self.stats_sent += stats + return stats + + async def send_bundle( + self, messages, ip_address, port, timetag=None, sock=None, safer=False + ): + """Shortcut to the client's `send_bundle` method. + + Use the `default_socket` of the server by default. + See `client.send_bundle` for more info about the parameters. + """ + if not sock and self.default_socket: + sock = self.default_socket + elif not sock: + raise RuntimeError('no default socket yet and no socket provided') + + stats = await async_send_bundle( + messages, + ip_address, + port, + sock=sock, + safer=safer, + encoding=self.encoding, + encoding_errors=self.encoding_errors + ) + self.stats_sent += stats + return stats + async def listen( self, address='localhost', port=0, default=False, family='inet' ): diff --git a/tests/test_server.py b/tests/test_server.py index f7e8985..2e591d2 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -136,12 +136,31 @@ def test_terminate_server(cls): assert osc.join_server(timeout=0.1) assert not osc._thread.is_alive() +@pytest.mark.parametrize("cls", server_classes) +def test_send_message_using_default_socket(cls): + + event = Event() + + def success(*values): + event.set() + + osc = cls() + sock = _await(osc.listen, osc) + address = sock.getsockname()[0] + port = sock.getsockname()[1] + osc.bind(b'/success', success, sock) + + _await(osc.listen, osc, kwargs=dict(default=True)) + _await(osc.send_message, osc, args=[b'/success', [], address, port]) + + runner(osc, timeout=.2) + assert event.is_set() @pytest.mark.parametrize("cls", server_classes) def test_send_message_without_socket(cls): osc = cls() with pytest.raises(RuntimeError): - osc.send_message(b'/test', [], 'localhost', 0) + _await(osc.send_message, osc, args=[b'/test', [], 'localhost', 0]) @pytest.mark.parametrize("cls", server_classes) @@ -187,15 +206,16 @@ def broken_callback(*values): def test_send_bundle_without_socket(cls): osc = cls() with pytest.raises(RuntimeError): - osc.send_bundle([], 'localhost', 0) + _await(osc.send_bundle, osc, args=[[], 'localhost', 0]) sock = _await(osc.listen, osc, kwargs={'default': True}) - osc.send_bundle( - ( - (b'/test', []), - ), - 'localhost', 1 - ) + _await(osc.send_bundle, osc, + args=[( + (b'/test', []), + ), + 'localhost', 1 + ] + ) @pytest.mark.parametrize("cls", server_classes) @@ -964,7 +984,10 @@ def callback(index): client.send_message('/callback', [0]) # sever sends message on different port, might crash the server on windows: - osc.send_message('/callback', ["nobody is going to receive this"], ip_address='localhost', port=port + 1) + _await(osc.send_message, osc, + args=['/callback', ["nobody is going to receive this"]], + kwargs=dict(ip_address='localhost', port=port + 1) + ) # client sends message to server again. if server is dead, message # will not be received: From 234bfa979838a535f33edcdf7ff822d59f8a96c1 Mon Sep 17 00:00:00 2001 From: sbahling Date: Mon, 17 Oct 2022 18:30:43 +0200 Subject: [PATCH 2/2] Add awaited send_message functions for curio and trio implemenations In order to send a message over an awaited server socket (i.e. when you need external servers to reply to your listening server socket) the sock.sendto call needs to also be awaited. This commit adds async/await versions of the send_message and send_bundle functions that are called by the OSCCurioServer and OSCTrioServer send_message and send_bundle methods. --- oscpy/client.py | 49 ++++++++++++++++++++++++++++++++++++ oscpy/server/curio_server.py | 47 ++++++++++++++++++++++++++++++++++ oscpy/server/trio_server.py | 47 ++++++++++++++++++++++++++++++++++ tests/test_server.py | 41 +++++++++++++++++++++++------- 4 files changed, 175 insertions(+), 9 deletions(-) diff --git a/oscpy/client.py b/oscpy/client.py index d1c1856..f1fd347 100644 --- a/oscpy/client.py +++ b/oscpy/client.py @@ -150,3 +150,52 @@ def send_bundle(self, messages, timetag=None, safer=False): ) self.stats += stats return stats + + +# async versions for curio and trio implemenations +# sock.sendto on an awaited server socket needs to be awaited as well + +async def async_send_message( + osc_address, values, ip_address, port, sock=SOCK, safer=False, + encoding='', encoding_errors='strict' +): + """Send an osc message to a socket address async version. + + See `send_message` for usage information + """ + if platform != 'win32' and sock.family == socket.AF_UNIX: + address = ip_address + else: + address = (ip_address, port) + + message, stats = format_message( + osc_address, values, encoding=encoding, + encoding_errors=encoding_errors + ) + + await sock.sendto(message, address) + if safer: + sleep(10e-9) + + return stats + + +async def async_send_bundle( + messages, ip_address, port, timetag=None, sock=None, safer=False, + encoding='', encoding_errors='strict' +): + """Send a bundle built from the `messages` iterable. + + See `send_bundle` for usage information. + """ + if not sock: + sock = SOCK + bundle, stats = format_bundle( + messages, timetag=timetag, encoding=encoding, + encoding_errors=encoding_errors + ) + await sock.sendto(bundle, (ip_address, port)) + if safer: + sleep(10e-9) + + return stats diff --git a/oscpy/server/curio_server.py b/oscpy/server/curio_server.py index 1a6e4b3..b228755 100644 --- a/oscpy/server/curio_server.py +++ b/oscpy/server/curio_server.py @@ -5,6 +5,7 @@ from curio import TaskGroup, socket from oscpy.server import OSCBaseServer, UDP_MAX_SIZE +from oscpy.client import async_send_bundle, async_send_message logging.basicConfig() logger = logging.getLogger(__name__) @@ -23,6 +24,52 @@ def get_socket(family, addr): sock.bind(addr) return sock + async def send_message(self, + osc_address, values, ip_address, port, sock=None, safer=False, + encoding='', encoding_errors='strict' + ): + if not sock and self.default_socket: + sock = self.default_socket + elif not sock: + raise RuntimeError('no default socket yet and no socket provided') + stats = await async_send_message( + osc_address, + values, + ip_address, + port, + sock=sock, + safer=safer, + encoding=self.encoding, + encoding_errors=self.encoding_errors + ) + self.stats_sent += stats + return stats + + async def send_bundle( + self, messages, ip_address, port, timetag=None, sock=None, safer=False + ): + """Shortcut to the client's `send_bundle` method. + + Use the `default_socket` of the server by default. + See `client.send_bundle` for more info about the parameters. + """ + if not sock and self.default_socket: + sock = self.default_socket + elif not sock: + raise RuntimeError('no default socket yet and no socket provided') + + stats = await async_send_bundle( + messages, + ip_address, + port, + sock=sock, + safer=safer, + encoding=self.encoding, + encoding_errors=self.encoding_errors + ) + self.stats_sent += stats + return stats + async def _listen(self, sock): async with TaskGroup(wait=all) as g: self.task_groups[sock] = g diff --git a/oscpy/server/trio_server.py b/oscpy/server/trio_server.py index d21eeb3..5a3a33f 100644 --- a/oscpy/server/trio_server.py +++ b/oscpy/server/trio_server.py @@ -6,6 +6,7 @@ from trio import socket, open_nursery, move_on_after from oscpy.server import OSCBaseServer, UDP_MAX_SIZE +from oscpy.client import async_send_bundle, async_send_message logging.basicConfig() logger = logging.getLogger(__name__) @@ -24,6 +25,52 @@ async def get_socket(family, addr): await sock.bind(addr) return sock + async def send_message(self, + osc_address, values, ip_address, port, sock=None, safer=False, + encoding='', encoding_errors='strict' + ): + if not sock and self.default_socket: + sock = self.default_socket + elif not sock: + raise RuntimeError('no default socket yet and no socket provided') + stats = await async_send_message( + osc_address, + values, + ip_address, + port, + sock=sock, + safer=safer, + encoding=self.encoding, + encoding_errors=self.encoding_errors + ) + self.stats_sent += stats + return stats + + async def send_bundle( + self, messages, ip_address, port, timetag=None, sock=None, safer=False + ): + """Shortcut to the client's `send_bundle` method. + + Use the `default_socket` of the server by default. + See `client.send_bundle` for more info about the parameters. + """ + if not sock and self.default_socket: + sock = self.default_socket + elif not sock: + raise RuntimeError('no default socket yet and no socket provided') + + stats = await async_send_bundle( + messages, + ip_address, + port, + sock=sock, + safer=safer, + encoding=self.encoding, + encoding_errors=self.encoding_errors + ) + self.stats_sent += stats + return stats + async def listen( self, address='localhost', port=0, default=False, family='inet' ): diff --git a/tests/test_server.py b/tests/test_server.py index f7e8985..2e591d2 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -136,12 +136,31 @@ def test_terminate_server(cls): assert osc.join_server(timeout=0.1) assert not osc._thread.is_alive() +@pytest.mark.parametrize("cls", server_classes) +def test_send_message_using_default_socket(cls): + + event = Event() + + def success(*values): + event.set() + + osc = cls() + sock = _await(osc.listen, osc) + address = sock.getsockname()[0] + port = sock.getsockname()[1] + osc.bind(b'/success', success, sock) + + _await(osc.listen, osc, kwargs=dict(default=True)) + _await(osc.send_message, osc, args=[b'/success', [], address, port]) + + runner(osc, timeout=.2) + assert event.is_set() @pytest.mark.parametrize("cls", server_classes) def test_send_message_without_socket(cls): osc = cls() with pytest.raises(RuntimeError): - osc.send_message(b'/test', [], 'localhost', 0) + _await(osc.send_message, osc, args=[b'/test', [], 'localhost', 0]) @pytest.mark.parametrize("cls", server_classes) @@ -187,15 +206,16 @@ def broken_callback(*values): def test_send_bundle_without_socket(cls): osc = cls() with pytest.raises(RuntimeError): - osc.send_bundle([], 'localhost', 0) + _await(osc.send_bundle, osc, args=[[], 'localhost', 0]) sock = _await(osc.listen, osc, kwargs={'default': True}) - osc.send_bundle( - ( - (b'/test', []), - ), - 'localhost', 1 - ) + _await(osc.send_bundle, osc, + args=[( + (b'/test', []), + ), + 'localhost', 1 + ] + ) @pytest.mark.parametrize("cls", server_classes) @@ -964,7 +984,10 @@ def callback(index): client.send_message('/callback', [0]) # sever sends message on different port, might crash the server on windows: - osc.send_message('/callback', ["nobody is going to receive this"], ip_address='localhost', port=port + 1) + _await(osc.send_message, osc, + args=['/callback', ["nobody is going to receive this"]], + kwargs=dict(ip_address='localhost', port=port + 1) + ) # client sends message to server again. if server is dead, message # will not be received: