Open
Description
Describe the bug
The following test sometimes fails with temporalio.exceptions.ApplicationError: Client should never see this error!
tests/worker/test_workflow.py::test_workflow_return_is_honored_when_it_precedes_signal_completion_command
Minimal Reproduction
Running the test 10,000 times reproduces it fairly reliably:
zsh
set -e
for i in $(seq 10000); do; poe test tests/worker/test_workflow.py -k test_workflow_return_is_honored_when_it_precedes_signal_completion_command; done
Environment/Versions
- OS and processor: ARM Ubuntu (CI), x86 Mac (CI), M4 Mac (local)
- Temporal Version: SDK main @ 257f143
- Are you using Docker or Kubernetes or building Temporal from source? Both
Additional context
CI jobs with failure:
https://github.com/temporalio/sdk-python/actions/runs/14937905636/job/41969535426
https://github.com/temporalio/sdk-python/actions/runs/14980669571/job/42083795290
Terminal output:
=================================== FAILURES ===================================
__ test_workflow_return_is_honored_when_it_precedes_signal_completion_command __
temporalio.exceptions.ApplicationError: Client should never see this error!
The above exception was the direct cause of the following exception:
client = <temporalio.client.Client object at 0xfff417c82210>
main_workflow_returns_before_signal_completions = True
async def _do_first_completion_command_is_honored_test(
client: Client, main_workflow_returns_before_signal_completions: bool
):
workflow_cls: Union[
Type[FirstCompletionCommandIsHonoredPingPongWorkflow],
Type[FirstCompletionCommandIsHonoredWorkflow],
] = (
FirstCompletionCommandIsHonoredPingPongWorkflow
if main_workflow_returns_before_signal_completions
else FirstCompletionCommandIsHonoredWorkflow
)
async with Worker(
client,
task_queue="tq",
workflows=[workflow_cls],
) as worker:
handle = await client.start_workflow(
workflow_cls.run,
id=f"wf-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
await handle.signal(workflow_cls.this_signal_executes_second)
await handle.signal(workflow_cls.this_signal_executes_first)
try:
> result = await handle.result()
tests/worker/test_workflow.py:6060:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <temporalio.client.WorkflowHandle object at 0xfff40eb1cf80>
async def result(
self,
*,
follow_runs: bool = True,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> ReturnType:
"""Wait for result of the workflow.
This will use :py:attr:`result_run_id` if present to base the result on.
To use another run ID, a new handle must be created via
:py:meth:`Client.get_workflow_handle`.
Args:
follow_runs: If true (default), workflow runs will be continually
fetched, until the most recent one is found. If false, the first
result is used.
rpc_metadata: Headers used on the RPC call. Keys here override
client-level RPC metadata keys.
rpc_timeout: Optional RPC deadline to set for each RPC call. Note,
this is the timeout for each history RPC call not this overall
function.
Returns:
Result of the workflow after being converted by the data converter.
Raises:
WorkflowFailureError: Workflow failed, was cancelled, was
terminated, or timed out. Use the
:py:attr:`WorkflowFailureError.cause` to see the underlying
reason.
Exception: Other possible failures during result fetching.
"""
# We have to maintain our own run ID because it can change if we follow
# executions
hist_run_id = self._result_run_id
while True:
async for event in self._fetch_history_events_for_run(
hist_run_id,
wait_new_event=True,
event_filter_type=WorkflowHistoryEventFilterType.CLOSE_EVENT,
skip_archival=True,
rpc_metadata=rpc_metadata,
rpc_timeout=rpc_timeout,
):
if event.HasField("workflow_execution_completed_event_attributes"):
complete_attr = event.workflow_execution_completed_event_attributes
# Follow execution
if follow_runs and complete_attr.new_execution_run_id:
hist_run_id = complete_attr.new_execution_run_id
break
# Ignoring anything after the first response like TypeScript
type_hints = [self._result_type] if self._result_type else None
results = await self._client.data_converter.decode_wrapper(
complete_attr.result,
type_hints,
)
if not results:
return cast(ReturnType, None)
elif len(results) > 1:
warnings.warn(f"Expected single result, got {len(results)}")
return cast(ReturnType, results[0])
elif event.HasField("workflow_execution_failed_event_attributes"):
fail_attr = event.workflow_execution_failed_event_attributes
# Follow execution
if follow_runs and fail_attr.new_execution_run_id:
hist_run_id = fail_attr.new_execution_run_id
break
> raise WorkflowFailureError(
cause=await self._client.data_converter.decode_failure(
fail_attr.failure
),
E temporalio.client.WorkflowFailureError: Workflow execution failed
temporalio/client.py:[163](https://github.com/temporalio/sdk-python/actions/runs/14937905636/job/41969535426#step:14:164)7: WorkflowFailureError
During handling of the above exception, another exception occurred:
client = <temporalio.client.Client object at 0xfff417c82210>
async def test_workflow_return_is_honored_when_it_precedes_signal_completion_command(
client: Client,
):
> await _do_first_completion_command_is_honored_test(
client, main_workflow_returns_before_signal_completions=True
)
tests/worker/test_workflow.py:6031:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
client = <temporalio.client.Client object at 0xfff417c82210>
main_workflow_returns_before_signal_completions = True
async def _do_first_completion_command_is_honored_test(
client: Client, main_workflow_returns_before_signal_completions: bool
):
workflow_cls: Union[
Type[FirstCompletionCommandIsHonoredPingPongWorkflow],
Type[FirstCompletionCommandIsHonoredWorkflow],
] = (
FirstCompletionCommandIsHonoredPingPongWorkflow
if main_workflow_returns_before_signal_completions
else FirstCompletionCommandIsHonoredWorkflow
)
async with Worker(
client,
task_queue="tq",
workflows=[workflow_cls],
) as worker:
handle = await client.start_workflow(
workflow_cls.run,
id=f"wf-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
await handle.signal(workflow_cls.this_signal_executes_second)
await handle.signal(workflow_cls.this_signal_executes_first)
try:
result = await handle.result()
except WorkflowFailureError as err:
if main_workflow_returns_before_signal_completions:
> raise RuntimeError(
"Expected no error due to main workflow coroutine returning first"
)
E RuntimeError: Expected no error due to main workflow coroutine returning first
tests/worker/test_workflow.py:6063: RuntimeError
------------------------------ Captured log call -------------------------------
21:25:24 [ DEBUG] Workflow raised failure with run ID 0196b6f0-4e20-78dc-9011-350d605a0452 (_workflow_instance.py:2003)
Traceback (most recent call last):
File "/home/runner/work/sdk-python/sdk-python/temporalio/worker/_workflow_instance.py", line 1982, in _run_top_level_workflow_function
await coro
File "/home/runner/work/sdk-python/sdk-python/temporalio/testing/_workflow.py", line 522, in handle_signal
return await super().handle_signal(input)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/sdk-python/sdk-python/temporalio/worker/_interceptor.py", line 337, in handle_signal
return await self.next.handle_signal(input)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/sdk-python/sdk-python/temporalio/worker/_workflow_instance.py", line 2372, in handle_signal
await handler(*input.args)
File "/home/runner/work/sdk-python/sdk-python/temporalio/workflow.py", line [177](https://github.com/temporalio/sdk-python/actions/runs/14937905636/job/41969535426#step:14:178)4, in with_object
return await fn(obj, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/sdk-python/sdk-python/tests/worker/test_workflow.py", line 6001, in this_signal_executes_second
raise ApplicationError("Client should never see this error!")
temporalio.exceptions.ApplicationError: Client should never see this error!
21:25:24 [ DEBUG] Workflow raised failure with run ID 0196b6f0-4e20-78dc-9011-350d605a0452 (_workflow_instance.py:2003)
Traceback (most recent call last):
File "/home/runner/work/sdk-python/sdk-python/temporalio/worker/_workflow_instance.py", line 1982, in _run_top_level_workflow_function
await coro
File "/home/runner/work/sdk-python/sdk-python/temporalio/testing/_workflow.py", line 522, in handle_signal
return await super().handle_signal(input)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/sdk-python/sdk-python/temporalio/worker/_interceptor.py", line 337, in handle_signal
return await self.next.handle_signal(input)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/sdk-python/sdk-python/temporalio/worker/_workflow_instance.py", line 2372, in handle_signal
await handler(*input.args)
File "/home/runner/work/sdk-python/sdk-python/temporalio/workflow.py", line 1774, in with_object
return await fn(obj, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/sdk-python/sdk-python/tests/worker/test_workflow.py", line 5990, in this_signal_executes_first
raise ApplicationError(
...<2 lines>...
)
temporalio.exceptions.ApplicationError: Client should see this error unless doing ping-pong (in which case main coroutine returns first)
21:25:24 [ DEBUG] Evicting workflow with run ID 0[196](https://github.com/temporalio/sdk-python/actions/runs/14937905636/job/41969535426#step:14:197)b6f0-4e20-78dc-9011-350d605a0452, message: Workflow completed (_workflow.py:365)
21:25:24 [ INFO] Beginning worker shutdown, will wait 0:00:00 before cancelling activities (_worker.py:640)
=============================== warnings summary ===============================
tests/worker/test_activity.py::test_sync_activity_process_non_picklable
tests/worker/test_activity.py::test_sync_activity_process_failure
tests/worker/test_activity.py::test_sync_activity_process_cancel
tests/worker/test_activity.py::test_sync_activity_process_cancel_uncaught
tests/worker/test_activity.py::test_sync_activity_process_heartbeat_details
tests/worker/test_activity.py::test_sync_activity_process_non_picklable_heartbeat_details
tests/worker/test_activity.py::test_sync_activity_dynamic_process
/home/runner/work/sdk-python/sdk-python/tests/worker/test_activity.py:1331: UserWarning: Worker max_concurrent_activities is 50 but activity_executor's max_workers is only 2
async with Worker(**worker_config):
tests/worker/test_activity.py::test_sync_activity_process_worker_shutdown_graceful
/home/runner/work/sdk-python/sdk-python/tests/worker/test_activity.py:934: UserWarning: Worker max_concurrent_activities is 50 but activity_executor's max_workers is only 2
act_worker = Worker(
tests/worker/test_activity.py::test_sync_activity_process_executor_crash
/home/runner/work/sdk-python/sdk-python/tests/worker/test_activity.py:991: UserWarning: Worker max_concurrent_activities is 50 but activity_executor's max_workers is only 2
act_worker = Worker(
tests/worker/workflow_sandbox/test_runner.py::test_workflow_sandbox_global_state[sandboxed_passthrough_modules0]
/home/runner/work/sdk-python/sdk-python/temporalio/worker/workflow_sandbox/_importer.py:[234](https://github.com/temporalio/sdk-python/actions/runs/14937905636/job/41969535426#step:14:235): RuntimeWarning: coroutine '_WorkflowInstanceImpl._apply_initialize_workflow.<locals>.run_workflow' was never awaited
mod = importlib.__import__(name, globals, locals, fromlist, level)
Enable tracemalloc to get traceback where the object was allocated.
See https://docs.pytest.org/en/stable/how-to/capture-warnings.html#resource-warnings for more info.
-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
- generated xml file: /home/runner/work/sdk-python/sdk-python/junit-xml/3.13--ubuntu-arm.xml -
=========================== short test summary info ============================
FAILED tests/worker/test_workflow.py::test_workflow_return_is_honored_when_it_precedes_signal_completion_command - RuntimeError: Expected no error due to main workflow coroutine returning first
====== 1 failed, 386 passed, 8 skipped, 10 warnings in 271.45s (0:04:31) =======
Error: Process completed with exit code 1.