From 4c8db1569b0eb5f8283493113a07c206ca3e6257 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Thu, 15 May 2025 16:17:53 -0700 Subject: [PATCH] Add fix, with test --- .../src/worker/activities/local_activities.rs | 44 ++++++---- .../workflow_tests/local_activities.rs | 83 ++++++++++++++++++- 2 files changed, 112 insertions(+), 15 deletions(-) diff --git a/core/src/worker/activities/local_activities.rs b/core/src/worker/activities/local_activities.rs index ae7822404..d0cbd9909 100644 --- a/core/src/worker/activities/local_activities.rs +++ b/core/src/worker/activities/local_activities.rs @@ -406,20 +406,36 @@ impl LocalActivityManager { } } CancelOrTimeout::Timeout { run_id, resolution } => { - let tt = self - .dat - .lock() - .la_info - .get(&ExecutingLAId { - run_id, - seq_num: resolution.seq, - }) - .as_ref() - .map(|lai| lai.task_token.clone()); - if let Some(task_token) = tt { - Some(NextPendingLAAction::Autocomplete( - self.complete(&task_token, resolution.result), - )) + let mut dat = self.dat.lock(); + let exec_id = ExecutingLAId { + run_id: run_id.clone(), + seq_num: resolution.seq, + }; + if let Some(lai) = dat.la_info.get(&exec_id) { + let tt = lai.task_token.clone(); + if dat.outstanding_activity_tasks.contains_key(&tt) { + Some(NextPendingLAAction::Autocomplete( + self.complete(&tt, resolution.result), + )) + } else { + // schedule_to_close timeout can sometimes trigger before we add + // TaskToken to dat.outstanding_activity_tasks. We need to report + // timeout result so a marker is recorded. + let lai = dat.la_info.remove(&exec_id).unwrap(); + if let Some(bot) = lai.backing_off_task { + bot.abort(); + } + if self.workflows_have_shut_down.is_cancelled() { + self.set_shutdown_complete_if_ready(&mut dat); + } + Some(NextPendingLAAction::Autocomplete( + LACompleteAction::Report { + run_id, + resolution, + task: None, + }, + )) + } } else { // This timeout is for a no-longer-tracked activity, so, whatever None diff --git a/tests/integ_tests/workflow_tests/local_activities.rs b/tests/integ_tests/workflow_tests/local_activities.rs index 90bd025d0..29f65fc57 100644 --- a/tests/integ_tests/workflow_tests/local_activities.rs +++ b/tests/integ_tests/workflow_tests/local_activities.rs @@ -16,6 +16,8 @@ use temporal_sdk::{ interceptors::{FailOnNondeterminismInterceptor, WorkerInterceptor}, }; use temporal_sdk_core::replay::HistoryForReplay; +use temporal_sdk_core_protos::temporal::api::failure::v1::failure::FailureInfo::TimeoutFailureInfo; +use temporal_sdk_core_protos::temporal::api::history::v1::history_event; use temporal_sdk_core_protos::{ TestHistoryBuilder, coresdk::{ @@ -26,7 +28,7 @@ use temporal_sdk_core_protos::{ }, temporal::api::{ common::v1::RetryPolicy, - enums::v1::{TimeoutType, UpdateWorkflowExecutionLifecycleStage}, + enums::v1::{EventType, TimeoutType, UpdateWorkflowExecutionLifecycleStage}, update::v1::WaitPolicy, }, }; @@ -34,6 +36,8 @@ use temporal_sdk_core_test_utils::{ CoreWfStarter, WorkflowHandleExt, history_from_proto_binary, init_core_replay_preloaded, replay_sdk_worker, workflows::la_problem_workflow, }; +use tokio::join; +use tokio::sync::Semaphore; use tokio_util::sync::CancellationToken; pub(crate) async fn one_local_activity_wf(ctx: WfContext) -> WorkflowResult<()> { @@ -145,6 +149,83 @@ pub(crate) async fn local_act_fanout_wf(ctx: WfContext) -> WorkflowResult<()> { Ok(().into()) } +#[tokio::test] +async fn local_activity_timeout_marker() { + let wf_name = "local_activity_timeout_marker"; + let mut starter = CoreWfStarter::new(wf_name); + let mut worker = starter.worker().await; + static ACTS_STARTED: Semaphore = Semaphore::const_new(0); + + worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move { + let local_activity = ctx.local_activity(LocalActivityOptions { + schedule_to_close_timeout: Some(Duration::from_millis(200)), + activity_type: "stop_activity".to_string(), + input: "hi!".as_json_payload().expect("serializes fine"), + ..Default::default() + }); + local_activity.await; + Ok(().into()) + }); + + worker.register_activity( + "stop_activity", + |_ctx: ActContext, _: String| async move { + ACTS_STARTED.add_permits(1); + Result::<(), _>::Err(anyhow!("Oh no I failed!").into()) + }, + ); + + let run_id = worker + .submit_wf( + wf_name.to_owned(), + wf_name.to_owned(), + vec![], + WorkflowOptions { + execution_timeout: Some(Duration::from_secs(1)), + ..Default::default() + }, + ) + .await + .unwrap(); + + let handle = worker.inner_mut().shutdown_handle(); + let shutdowner = async { + let _ = ACTS_STARTED.acquire().await; + handle(); + }; + let runner = async { + worker.run_until_done().await.unwrap(); + }; + join!(shutdowner, runner); + + let client = starter.get_client().await; + let history = client + .get_workflow_execution_history(wf_name.to_owned(), Some(run_id), vec![]) + .await + .unwrap() + .history + .unwrap(); + let marker = history + .events + .iter() + .find(|he| he.event_type() == EventType::MarkerRecorded) + .expect("expected marker recorded event"); + + if let Some(history_event::Attributes::MarkerRecordedEventAttributes(attr)) = + marker.clone().attributes + { + let failure = attr.failure.unwrap().failure_info.unwrap(); + match failure { + TimeoutFailureInfo(failure) => { + assert_eq!(failure.timeout_type(), TimeoutType::ScheduleToClose); + } + _ => panic!("Expected a timeout failure"), + } + } else { + unreachable!("We already assert MarkerRecorded event type"); + } +} + #[tokio::test] async fn local_act_fanout() { let wf_name = "local_act_fanout";