
[Bug] cancelled timer callback causes asyncio.exceptions.InvalidStateError #782

Open
@dandavison

Description


The workflow below succeeds, but the worker logs an exception. In this example the exception does not cause a workflow failure or a workflow task failure, but we should nevertheless prevent it. What happens is:

  1. An asyncio.Task is created, which blocks on a timer.
  2. The task is cancelled, which throws asyncio.CancelledError into the coroutine and cancels the sleep future.
  3. However, SDK internals have registered a callback that resolves the future unconditionally. This callback still fires at the scheduled timer time, even though the future was cancelled in the interim, so fut.set_result raises asyncio.exceptions.InvalidStateError. See workflow_sleep in temporalio/worker/_workflow_instance.py:

self._timer_impl(
    duration,
    _TimerOptions(user_metadata=user_metadata),
    lambda: fut.set_result(None),
)
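The failure mode of that lambda can be illustrated in isolation with plain asyncio, no Temporal server or SDK required. The sketch below is not the SDK's code: `resolve_unguarded` stands in for the lambda above, and `resolve_guarded` is a hypothetical variant that checks `fut.done()` first, which is one possible way to avoid the error.

```python
import asyncio


def resolve_unguarded(fut: asyncio.Future) -> None:
    # Stand-in for the lambda in the issue: resolves unconditionally.
    fut.set_result(None)


def resolve_guarded(fut: asyncio.Future) -> None:
    # Hypothetical guard (an assumption, not the SDK's fix):
    # only resolve futures that are still pending.
    if not fut.done():
        fut.set_result(None)


async def demo() -> dict:
    loop = asyncio.get_running_loop()
    outcome = {}

    # A cancelled future rejects set_result with InvalidStateError.
    fut = loop.create_future()
    fut.cancel()
    try:
        resolve_unguarded(fut)
        outcome["unguarded"] = "ok"
    except asyncio.InvalidStateError:
        outcome["unguarded"] = "InvalidStateError"

    # The guarded variant leaves the cancelled future untouched.
    fut2 = loop.create_future()
    fut2.cancel()
    resolve_guarded(fut2)
    outcome["guarded"] = "ok"
    return outcome


result = asyncio.run(demo())
print(result)
```

Whether a `done()` check is the right fix inside the SDK, as opposed to cancelling the underlying timer, is a design question this sketch does not settle.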

🔴 caught asyncio.CancelledError when sleeping in task
Exception in callback _WorkflowInstanceImpl.workflow_sleep.<locals>.<lambda>() at /Users/dan/src/temporalio/sdk-python/temporalio/worker/_workflow_instance.py:1456
handle: <_TimerHandle when=1741139892.760706 _WorkflowInstanceImpl.workflow_sleep.<locals>.<lambda>() at /Users/dan/src/temporalio/sdk-python/temporalio/worker/_workflow_instance.py:1456>
Traceback (most recent call last):
  File "/Users/dan/.local/share/uv/python/cpython-3.13.1-macos-aarch64-none/lib/python3.13/asyncio/events.py", line 89, in _run
    self._context.run(self._callback, *self._args)
    ~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/dan/src/temporalio/sdk-python/temporalio/worker/_workflow_instance.py", line 1456, in <lambda>
    lambda: fut.set_result(None),
            ~~~~~~~~~~~~~~^^^^^^
asyncio.exceptions.InvalidStateError: invalid state
Result: Hello, World!
import asyncio
import uuid

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker


@workflow.defn
class SayHello:
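    async def _my_task(self) -> None:
        try:
            # This timer is still pending when run() cancels the task at ~2s.
            await workflow.sleep(6, summary="my task")
        except asyncio.CancelledError:
            # Swallow the cancellation so that `await task` in run() returns normally.
            print("🔴 caught asyncio.CancelledError when sleeping in task")

    @workflow.run
    async def run(self) -> str:
        task = asyncio.create_task(self._my_task())
        # Yield to the event loop so the task starts and blocks on its timer.
        await workflow.sleep(2, summary="let the loop start")

        try:
            task.cancel()
            await task
        except BaseException:
            assert False, "unreachable"

        # Keep the workflow alive past the 6s mark so the stale timer callback fires.
        await workflow.sleep(15)
        return "Hello, World!"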


async def main():
    client = await Client.connect("localhost:7233")
    task_queue = "timer-callback-exception-task-queue"
    async with Worker(
        client,
        task_queue=task_queue,
        workflows=[SayHello],
    ):
        result = await client.execute_workflow(
            SayHello.run,
            id=str(uuid.uuid4()),
            task_queue=task_queue,
        )
        print(f"Result: {result}")


if __name__ == "__main__":
    asyncio.run(main())
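
The same sequence (task blocks on a timer-backed future, task is cancelled, timer callback fires later) can also be sketched with plain asyncio, without a Temporal server. This is an approximation under stated assumptions: `loop.call_later` stands in for the SDK's `_timer_impl`, and `sleep_via_callback`, `run_once`, and the `cancel_timer_on_cancel` flag are hypothetical names introduced only for illustration. With the flag set, a done-callback cancels the timer handle when the future is cancelled, which is another possible way to keep the stale callback from firing.

```python
import asyncio


async def sleep_via_callback(delay: float, cancel_timer_on_cancel: bool) -> None:
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # Stand-in for the SDK timer: resolve the future when the timer fires.
    handle = loop.call_later(delay, lambda: fut.set_result(None))

    def _on_done(f: asyncio.Future) -> None:
        # Hypothetical mitigation: drop the timer if the future was cancelled.
        if f.cancelled():
            handle.cancel()

    if cancel_timer_on_cancel:
        fut.add_done_callback(_on_done)
    await fut


async def run_once(cancel_timer_on_cancel: bool) -> list:
    loop = asyncio.get_running_loop()
    errors = []
    # Record exceptions raised inside event-loop callbacks instead of logging them.
    loop.set_exception_handler(
        lambda _loop, ctx: errors.append(type(ctx.get("exception")).__name__)
    )

    task = asyncio.create_task(sleep_via_callback(0.05, cancel_timer_on_cancel))
    await asyncio.sleep(0.01)  # let the task start and block on its future
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    await asyncio.sleep(0.1)  # wait past the timer's scheduled time
    return errors


unguarded_errors = asyncio.run(run_once(cancel_timer_on_cancel=False))
guarded_errors = asyncio.run(run_once(cancel_timer_on_cancel=True))
print(unguarded_errors, guarded_errors)
```

The first run should record one InvalidStateError from the stale callback, while the second should record none, because the timer handle is cancelled together with the future.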
