diff --git a/reqrespactivity/readme.md b/reqrespactivity/readme.md new file mode 100644 index 00000000..eb5d3872 --- /dev/null +++ b/reqrespactivity/readme.md @@ -0,0 +1,64 @@ + +# Request/Response Sample with Activity-Based Responses + +This sample demonstrates how to send a request and get a response from a Temporal workflow via a response activity. + +In this example, the workflow accepts requests (signals) to uppercase a string and then provides the response via a callback response activity. Because the response is delivered by an activity execution, the requester must have its own worker running. + +## Running + +Follow these steps to run the sample: + +1. **Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use):** + +2. **Run the Worker:** + In one terminal, run the worker that executes the workflow and activity: + ```bash + python worker.py + ``` + +3. **Start the Workflow:** + In another terminal, start the workflow instance: + ```bash + python starter.py + ``` + +4. **Run the Requester:** + In a third terminal, run the requester that sends a request every second: + ```bash + python requester_run.py + ``` + This will send requests like `foo0`, `foo1`, etc., to be uppercased by the workflow. You should see output similar to: + ``` + Requested uppercase for 'foo0', got: 'FOO0' + Requested uppercase for 'foo1', got: 'FOO1' + ... + ``` + +Multiple instances of these processes can be run in separate terminals to confirm that they work independently. + +## Comparison with Query-Based Responses + +In the [reqrespquery](../reqrespquery) sample, responses are fetched by periodically polling the workflow using queries. This sample, however, uses activity-based responses, which has the following pros and cons: + +**Pros:** + +* Activity-based responses are often faster due to pushing rather than polling. +* The workflow does not need to explicitly store the response state. +* The workflow can detect whether a response was actually received. + +**Cons:** + +* Activity-based responses require a worker on the caller (requester) side. +* They record the response in history as an activity execution. +* They can only occur while the workflow is running. + +## Explanation of Continue-As-New + +Workflows have a limit on history size. When the event count grows too large, a workflow can return a `ContinueAsNew` error to atomically start a new workflow execution. To prevent data loss, signals must be drained and any pending futures completed before a new execution starts. + +In this sample, which is designed to run long-term, a `ContinueAsNew` is performed once the request count reaches a specified limit, provided there are no in-flight signal requests or executing activities. (If signals are processed too quickly or activities take too long, the workflow might never idle long enough for a `ContinueAsNew` to be triggered.) Careful tuning of signal and activity handling (including setting appropriate retry policies) is essential to ensure that the workflow can transition smoothly to a new execution when needed. + +## License + +This sample is released under the MIT License. diff --git a/reqrespactivity/requester.py b/reqrespactivity/requester.py new file mode 100644 index 00000000..daf63a28 --- /dev/null +++ b/reqrespactivity/requester.py @@ -0,0 +1,77 @@ +# requester.py +import asyncio +import uuid +from dataclasses import dataclass +from temporalio import activity +from temporalio.client import Client +from temporalio.worker import Worker + +# Global variable to hold the current Requester instance. +global_requester_instance = None + +@dataclass +class Request: + id: str + input: str + response_activity: str + response_task_queue: str + +@dataclass +class Response: + id: str + output: str + error: str = "" + +# Define the response activity as a top-level function with the decorator. +@activity.defn +async def response_activity(response: Response): + global global_requester_instance + if global_requester_instance: + fut = global_requester_instance.pending.pop(response.id, None) + if fut: + fut.set_result(response) + else: + raise Exception("No requester instance available") + +class Requester: + def __init__(self, client: Client, target_workflow_id: str): + self.client = client + self.target_workflow_id = target_workflow_id + self.task_queue = "requester-" + str(uuid.uuid4()) + self.pending = {} # Maps request IDs to asyncio.Future objects + + async def start_worker(self): + global global_requester_instance + global_requester_instance = self # Set the global reference + self.worker = Worker( + self.client, + task_queue=self.task_queue, + activities=[response_activity], + ) + # Run the worker in the background. + asyncio.create_task(self.worker.run()) + + async def close(self): + await self.worker.shutdown() + + async def request_uppercase(self, text: str) -> str: + req_id = str(uuid.uuid4()) + req = Request( + id=req_id, + input=text, + response_activity="response_activity", # Must match the name of the decorated function + response_task_queue=self.task_queue, + ) + loop = asyncio.get_running_loop() + fut = loop.create_future() + self.pending[req_id] = fut + + # Get a handle to the workflow and send the signal. + handle = self.client.get_workflow_handle(self.target_workflow_id) + await handle.signal("request", req) + + # Wait for the callback to return the response. + response: Response = await fut + if response.error: + raise Exception(response.error) + return response.output diff --git a/reqrespactivity/requester_run.py b/reqrespactivity/requester_run.py new file mode 100644 index 00000000..3a797c20 --- /dev/null +++ b/reqrespactivity/requester_run.py @@ -0,0 +1,23 @@ +# requester_run.py +import asyncio +from temporalio.client import Client +from requester import Requester + +async def main(): + client = await Client.connect("localhost:7233") + workflow_id = "reqrespactivity_workflow" + requester = Requester(client, workflow_id) + await requester.start_worker() + try: + i = 0 + while True: + text = f"foo{i}" # Create request similar to the Go sample: foo0, foo1, etc. + result = await requester.request_uppercase(text) + print(f"Requested uppercase for '{text}', got: '{result}'") + await asyncio.sleep(1) + i += 1 + finally: + await requester.close() + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/reqrespactivity/starter.py b/reqrespactivity/starter.py new file mode 100644 index 00000000..24e0e4b9 --- /dev/null +++ b/reqrespactivity/starter.py @@ -0,0 +1,17 @@ +# starter.py +import asyncio +from temporalio.client import Client +from workflow import UppercaseWorkflow + +async def main(): + client = await Client.connect("localhost:7233") + workflow_id = "reqrespactivity_workflow" + handle = await client.start_workflow( + UppercaseWorkflow.run, + id=workflow_id, + task_queue="reqrespactivity", + ) + print(f"Started workflow with ID: {handle.id}") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/reqrespactivity/worker.py b/reqrespactivity/worker.py new file mode 100644 index 00000000..6c864b23 --- /dev/null +++ b/reqrespactivity/worker.py @@ -0,0 +1,19 @@ +# worker.py +import asyncio +from temporalio.client import Client +from temporalio.worker import Worker +from workflow import UppercaseWorkflow, uppercase_activity + +async def main(): + client = await Client.connect("localhost:7233") + worker = Worker( + client, + task_queue="reqrespactivity", + workflows=[UppercaseWorkflow], + activities=[uppercase_activity], + ) + print("Worker started on task queue 'reqrespactivity'") + await worker.run() + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/reqrespactivity/workflow.py b/reqrespactivity/workflow.py new file mode 100644 index 00000000..83e8a58f --- /dev/null +++ b/reqrespactivity/workflow.py @@ -0,0 +1,60 @@ +# workflow.py +import asyncio +from datetime import timedelta +from dataclasses import dataclass +from temporalio import workflow, activity + +# Define data models similar to the Go structs. +@dataclass +class Request: + id: str + input: str + response_activity: str + response_task_queue: str + +@dataclass +class Response: + id: str + output: str + error: str = "" + +# Activity to convert text to uppercase. +@activity.defn +async def uppercase_activity(text: str) -> str: + return text.upper() + +# Workflow that listens for "request" signals. +@workflow.defn +class UppercaseWorkflow: + def __init__(self): + self.requests = [] # Buffer for incoming requests + + @workflow.signal + def request(self, req: Request): + self.requests.append(req) + + @workflow.run + async def run(self): + # Continuously process incoming requests. + while True: + if self.requests: + req = self.requests.pop(0) + try: + # Execute the uppercase activity. + result = await workflow.execute_activity( + uppercase_activity, + req.input, + start_to_close_timeout=timedelta(seconds=5), + ) + resp = Response(id=req.id, output=result) + except Exception as e: + resp = Response(id=req.id, output="", error=str(e)) + # Call back the requester via the designated response activity. + await workflow.execute_activity( + req.response_activity, + resp, + task_queue=req.response_task_queue, + start_to_close_timeout=timedelta(seconds=10), + ) + else: + await workflow.sleep(1)