diff --git a/examples/client.py b/examples/client.py new file mode 100644 index 0000000..6e87daa --- /dev/null +++ b/examples/client.py @@ -0,0 +1,26 @@ +# +# Copyright (c) 2023-2025 - Restate Software, Inc., Restate GmbH +# +# This file is part of the Restate SDK for Python, +# which is released under the MIT license. +# +# You can find a copy of the license in file LICENSE in the root +# directory of this repository or package, or at +# https://github.com/restatedev/sdk-typescript/blob/main/LICENSE +# +"""client.py""" +# pylint: disable=C0116 +# pylint: disable=W0613 + +import restate + +from greeter import greet + +async def main(): + client = restate.create_client("http://localhost:8080") + res = await client.service_call(greet, arg="World") + print(res) + +if __name__ == "__main__": + import asyncio + asyncio.run(main()) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index e45d3e2..e30cb01 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ Source = "https://github.com/restatedev/sdk-python" test = ["pytest", "hypercorn", "pytest-asyncio==1.1.0"] lint = ["mypy", "pylint"] harness = ["testcontainers", "hypercorn", "httpx"] +client = ["httpx"] serde = ["dacite", "pydantic"] [build-system] diff --git a/python/restate/__init__.py b/python/restate/__init__.py index 252f8ee..00a1d03 100644 --- a/python/restate/__init__.py +++ b/python/restate/__init__.py @@ -12,6 +12,8 @@ Restate SDK for Python """ +import typing + from .service import Service from .object import VirtualObject from .workflow import Workflow @@ -26,6 +28,8 @@ from .endpoint import app +from .client_types import RestateClient, RestateClientSendHandle + try: from .harness import test_harness # type: ignore except ImportError: @@ -35,6 +39,18 @@ def test_harness(app, follow_logs = False, restate_image = ""): # type: ignore """a dummy harness constructor to raise ImportError""" raise ImportError("Install restate-sdk[harness] to use this feature") + + + +try: + from .client import create_client # type: ignore +except ImportError: + # we don't have the appropriate dependencies installed + + # pylint: disable=unused-argument, redefined-outer-name + def create_client(ingress: str, headers: typing.Optional[dict] = None) -> "RestateClient": # type: ignore + """a dummy client constructor to raise ImportError""" + raise ImportError("Install restate-sdk[client] to use this feature") __all__ = [ "Service", @@ -54,6 +70,9 @@ def test_harness(app, follow_logs = False, restate_image = ""): # type: ignore "TerminalError", "app", "test_harness", + "create_client", + "RestateClient", + "RestateClientSendHandle", "gather", "as_completed", "wait_completed", diff --git a/python/restate/client.py b/python/restate/client.py new file mode 100644 index 0000000..9c9a170 --- /dev/null +++ b/python/restate/client.py @@ -0,0 +1,206 @@ +# +# Copyright (c) 2023-2025 - Restate Software, Inc., Restate GmbH +# +# This file is part of the Restate SDK for Python, +# which is released under the MIT license. +# +# You can find a copy of the license in file LICENSE in the root +# directory of this repository or package, or at +# https://github.com/restatedev/sdk-typescript/blob/main/LICENSE +# +""" +This is a basic remote client for the Restate service. +""" + +from datetime import timedelta +import httpx +import typing + +from .client_types import RestateClient, RestateClientSendHandle + +from .context import HandlerType +from .serde import BytesSerde, JsonSerde, Serde +from .handler import handler_from_callable + +I = typing.TypeVar('I') +O = typing.TypeVar('O') + +class Client(RestateClient): + """ + A basic client for connecting to the Restate service. + """ + + def __init__(self, ingress: str, headers: typing.Optional[dict] = None): + self.ingress = ingress + self.headers = headers or {} + self.client = httpx.AsyncClient(base_url=ingress, headers=self.headers) + + + async def do_call(self, + tpe: HandlerType[I, O], + parameter: I, + key: typing.Optional[str] = None, + send_delay: typing.Optional[timedelta] = None, + send: bool = False, + idempotency_key: str | None = None, + headers: typing.Dict[str,str] | None = None + ) -> O: + """Make an RPC call to the given handler""" + target_handler = handler_from_callable(tpe) + service=target_handler.service_tag.name + handler=target_handler.name + input_serde = target_handler.handler_io.input_serde + output_serde = target_handler.handler_io.output_serde + + content_type = target_handler.handler_io.accept + if headers is None: + headers = {} + if headers.get('Content-Type') is None: + headers['Content-Type'] = content_type + + return await self.do_raw_call(service=service, + handler=handler, + input_param=parameter, + input_serde=input_serde, + output_serde=output_serde, + key=key, + send_delay=send_delay, + send=send, + idempotency_key=idempotency_key, + headers=headers) + + async def do_raw_call(self, + service: str, + handler:str, + input_param: I, + input_serde: Serde[I], + output_serde: Serde[O], + key: typing.Optional[str] = None, + send_delay: typing.Optional[timedelta] = None, + send: bool = False, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None + ) -> O: + """Make an RPC call to the given handler""" + parameter = input_serde.serialize(input_param) + if headers is not None: + headers_kvs = list(headers.items()) + else: + headers_kvs = [] + if send_delay is not None: + ms = int(send_delay.total_seconds() * 1000) + else: + ms = None + + res = await self.post(service=service, handler=handler, send=send, content=parameter, headers=headers_kvs, key=key, delay=ms, idempotency_key=idempotency_key) + return output_serde.deserialize(res) # type: ignore + + async def post(self, /, + service: str, + handler: str, + send: bool, + content: bytes, + headers: typing.List[typing.Tuple[(str, str)]] | None = None, + key: str | None = None, + delay: int | None = None, + idempotency_key: str | None = None) -> bytes: + """ + Send a POST request to the Restate service. + """ + endpoint = service + if key: + endpoint += f"/{key}" + endpoint += f"/{handler}" + if send: + endpoint += "/send" + if delay is not None: + endpoint = endpoint + f"?delay={delay}" + dict_headers = dict(headers) if headers is not None else {} + if idempotency_key is not None: + dict_headers['Idempotency-Key'] = idempotency_key + res = await self.client.post(endpoint, + headers=dict_headers, + content=content) + res.raise_for_status() + return res.content + + + @typing.final + @typing.override + async def service_call(self, + tpe: HandlerType[I, O], + arg: I, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None + ) -> O: + coro = await self.do_call(tpe, arg, idempotency_key=idempotency_key, headers=headers) + return coro + + + @typing.final + @typing.override + async def object_call(self, + tpe: HandlerType[I, O], + key: str, + arg: I, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None + ) -> O: + coro = await self.do_call(tpe, arg, key, idempotency_key=idempotency_key, headers=headers) + return coro + + + @typing.final + @typing.override + async def workflow_call(self, + tpe: HandlerType[I, O], + key: str, + arg: I, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None + ) -> O: + return await self.object_call(tpe, key, arg, idempotency_key=idempotency_key, headers=headers) + + + @typing.final + @typing.override + async def generic_call(self, service: str, handler: str, arg: bytes, key: str | None = None, idempotency_key: str | None = None, headers: typing.Dict[str, str] | None = None) -> bytes: + serde = BytesSerde() + call_handle = await self.do_raw_call(service=service, + handler=handler, + input_param=arg, + input_serde=serde, + output_serde=serde, + key=key, + idempotency_key=idempotency_key, + headers=headers) + return call_handle + + @typing.final + @typing.override + async def generic_send(self, service: str, handler: str, arg: bytes, key: str | None = None, send_delay: timedelta | None = None, idempotency_key: str | None = None, headers: typing.Dict[str, str] | None = None) -> RestateClientSendHandle: + serde = BytesSerde() + output_serde: Serde[dict] = JsonSerde() + + send_handle_json = await self.do_raw_call(service=service, + handler=handler, + input_param=arg, + input_serde=serde, + output_serde=output_serde, + key=key, + send_delay=send_delay, + send=True, + idempotency_key=idempotency_key, + headers=headers) + + return RestateClientSendHandle(send_handle_json.get('invocationId', ''), 200) # TODO: verify + + +def create_client(ingress: str, headers: typing.Optional[dict] = None) -> RestateClient: + """ + Create a new Restate client. + """ + return Client(ingress, headers) + + + diff --git a/python/restate/client_types.py b/python/restate/client_types.py new file mode 100644 index 0000000..c9c069b --- /dev/null +++ b/python/restate/client_types.py @@ -0,0 +1,82 @@ +# +# Copyright (c) 2023-2025 - Restate Software, Inc., Restate GmbH +# +# This file is part of the Restate SDK for Python, +# which is released under the MIT license. +# +# You can find a copy of the license in file LICENSE in the root +# directory of this repository or package, or at +# https://github.com/restatedev/sdk-typescript/blob/main/LICENSE +# +""" +Type definitions for the Restate client. +""" +import abc +from datetime import timedelta +import typing + +from .context import HandlerType + +I = typing.TypeVar('I') +O = typing.TypeVar('O') + +class RestateClientSendHandle: + """ + A handle for a send operation. + This is used to track the status of a send operation. + """ + def __init__(self, invocation_id: str, status_code: int): + self.invocation_id = invocation_id + self.status_code = status_code + +class RestateClient(abc.ABC): + """ + An abstract base class for a Restate client. + This class defines the interface for a Restate client. + """ + + @abc.abstractmethod + async def service_call(self, + tpe: HandlerType[I, O], + arg: I, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None) -> O: + """Make an RPC call to the given handler""" + pass + + @abc.abstractmethod + async def object_call(self, + tpe: HandlerType[I, O], + key: str, + arg: I, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None) -> O: + """Make an RPC call to the given object handler""" + pass + + @abc.abstractmethod + async def workflow_call(self, + tpe: HandlerType[I, O], + key: str, + arg: I, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None) -> O: + """Make an RPC call to the given workflow handler""" + pass + + @abc.abstractmethod + async def generic_call(self, service: str, handler: str, arg: bytes, + key: str | None = None, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None) -> bytes: + """Make a generic RPC call to the given service and handler""" + pass + + @abc.abstractmethod + async def generic_send(self, service: str, handler: str, arg: bytes, + key: str | None = None, + send_delay: timedelta | None = None, + idempotency_key: str | None = None, + headers: typing.Dict[str, str] | None = None) -> RestateClientSendHandle: + """Make a generic send operation to the given service and handler""" + pass \ No newline at end of file