Skip to content

Add a basic typed client #116

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions examples/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# Copyright (c) 2023-2025 - Restate Software, Inc., Restate GmbH
#
# This file is part of the Restate SDK for Python,
# which is released under the MIT license.
#
# You can find a copy of the license in file LICENSE in the root
# directory of this repository or package, or at
# https://github.com/restatedev/sdk-typescript/blob/main/LICENSE
#
"""client.py"""
# pylint: disable=C0116
# pylint: disable=W0613

import restate

from greeter import greet

async def main():
client = restate.create_client("http://localhost:8080")
res = await client.service_call(greet, arg="World")
print(res)

if __name__ == "__main__":
import asyncio
asyncio.run(main())
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Source = "https://github.com/restatedev/sdk-python"
test = ["pytest", "hypercorn", "pytest-asyncio==1.1.0"]
lint = ["mypy", "pylint"]
harness = ["testcontainers", "hypercorn", "httpx"]
client = ["httpx"]
serde = ["dacite", "pydantic"]

[build-system]
Expand Down
19 changes: 19 additions & 0 deletions python/restate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
Restate SDK for Python
"""

import typing

from .service import Service
from .object import VirtualObject
from .workflow import Workflow
Expand All @@ -26,6 +28,8 @@

from .endpoint import app

from .client_types import RestateClient, RestateClientSendHandle

try:
from .harness import test_harness # type: ignore
except ImportError:
Expand All @@ -35,6 +39,18 @@
def test_harness(app, follow_logs = False, restate_image = ""): # type: ignore
"""a dummy harness constructor to raise ImportError"""
raise ImportError("Install restate-sdk[harness] to use this feature")



try:
from .client import create_client # type: ignore
except ImportError:
# we don't have the appropriate dependencies installed

# pylint: disable=unused-argument, redefined-outer-name
def create_client(ingress: str, headers: typing.Optional[dict] = None) -> "RestateClient": # type: ignore
"""a dummy client constructor to raise ImportError"""
raise ImportError("Install restate-sdk[client] to use this feature")

__all__ = [
"Service",
Expand All @@ -54,6 +70,9 @@ def test_harness(app, follow_logs = False, restate_image = ""): # type: ignore
"TerminalError",
"app",
"test_harness",
"create_client",
"RestateClient",
"RestateClientSendHandle",
"gather",
"as_completed",
"wait_completed",
Expand Down
206 changes: 206 additions & 0 deletions python/restate/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
#
# Copyright (c) 2023-2025 - Restate Software, Inc., Restate GmbH
#
# This file is part of the Restate SDK for Python,
# which is released under the MIT license.
#
# You can find a copy of the license in file LICENSE in the root
# directory of this repository or package, or at
# https://github.com/restatedev/sdk-typescript/blob/main/LICENSE
#
"""
This is a basic remote client for the Restate service.
"""

from datetime import timedelta
import httpx
import typing

from .client_types import RestateClient, RestateClientSendHandle

from .context import HandlerType
from .serde import BytesSerde, JsonSerde, Serde
from .handler import handler_from_callable

I = typing.TypeVar('I')
O = typing.TypeVar('O')

class Client(RestateClient):
"""
A basic client for connecting to the Restate service.
"""

def __init__(self, ingress: str, headers: typing.Optional[dict] = None):
self.ingress = ingress
self.headers = headers or {}
self.client = httpx.AsyncClient(base_url=ingress, headers=self.headers)


async def do_call(self,
tpe: HandlerType[I, O],
parameter: I,
key: typing.Optional[str] = None,
send_delay: typing.Optional[timedelta] = None,
send: bool = False,
idempotency_key: str | None = None,
headers: typing.Dict[str,str] | None = None
) -> O:
"""Make an RPC call to the given handler"""
target_handler = handler_from_callable(tpe)
service=target_handler.service_tag.name
handler=target_handler.name
input_serde = target_handler.handler_io.input_serde
output_serde = target_handler.handler_io.output_serde

content_type = target_handler.handler_io.accept
if headers is None:
headers = {}
if headers.get('Content-Type') is None:
headers['Content-Type'] = content_type

return await self.do_raw_call(service=service,
handler=handler,
input_param=parameter,
input_serde=input_serde,
output_serde=output_serde,
key=key,
send_delay=send_delay,
send=send,
idempotency_key=idempotency_key,
headers=headers)

async def do_raw_call(self,
service: str,
handler:str,
input_param: I,
input_serde: Serde[I],
output_serde: Serde[O],
key: typing.Optional[str] = None,
send_delay: typing.Optional[timedelta] = None,
send: bool = False,
idempotency_key: str | None = None,
headers: typing.Dict[str, str] | None = None
) -> O:
"""Make an RPC call to the given handler"""
parameter = input_serde.serialize(input_param)
if headers is not None:
headers_kvs = list(headers.items())
else:
headers_kvs = []
if send_delay is not None:
ms = int(send_delay.total_seconds() * 1000)
else:
ms = None

res = await self.post(service=service, handler=handler, send=send, content=parameter, headers=headers_kvs, key=key, delay=ms, idempotency_key=idempotency_key)
return output_serde.deserialize(res) # type: ignore

async def post(self, /,
service: str,
handler: str,
send: bool,
content: bytes,
headers: typing.List[typing.Tuple[(str, str)]] | None = None,
key: str | None = None,
delay: int | None = None,
idempotency_key: str | None = None) -> bytes:
"""
Send a POST request to the Restate service.
"""
endpoint = service
if key:
endpoint += f"/{key}"
endpoint += f"/{handler}"
if send:
endpoint += "/send"
if delay is not None:
endpoint = endpoint + f"?delay={delay}"
dict_headers = dict(headers) if headers is not None else {}
if idempotency_key is not None:
dict_headers['Idempotency-Key'] = idempotency_key
res = await self.client.post(endpoint,
headers=dict_headers,
content=content)
res.raise_for_status()
return res.content


@typing.final
@typing.override
async def service_call(self,
tpe: HandlerType[I, O],
arg: I,
idempotency_key: str | None = None,
headers: typing.Dict[str, str] | None = None
) -> O:
coro = await self.do_call(tpe, arg, idempotency_key=idempotency_key, headers=headers)
return coro


@typing.final
@typing.override
async def object_call(self,
tpe: HandlerType[I, O],
key: str,
arg: I,
idempotency_key: str | None = None,
headers: typing.Dict[str, str] | None = None
) -> O:
coro = await self.do_call(tpe, arg, key, idempotency_key=idempotency_key, headers=headers)
return coro


@typing.final
@typing.override
async def workflow_call(self,
tpe: HandlerType[I, O],
key: str,
arg: I,
idempotency_key: str | None = None,
headers: typing.Dict[str, str] | None = None
) -> O:
return await self.object_call(tpe, key, arg, idempotency_key=idempotency_key, headers=headers)


@typing.final
@typing.override
async def generic_call(self, service: str, handler: str, arg: bytes, key: str | None = None, idempotency_key: str | None = None, headers: typing.Dict[str, str] | None = None) -> bytes:
serde = BytesSerde()
call_handle = await self.do_raw_call(service=service,
handler=handler,
input_param=arg,
input_serde=serde,
output_serde=serde,
key=key,
idempotency_key=idempotency_key,
headers=headers)
return call_handle

@typing.final
@typing.override
async def generic_send(self, service: str, handler: str, arg: bytes, key: str | None = None, send_delay: timedelta | None = None, idempotency_key: str | None = None, headers: typing.Dict[str, str] | None = None) -> RestateClientSendHandle:
serde = BytesSerde()
output_serde: Serde[dict] = JsonSerde()

send_handle_json = await self.do_raw_call(service=service,
handler=handler,
input_param=arg,
input_serde=serde,
output_serde=output_serde,
key=key,
send_delay=send_delay,
send=True,
idempotency_key=idempotency_key,
headers=headers)

return RestateClientSendHandle(send_handle_json.get('invocationId', ''), 200) # TODO: verify


def create_client(ingress: str, headers: typing.Optional[dict] = None) -> RestateClient:
"""
Create a new Restate client.
"""
return Client(ingress, headers)



82 changes: 82 additions & 0 deletions python/restate/client_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#
# Copyright (c) 2023-2025 - Restate Software, Inc., Restate GmbH
#
# This file is part of the Restate SDK for Python,
# which is released under the MIT license.
#
# You can find a copy of the license in file LICENSE in the root
# directory of this repository or package, or at
# https://github.com/restatedev/sdk-typescript/blob/main/LICENSE
#
"""
Type definitions for the Restate client.
"""
import abc
from datetime import timedelta
import typing

from .context import HandlerType

I = typing.TypeVar('I')
O = typing.TypeVar('O')

class RestateClientSendHandle:
"""
A handle for a send operation.
This is used to track the status of a send operation.
"""
def __init__(self, invocation_id: str, status_code: int):
self.invocation_id = invocation_id
self.status_code = status_code

class RestateClient(abc.ABC):
"""
An abstract base class for a Restate client.
This class defines the interface for a Restate client.
"""

@abc.abstractmethod
async def service_call(self,
tpe: HandlerType[I, O],
arg: I,
idempotency_key: str | None = None,
headers: typing.Dict[str, str] | None = None) -> O:
"""Make an RPC call to the given handler"""
pass

@abc.abstractmethod
async def object_call(self,
tpe: HandlerType[I, O],
key: str,
arg: I,
idempotency_key: str | None = None,
headers: typing.Dict[str, str] | None = None) -> O:
"""Make an RPC call to the given object handler"""
pass

@abc.abstractmethod
async def workflow_call(self,
tpe: HandlerType[I, O],
key: str,
arg: I,
idempotency_key: str | None = None,
headers: typing.Dict[str, str] | None = None) -> O:
"""Make an RPC call to the given workflow handler"""
pass

@abc.abstractmethod
async def generic_call(self, service: str, handler: str, arg: bytes,
key: str | None = None,
idempotency_key: str | None = None,
headers: typing.Dict[str, str] | None = None) -> bytes:
"""Make a generic RPC call to the given service and handler"""
pass

@abc.abstractmethod
async def generic_send(self, service: str, handler: str, arg: bytes,
key: str | None = None,
send_delay: timedelta | None = None,
idempotency_key: str | None = None,
headers: typing.Dict[str, str] | None = None) -> RestateClientSendHandle:
"""Make a generic send operation to the given service and handler"""
pass
Loading