Skip to content

Fix missing timeout marker for local activities #919

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 30 additions & 14 deletions core/src/worker/activities/local_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,20 +406,36 @@ impl LocalActivityManager {
}
}
CancelOrTimeout::Timeout { run_id, resolution } => {
let tt = self
.dat
.lock()
.la_info
.get(&ExecutingLAId {
run_id,
seq_num: resolution.seq,
})
.as_ref()
.map(|lai| lai.task_token.clone());
if let Some(task_token) = tt {
Some(NextPendingLAAction::Autocomplete(
self.complete(&task_token, resolution.result),
))
let mut dat = self.dat.lock();
let exec_id = ExecutingLAId {
run_id: run_id.clone(),
seq_num: resolution.seq,
};
if let Some(lai) = dat.la_info.get(&exec_id) {
let tt = lai.task_token.clone();
if dat.outstanding_activity_tasks.contains_key(&tt) {
Some(NextPendingLAAction::Autocomplete(
self.complete(&tt, resolution.result),
))
} else {
// schedule_to_close timeout can sometimes trigger before we add
// TaskToken to dat.outstanding_activity_tasks. We need to report
// timeout result so a marker is recorded.
let lai = dat.la_info.remove(&exec_id).unwrap();
if let Some(bot) = lai.backing_off_task {
bot.abort();
}
if self.workflows_have_shut_down.is_cancelled() {
self.set_shutdown_complete_if_ready(&mut dat);
}
Some(NextPendingLAAction::Autocomplete(
LACompleteAction::Report {
run_id,
resolution,
task: None,
},
))
}
} else {
// This timeout is for a no-longer-tracked activity, so, whatever
None
Expand Down
83 changes: 82 additions & 1 deletion tests/integ_tests/workflow_tests/local_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use temporal_sdk::{
interceptors::{FailOnNondeterminismInterceptor, WorkerInterceptor},
};
use temporal_sdk_core::replay::HistoryForReplay;
use temporal_sdk_core_protos::temporal::api::failure::v1::failure::FailureInfo::TimeoutFailureInfo;
use temporal_sdk_core_protos::temporal::api::history::v1::history_event;
use temporal_sdk_core_protos::{
TestHistoryBuilder,
coresdk::{
Expand All @@ -26,14 +28,16 @@ use temporal_sdk_core_protos::{
},
temporal::api::{
common::v1::RetryPolicy,
enums::v1::{TimeoutType, UpdateWorkflowExecutionLifecycleStage},
enums::v1::{EventType, TimeoutType, UpdateWorkflowExecutionLifecycleStage},
update::v1::WaitPolicy,
},
};
use temporal_sdk_core_test_utils::{
CoreWfStarter, WorkflowHandleExt, history_from_proto_binary, init_core_replay_preloaded,
replay_sdk_worker, workflows::la_problem_workflow,
};
use tokio::join;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;

pub(crate) async fn one_local_activity_wf(ctx: WfContext) -> WorkflowResult<()> {
Expand Down Expand Up @@ -145,6 +149,83 @@ pub(crate) async fn local_act_fanout_wf(ctx: WfContext) -> WorkflowResult<()> {
Ok(().into())
}

#[tokio::test]
async fn local_activity_timeout_marker() {
let wf_name = "local_activity_timeout_marker";
let mut starter = CoreWfStarter::new(wf_name);
let mut worker = starter.worker().await;
static ACTS_STARTED: Semaphore = Semaphore::const_new(0);

worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move {
let local_activity = ctx.local_activity(LocalActivityOptions {
schedule_to_close_timeout: Some(Duration::from_millis(200)),
activity_type: "stop_activity".to_string(),
input: "hi!".as_json_payload().expect("serializes fine"),
..Default::default()
});
local_activity.await;
Ok(().into())
});

worker.register_activity(
"stop_activity",
|_ctx: ActContext, _: String| async move {
ACTS_STARTED.add_permits(1);
Result::<(), _>::Err(anyhow!("Oh no I failed!").into())
},
);

let run_id = worker
.submit_wf(
wf_name.to_owned(),
wf_name.to_owned(),
vec![],
WorkflowOptions {
execution_timeout: Some(Duration::from_secs(1)),
..Default::default()
},
)
.await
.unwrap();

let handle = worker.inner_mut().shutdown_handle();
let shutdowner = async {
let _ = ACTS_STARTED.acquire().await;
handle();
};
let runner = async {
worker.run_until_done().await.unwrap();
};
join!(shutdowner, runner);

let client = starter.get_client().await;
let history = client
.get_workflow_execution_history(wf_name.to_owned(), Some(run_id), vec![])
.await
.unwrap()
.history
.unwrap();
let marker = history
.events
.iter()
.find(|he| he.event_type() == EventType::MarkerRecorded)
.expect("expected marker recorded event");

if let Some(history_event::Attributes::MarkerRecordedEventAttributes(attr)) =
marker.clone().attributes
{
let failure = attr.failure.unwrap().failure_info.unwrap();
match failure {
TimeoutFailureInfo(failure) => {
assert_eq!(failure.timeout_type(), TimeoutType::ScheduleToClose);
}
_ => panic!("Expected a timeout failure"),
}
} else {
unreachable!("We already assert MarkerRecorded event type");
}
}

#[tokio::test]
async fn local_act_fanout() {
let wf_name = "local_act_fanout";
Expand Down
Loading