Fix autoscaler returning fatal error on GOAWAY if no scaling seen #973

Merged: merged 3 commits on Aug 15, 2025

Changes from all commits

2 changes: 2 additions & 0 deletions client/src/lib.rs

@@ -89,6 +89,8 @@ static TEMPORAL_NAMESPACE_HEADER_KEY: &str = "temporal-namespace";
 
 /// Key used to communicate when a GRPC message is too large
 pub static MESSAGE_TOO_LARGE_KEY: &str = "message-too-large";
+/// Key used to indicate an error was returned by the retryer because of the short-circuit predicate
+pub static ERROR_RETURNED_DUE_TO_SHORT_CIRCUIT: &str = "short-circuit";
 
 /// The server times out polls after 60 seconds. Set our timeout to be slightly beyond that.
 const LONG_POLL_TIMEOUT: Duration = Duration::from_secs(70);
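
For orientation (illustrative, not part of the diff): downstream code can treat the new key as a marker on the returned tonic::Status. A minimal sketch, assuming err is the Status handed back by the retrying client:

    if err.metadata().contains_key(ERROR_RETURNED_DUE_TO_SHORT_CIRCUIT) {
        // The retryer forwarded this error early because the short-circuit
        // predicate matched, not because the call itself was fatal.
    }
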
15 changes: 12 additions & 3 deletions client/src/retry.rs

@@ -1,6 +1,6 @@
 use crate::{
-    Client, IsWorkerTaskLongPoll, MESSAGE_TOO_LARGE_KEY, NamespacedClient, NoRetryOnMatching,
-    Result, RetryConfig, raw::IsUserLongPoll,
+    Client, ERROR_RETURNED_DUE_TO_SHORT_CIRCUIT, IsWorkerTaskLongPoll, MESSAGE_TOO_LARGE_KEY,
+    NamespacedClient, NoRetryOnMatching, Result, RetryConfig, raw::IsUserLongPoll,
 };
 use backoff::{Clock, SystemClock, backoff::Backoff, exponential::ExponentialBackoff};
 use futures_retry::{ErrorHandler, FutureRetry, RetryPolicy};
@@ -214,6 +214,10 @@ where
         if let Some(sc) = self.retry_short_circuit.as_ref()
             && (sc.predicate)(&e)
         {
+            e.metadata_mut().insert(
+                ERROR_RETURNED_DUE_TO_SHORT_CIRCUIT,
+                tonic::metadata::MetadataValue::from(0),
+            );
             return RetryPolicy::ForwardError(e);
         }
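
For context, a sketch of how such a short-circuit might be installed; the exact shape of NoRetryOnMatching is assumed here from its usage above, where (sc.predicate)(&e) is applied to a &tonic::Status:

    // Hypothetical setup: surface ResourceExhausted to the caller immediately
    // (e.g., so a poller autoscaler can react to it) rather than retrying it.
    let short_circuit = NoRetryOnMatching {
        predicate: |s: &tonic::Status| s.code() == tonic::Code::ResourceExhausted,
    };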

@@ -441,7 +445,12 @@ mod tests {
            FixedClock(Instant::now()),
        );
        let result = err_handler.handle(1, Status::new(Code::ResourceExhausted, "leave me alone"));
-       assert_matches!(result, RetryPolicy::ForwardError(_))
+       let e = assert_matches!(result, RetryPolicy::ForwardError(e) => e);
+       assert!(
+           e.metadata()
+               .get(ERROR_RETURNED_DUE_TO_SHORT_CIRCUIT)
+               .is_some()
+       );
    }

    #[tokio::test]
77 changes: 63 additions & 14 deletions core/src/pollers/poll_buffer.rs

@@ -18,7 +18,7 @@ use std::{
     },
     time::Duration,
 };
-use temporal_client::NoRetryOnMatching;
+use temporal_client::{ERROR_RETURNED_DUE_TO_SHORT_CIRCUIT, NoRetryOnMatching};
 use temporal_sdk_core_api::worker::{
     ActivitySlotKind, NexusSlotKind, PollerBehavior, SlotKind, WorkflowSlotKind,
 };
@@ -538,20 +538,27 @@ impl PollScalerReportHandle {
                 }
             }
             Err(e) => {
-                // We should only see (and react to) errors in autoscaling mode
-                if matches!(self.behavior, PollerBehavior::Autoscaling { .. })
-                    && self.ever_saw_scaling_decision.load(Ordering::Relaxed)
-                {
-                    debug!("Got error from server while polling: {:?}", e);
-                    if e.code() == Code::ResourceExhausted {
-                        // Scale down significantly for resource exhaustion
-                        self.change_target(usize::saturating_div, 2);
-                    } else {
-                        // Other codes that would normally have made us back off briefly can
-                        // reclaim this poller
-                        self.change_target(usize::saturating_sub, 1);
-                    }
-                    return false;
+                if matches!(self.behavior, PollerBehavior::Autoscaling { .. }) {
+                    // We should only react to errors in autoscaling mode if we saw a scaling
+                    // decision
+                    if self.ever_saw_scaling_decision.load(Ordering::Relaxed) {
+                        debug!("Got error from server while polling: {:?}", e);
+                        if e.code() == Code::ResourceExhausted {
+                            // Scale down significantly for resource exhaustion
+                            self.change_target(usize::saturating_div, 2);
+                        } else {
+                            // Other codes that would normally have made us back off briefly can
+                            // reclaim this poller
+                            self.change_target(usize::saturating_sub, 1);
+                        }
+                    }
+                    // Only propagate errors out if they weren't because of the short-circuiting
+                    // logic. IE: We don't want to fail callers because we said we wanted to know
+                    // about ResourceExhausted errors, but we haven't seen a scaling decision yet,
+                    // so we're not reacting to errors, only propagating them.
+                    return !e
+                        .metadata()
+                        .contains_key(ERROR_RETURNED_DUE_TO_SHORT_CIRCUIT);
Comment on lines +555 to +561

Member Author: This is the fix.

Member: Can I get some background here? I expect Tonic to hide go-away and implicitly handle reconnects. We send a go-away every 5m on average. Is there a situation where a regular non-worker client user may see some go-away?

Member Author: There's a bug in hyper or tonic that was worked around here: #811

But the short-circuit that the autoscaler turns on, so it can scale better on otherwise non-fatal errors, makes this come through as fatal - so it gets ignored specifically here.

Member (@cretz, Aug 14, 2025): (Marking approved anyway, just trying to understand some strangeness here.)

So IIUC, the server by default sends a GoAway after 5m (i.e., a soft connection close telling you to stop making more calls once whatever is in flight finishes) and then hard-closes the TCP connection after 7m (because 2m between soft and hard close is enough time for a properly behaving client to never hit this).

So somehow we're sending RPC calls even after the soft close and therefore hitting the 7m limit? If that is the case, there can be data loss in a rare/racy way if the hard TCP death occurs during a gRPC call (in our case it'd be a task timeout, because the server might send us a task and then close the connection). Or maybe Tonic is the one that's eagerly returning a Code::Cancelled w/ "connection closed" before it ever even makes the call? Obviously not important if it all works today, but it is a bit confusing to me.

Member Author: I think the latter is the explanation, but yeah, I agree it's confusing.

                 }
             }
         }
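
To make the failure shape concrete (an assumption based on the review discussion above, not code from this PR): the GOAWAY-era error reportedly surfaces client-side as Code::Cancelled, which the retryer now stamps before forwarding, and the new unit test below simulates exactly this:

    // Sketch: a GOAWAY-style error as the poll buffer would now see it after
    // the retryer stamped it; with the fix, this is swallowed rather than
    // failing the caller. (The "connection closed" message is assumed.)
    let mut st = tonic::Status::cancelled("connection closed");
    st.metadata_mut()
        .insert(ERROR_RETURNED_DUE_TO_SHORT_CIRCUIT, 1.into());
    assert!(st.metadata().contains_key(ERROR_RETURNED_DUE_TO_SHORT_CIRCUIT));
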
@@ -748,4 +755,46 @@ mod tests {
         pb.poll().await.unwrap().unwrap();
         pb.shutdown().await;
     }
+
+    #[tokio::test]
+    async fn autoscale_wont_fail_caller_on_short_circuited_error() {
+        let mut mock_client = mock_manual_worker_client();
+        mock_client
+            .expect_poll_workflow_task()
+            .times(1)
+            .returning(move |_, _| {
+                async {
+                    let mut st = tonic::Status::cancelled("whatever");
+                    st.metadata_mut()
+                        .insert(ERROR_RETURNED_DUE_TO_SHORT_CIRCUIT, 1.into());
+                    Err(st)
+                }
+                .boxed()
+            });
+        mock_client
+            .expect_poll_workflow_task()
+            .times(1)
+            .returning(move |_, _| async { Ok(Default::default()) }.boxed());
+
+        let pb = LongPollBuffer::new_workflow_task(
+            Arc::new(mock_client),
+            "sometq".to_string(),
+            None,
+            PollerBehavior::Autoscaling {
+                minimum: 1,
+                maximum: 1,
+                initial: 1,
+            },
+            fixed_size_permit_dealer(1),
+            CancellationToken::new(),
+            None::<fn(usize)>,
+            WorkflowTaskOptions {
+                wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(1)))),
+            },
+        );
+
+        // Should not see error, unwraps should get empty response
+        pb.poll().await.unwrap().unwrap();
+        pb.shutdown().await;
+    }
 }
24 changes: 14 additions & 10 deletions tests/integ_tests/client_tests.rs

@@ -110,7 +110,7 @@ struct GenericService<F> {
 }
 impl<F> Service<tonic::codegen::http::Request<Body>> for GenericService<F>
 where
-    F: FnMut() -> Response<Body>,
+    F: FnMut() -> BoxFuture<'static, Response<Body>>,
 {
     type Response = Response<Body>;
     type Error = Infallible;
@@ -133,7 +133,7 @@ where
         )
         .unwrap();
         let r = (self.response_maker)();
-        async move { Ok(r) }.boxed()
+        async move { Ok(r.await) }.boxed()
     }
 }
 impl<F> NamedService for GenericService<F> {
@@ -144,12 +144,12 @@ struct FakeServer {
     addr: std::net::SocketAddr,
     shutdown_tx: oneshot::Sender<()>,
     header_rx: tokio::sync::mpsc::UnboundedReceiver<String>,
-    server_handle: tokio::task::JoinHandle<()>,
+    pub server_handle: tokio::task::JoinHandle<()>,
 }
 
 async fn fake_server<F>(response_maker: F) -> FakeServer
 where
-    F: FnMut() -> Response<Body> + Clone + Send + Sync + 'static,
+    F: FnMut() -> BoxFuture<'static, Response<Body>> + Clone + Send + Sync + 'static,
Member Author: I made this change while trying to get the fake server to create the goaway. Couldn't do it, but it's probably useful for the fake responses to be async anyway.

 {
     let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
     let (header_tx, header_rx) = tokio::sync::mpsc::unbounded_channel();
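
One side benefit of this signature change (illustrative usage, not from the diff): response makers can now do async work before replying, e.g., injecting artificial latency into the fake server:

    // Hypothetical: delay each fake response by 50ms before answering.
    let fs = fake_server(|| {
        async {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            Response::new(Body::empty())
        }
        .boxed()
    })
    .await;
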
@@ -191,7 +191,7 @@ impl FakeServer {
 
 #[tokio::test]
 async fn timeouts_respected_one_call_fake_server() {
-    let mut fs = fake_server(|| Response::new(Body::empty())).await;
+    let mut fs = fake_server(|| async { Response::new(Body::empty()) }.boxed()).await;
     let header_rx = &mut fs.header_rx;
 
     let mut opts = get_integ_server_options();
@@ -260,7 +260,11 @@ async fn non_retryable_errors() {
         Code::Unauthenticated,
         Code::Unimplemented,
     ] {
-        let mut fs = fake_server(move || Status::new(code, "bla").into_http()).await;
+        let mut fs = fake_server(move || {
+            let s = Status::new(code, "bla").into_http();
+            async { s }.boxed()
+        })
+        .await;
 
         let mut opts = get_integ_server_options();
         let uri = format!("http://localhost:{}", fs.addr.port())
@@ -292,13 +296,13 @@ async fn retryable_errors() {
     {
         let count = Arc::new(AtomicUsize::new(0));
         let mut fs = fake_server(move || {
-            dbg!("Making resp");
             let prev = count.fetch_add(1, Ordering::Relaxed);
-            if prev < 3 {
+            let r = if prev < 3 {
                 Status::new(code, "bla").into_http()
             } else {
                 make_ok_response(RespondActivityTaskCanceledResponse::default())
-            }
+            };
+            async { r }.boxed()
         })
         .await;
 
@@ -335,7 +339,7 @@ async fn namespace_header_attached_to_relevant_calls() {
         .add_service(GenericService {
             header_to_parse: "Temporal-Namespace",
             header_tx,
-            response_maker: || Response::new(Body::empty()),
+            response_maker: || async { Response::new(Body::empty()) }.boxed(),
         })
         .serve_with_incoming_shutdown(
             tokio_stream::wrappers::TcpListenerStream::new(listener),
13 changes: 13 additions & 0 deletions tests/integ_tests/workflow_tests/activities.rs

@@ -11,6 +11,7 @@ use temporal_sdk::{
     ActContext, ActExitValue, ActivityError, ActivityOptions, CancellableFuture, WfContext,
     WfExitValue, WorkflowResult,
 };
+use temporal_sdk_core_api::worker::PollerBehavior;
 use temporal_sdk_core_protos::{
     DEFAULT_ACTIVITY_TYPE, TaskToken,
     coresdk::{
@@ -1066,11 +1067,23 @@ async fn activity_can_be_cancelled_by_local_timeout() {
 
 #[tokio::test]
 #[ignore] // Runs forever, used to manually attempt to repro spurious activity completion rpc errs
+// Unfortunately there is no way to unit test this as tonic doesn't publicly expose the necessary
+// machinery to construct the right kind of error.
 async fn long_activity_timeout_repro() {
     let wf_name = "long_activity_timeout_repro";
     let mut starter = CoreWfStarter::new(wf_name);
     starter
         .worker_config
+        .workflow_task_poller_behavior(PollerBehavior::Autoscaling {
+            minimum: 1,
+            maximum: 10,
+            initial: 5,
+        })
+        .activity_task_poller_behavior(PollerBehavior::Autoscaling {
+            minimum: 1,
+            maximum: 10,
+            initial: 5,
+        })
         .local_timeout_buffer_for_activities(Duration::from_secs(0));
     let mut worker = starter.worker().await;
     worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move {