From 23c35d445b149071d71e040a8b790f15b138c2d1 Mon Sep 17 00:00:00 2001 From: Maciej Dudkowski Date: Tue, 15 Jul 2025 12:59:05 -0400 Subject: [PATCH 1/6] Do not auto-retry gRPC-message-size-too-large errors --- .../temporal/failure/ApplicationFailure.java | 4 +- .../internal/worker/WorkflowWorker.java | 160 ++++++++++++------ .../workflow/GrpcMessageTooLargeTest.java | 105 ++++++++++++ .../retryer/GrpcMessageTooLargeException.java | 7 + .../internal/retryer/GrpcRetryerUtils.java | 19 ++- .../testservice/GRPCServerHelper.java | 19 ++- .../testservice/InProcessGRPCServer.java | 72 +++++++- .../retryer/GrpcAsyncRetryerTest.java | 45 +++++ .../internal/retryer/GrpcSyncRetryerTest.java | 38 +++++ 9 files changed, 402 insertions(+), 67 deletions(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/GrpcMessageTooLargeTest.java create mode 100644 temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcMessageTooLargeException.java diff --git a/temporal-sdk/src/main/java/io/temporal/failure/ApplicationFailure.java b/temporal-sdk/src/main/java/io/temporal/failure/ApplicationFailure.java index 91d496bf49..0aba428138 100644 --- a/temporal-sdk/src/main/java/io/temporal/failure/ApplicationFailure.java +++ b/temporal-sdk/src/main/java/io/temporal/failure/ApplicationFailure.java @@ -236,7 +236,9 @@ public static final class Builder { private Duration nextRetryDelay; private ApplicationErrorCategory category; - private Builder() {} + private Builder() { + category = ApplicationErrorCategory.UNSPECIFIED; + } private Builder(ApplicationFailure options) { if (options == null) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java index 636ae2fccd..c1bd7ce3c2 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java @@ -12,9 +12,13 @@ 
import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.TaskQueueKind; import io.temporal.api.enums.v1.WorkflowTaskFailedCause; +import io.temporal.api.failure.v1.Failure; import io.temporal.api.workflowservice.v1.*; +import io.temporal.failure.ApplicationFailure; import io.temporal.internal.logging.LoggerTag; +import io.temporal.internal.retryer.GrpcMessageTooLargeException; import io.temporal.internal.retryer.GrpcRetryer; +import io.temporal.payload.context.WorkflowSerializationContext; import io.temporal.serviceclient.MetricsTag; import io.temporal.serviceclient.RpcRetryOptions; import io.temporal.serviceclient.WorkflowServiceStubs; @@ -394,73 +398,108 @@ public void handle(WorkflowTask task) throws Exception { PollWorkflowTaskQueueResponse currentTask = nextWFTResponse.get(); nextWFTResponse = Optional.empty(); WorkflowTaskHandler.Result result = handleTask(currentTask, workflowTypeScope); + WorkflowTaskFailedCause taskFailedCause = null; try { RespondWorkflowTaskCompletedRequest taskCompleted = result.getTaskCompleted(); RespondWorkflowTaskFailedRequest taskFailed = result.getTaskFailed(); RespondQueryTaskCompletedRequest queryCompleted = result.getQueryCompleted(); - if (taskCompleted != null) { - RespondWorkflowTaskCompletedRequest.Builder requestBuilder = - taskCompleted.toBuilder(); - try (EagerActivitySlotsReservation activitySlotsReservation = - new EagerActivitySlotsReservation(eagerActivityDispatcher)) { - activitySlotsReservation.applyToRequest(requestBuilder); - RespondWorkflowTaskCompletedResponse response = - sendTaskCompleted( - currentTask.getTaskToken(), - requestBuilder, - result.getRequestRetryOptions(), - workflowTypeScope); - // If we were processing a speculative WFT the server may instruct us that the task - // was dropped by resting out event ID. 
- long resetEventId = response.getResetHistoryEventId(); - if (resetEventId != 0) { - result.getResetEventIdHandle().apply(resetEventId); - } - nextWFTResponse = - response.hasWorkflowTask() - ? Optional.of(response.getWorkflowTask()) - : Optional.empty(); - // TODO we don't have to do this under the runId lock - activitySlotsReservation.handleResponse(response); - } - } else if (taskFailed != null) { - sendTaskFailed( - currentTask.getTaskToken(), - taskFailed.toBuilder(), - result.getRequestRetryOptions(), - workflowTypeScope); - } else if (queryCompleted != null) { + if (queryCompleted != null) { sendDirectQueryCompletedResponse( currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope); + } else { + try { + if (taskCompleted != null) { + RespondWorkflowTaskCompletedRequest.Builder requestBuilder = + taskCompleted.toBuilder(); + try (EagerActivitySlotsReservation activitySlotsReservation = + new EagerActivitySlotsReservation(eagerActivityDispatcher)) { + activitySlotsReservation.applyToRequest(requestBuilder); + RespondWorkflowTaskCompletedResponse response = + sendTaskCompleted( + currentTask.getTaskToken(), + requestBuilder, + result.getRequestRetryOptions(), + workflowTypeScope); + // If we were processing a speculative WFT the server may instruct us that the + // task was dropped by resting out event ID. + long resetEventId = response.getResetHistoryEventId(); + if (resetEventId != 0) { + result.getResetEventIdHandle().apply(resetEventId); + } + nextWFTResponse = + response.hasWorkflowTask() + ? 
Optional.of(response.getWorkflowTask()) + : Optional.empty(); + // TODO we don't have to do this under the runId lock + activitySlotsReservation.handleResponse(response); + } + } else if (taskFailed != null) { + taskFailedCause = taskFailed.getCause(); + sendTaskFailed( + currentTask.getTaskToken(), + taskFailed.toBuilder(), + result.getRequestRetryOptions(), + workflowTypeScope); + } + } catch (GrpcMessageTooLargeException e) { + releaseReason = SlotReleaseReason.error(e); + handleReportingFailure( + e, currentTask, result, workflowExecution, workflowTypeScope); + // replacing failure cause for metrics purposes + taskFailedCause = + WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE; + + String message = + String.format( + "Failed to send workflow task %s: %s", + taskFailed == null ? "completion" : "failure", e.getMessage()); + ApplicationFailure applicationFailure = + ApplicationFailure.newBuilder() + .setMessage(message) + .setType("GrpcMessageTooLargeException") + .setNonRetryable(true) + .build(); + Failure failure = + options + .getDataConverter() + .withContext( + new WorkflowSerializationContext( + namespace, workflowExecution.getWorkflowId())) + .exceptionToFailure(applicationFailure); + RespondWorkflowTaskFailedRequest.Builder taskFailedBuilder = + RespondWorkflowTaskFailedRequest.newBuilder() + .setFailure(failure) + .setCause( + WorkflowTaskFailedCause + .WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE); + sendTaskFailed( + currentTask.getTaskToken(), + taskFailedBuilder, + result.getRequestRetryOptions(), + workflowTypeScope); + } } } catch (Exception e) { - logExceptionDuringResultReporting(e, currentTask, result); releaseReason = SlotReleaseReason.error(e); - // if we failed to report the workflow task completion back to the server, - // our cached version of the workflow may be more advanced than the server is aware of. 
- // We should discard this execution and perform a clean replay based on what server - // knows next time. - cache.invalidate( - workflowExecution, workflowTypeScope, "Failed result reporting to the server", e); + handleReportingFailure(e, currentTask, result, workflowExecution, workflowTypeScope); throw e; } - if (result.getTaskFailed() != null) { - Scope workflowTaskFailureScope = workflowTypeScope; - if (result - .getTaskFailed() - .getCause() - .equals( - WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR)) { - workflowTaskFailureScope = - workflowTaskFailureScope.tagged( - ImmutableMap.of(TASK_FAILURE_TYPE, "NonDeterminismError")); - } else { - workflowTaskFailureScope = - workflowTaskFailureScope.tagged( - ImmutableMap.of(TASK_FAILURE_TYPE, "WorkflowError")); + if (taskFailedCause != null) { + String taskFailureType; + switch (taskFailedCause) { + case WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR: + taskFailureType = "NonDeterminismError"; + break; + case WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE: + taskFailureType = "GrpcMessageTooLarge"; + break; + default: + taskFailureType = "WorkflowError"; } + Scope workflowTaskFailureScope = + workflowTypeScope.tagged(ImmutableMap.of(TASK_FAILURE_TYPE, taskFailureType)); // we don't trigger the counter in case of the legacy query // (which never has taskFailed set) workflowTaskFailureScope @@ -617,5 +656,20 @@ private void logExceptionDuringResultReporting( e); } } + + private void handleReportingFailure( + Exception e, + PollWorkflowTaskQueueResponse currentTask, + WorkflowTaskHandler.Result result, + WorkflowExecution workflowExecution, + Scope workflowTypeScope) { + logExceptionDuringResultReporting(e, currentTask, result); + // if we failed to report the workflow task completion back to the server, + // our cached version of the workflow may be more advanced than the server is aware of. 
+ // We should discard this execution and perform a clean replay based on what server + // knows next time. + cache.invalidate( + workflowExecution, workflowTypeScope, "Failed result reporting to the server", e); + } } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/GrpcMessageTooLargeTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/GrpcMessageTooLargeTest.java new file mode 100644 index 0000000000..30c577ba0e --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/GrpcMessageTooLargeTest.java @@ -0,0 +1,105 @@ +package io.temporal.workflow; + +import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; + +import io.temporal.activity.ActivityOptions; +import io.temporal.api.enums.v1.EventType; +import io.temporal.api.enums.v1.WorkflowTaskFailedCause; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowFailedException; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowServiceException; +import io.temporal.failure.ApplicationFailure; +import io.temporal.failure.TimeoutFailure; +import io.temporal.internal.retryer.GrpcMessageTooLargeException; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.shared.TestActivities; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import java.util.List; +import org.junit.Rule; +import org.junit.Test; + +public class GrpcMessageTooLargeTest { + private static final String VERY_LARGE_DATA; + + static { + String argPiece = "Very Large Data "; + int argRepeats = 500_000; // circa 8MB, double the 4MB limit + StringBuilder argBuilder = new StringBuilder(argPiece.length() * argRepeats); + for (int i = 0; i < argRepeats; i++) { + argBuilder.append(argPiece); + } + VERY_LARGE_DATA = argBuilder.toString(); + } + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + 
.setWorkflowTypes(TestWorkflowImpl.class) + .setActivityImplementations(new TestActivityImpl()) + .build(); + + @Test + public void workflowStartTooLarge() { + TestWorkflows.TestWorkflowStringArg workflow = + testWorkflowRule.newWorkflowStub(TestWorkflows.TestWorkflowStringArg.class); + WorkflowServiceException e = + assertThrows( + WorkflowServiceException.class, + () -> WorkflowClient.start(workflow::execute, VERY_LARGE_DATA)); + assertTrue(e.getCause() instanceof GrpcMessageTooLargeException); + } + + @Test + public void activityStartTooLarge() { + WorkflowOptions options = + WorkflowOptions.newBuilder() + .setWorkflowRunTimeout(Duration.ofSeconds(1)) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .build(); + TestWorkflows.TestWorkflowStringArg workflow = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub(TestWorkflows.TestWorkflowStringArg.class, options); + + WorkflowFailedException e = + assertThrows(WorkflowFailedException.class, () -> workflow.execute("")); + assertTrue(e.getCause() instanceof TimeoutFailure); + + List events = + testWorkflowRule.getHistoryEvents( + e.getExecution().getWorkflowId(), EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED); + assertFalse(events.isEmpty()); + for (HistoryEvent event : events) { + assertEquals( + WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE, + event.getWorkflowTaskFailedEventAttributes().getCause()); + } + } + + public static class TestWorkflowImpl implements TestWorkflows.TestWorkflowStringArg { + @Override + public void execute(String arg) { + Workflow.newActivityStub( + TestActivities.TestActivity1.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(1)) + .validateAndBuildWithDefaults()) + .execute(VERY_LARGE_DATA); + } + } + + public static class TestActivityImpl implements TestActivities.TestActivity1 { + @Override + public String execute(String arg) { + throw ApplicationFailure.newBuilder() + .setMessage("This activity should not start 
executing") + .setType("TestFailure") + .setNonRetryable(true) + .build(); + } + } +} diff --git a/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcMessageTooLargeException.java b/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcMessageTooLargeException.java new file mode 100644 index 0000000000..bf0476153d --- /dev/null +++ b/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcMessageTooLargeException.java @@ -0,0 +1,7 @@ +package io.temporal.internal.retryer; + +public class GrpcMessageTooLargeException extends RuntimeException { + public GrpcMessageTooLargeException(io.grpc.StatusRuntimeException cause) { + super(cause.getMessage(), cause); + } +} diff --git a/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcRetryerUtils.java b/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcRetryerUtils.java index 00c48c7acf..4cd6a8223c 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcRetryerUtils.java +++ b/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcRetryerUtils.java @@ -29,9 +29,9 @@ class GrpcRetryerUtils { @Nonnull StatusRuntimeException currentException, @Nonnull RpcRetryOptions options, GetSystemInfoResponse.Capabilities serverCapabilities) { - Status.Code code = currentException.getStatus().getCode(); + Status status = currentException.getStatus(); - switch (code) { + switch (status.getCode()) { // CANCELLED and DEADLINE_EXCEEDED usually considered non-retryable in GRPC world, for // example: // https://github.com/grpc-ecosystem/go-grpc-middleware/blob/master/retry/retry.go#L287 @@ -56,9 +56,22 @@ class GrpcRetryerUtils { // By default, we keep retrying with DEADLINE_EXCEEDED assuming that it's the deadline of // one attempt which expired, but not the whole sequence. 
break; + case RESOURCE_EXHAUSTED: + // Retry RESOURCE_EXHAUSTED unless the max message size was exceeded + if (status.getDescription() != null + && (status.getDescription().startsWith("grpc: received message larger than max") + || status + .getDescription() + .startsWith("grpc: message after decompression larger than max") + || status + .getDescription() + .startsWith("grpc: received message after decompression larger than max"))) { + return new GrpcMessageTooLargeException(currentException); + } + break; default: for (RpcRetryOptions.DoNotRetryItem pair : options.getDoNotRetry()) { - if (pair.getCode() == code + if (pair.getCode() == status.getCode() && (pair.getDetailsClass() == null || StatusUtils.hasFailure(currentException, pair.getDetailsClass()))) { return currentException; diff --git a/temporal-serviceclient/src/main/java/io/temporal/internal/testservice/GRPCServerHelper.java b/temporal-serviceclient/src/main/java/io/temporal/internal/testservice/GRPCServerHelper.java index bdc971e38f..2e5f987ae2 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/internal/testservice/GRPCServerHelper.java +++ b/temporal-serviceclient/src/main/java/io/temporal/internal/testservice/GRPCServerHelper.java @@ -1,24 +1,31 @@ package io.temporal.internal.testservice; -import io.grpc.BindableService; -import io.grpc.ServerBuilder; -import io.grpc.ServerServiceDefinition; +import io.grpc.*; import io.grpc.health.v1.HealthCheckResponse; import io.grpc.protobuf.services.HealthStatusManager; import java.util.Collection; +import java.util.Collections; +import java.util.List; // TODO move to temporal-testing or temporal-test-server modules after WorkflowServiceStubs cleanup public class GRPCServerHelper { public static void registerServicesAndHealthChecks( Collection services, ServerBuilder toServerBuilder) { + registerServicesAndHealthChecks(services, toServerBuilder, Collections.emptyList()); + } + + public static void registerServicesAndHealthChecks( + Collection services, 
+ ServerBuilder toServerBuilder, + List interceptors) { HealthStatusManager healthStatusManager = new HealthStatusManager(); for (BindableService service : services) { - ServerServiceDefinition serverServiceDefinition = service.bindService(); - toServerBuilder.addService(serverServiceDefinition); + toServerBuilder.addService(ServerInterceptors.intercept(service.bindService(), interceptors)); healthStatusManager.setStatus( service.bindService().getServiceDescriptor().getName(), HealthCheckResponse.ServingStatus.SERVING); } - toServerBuilder.addService(healthStatusManager.getHealthService()); + toServerBuilder.addService( + ServerInterceptors.intercept(healthStatusManager.getHealthService(), interceptors)); } } diff --git a/temporal-serviceclient/src/main/java/io/temporal/internal/testservice/InProcessGRPCServer.java b/temporal-serviceclient/src/main/java/io/temporal/internal/testservice/InProcessGRPCServer.java index 07cf127d00..3728a26455 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/internal/testservice/InProcessGRPCServer.java +++ b/temporal-serviceclient/src/main/java/io/temporal/internal/testservice/InProcessGRPCServer.java @@ -1,13 +1,13 @@ package io.temporal.internal.testservice; -import io.grpc.BindableService; -import io.grpc.ManagedChannel; -import io.grpc.Server; +import com.google.protobuf.MessageLite; +import io.grpc.*; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc; import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -45,7 +45,8 @@ public InProcessGRPCServer(Collection services, boolean createC String serverName = InProcessServerBuilder.generateName(); try { InProcessServerBuilder inProcessServerBuilder = InProcessServerBuilder.forName(serverName); - GRPCServerHelper.registerServicesAndHealthChecks(services, 
inProcessServerBuilder); + GRPCServerHelper.registerServicesAndHealthChecks( + services, inProcessServerBuilder, Collections.singletonList(new MessageSizeChecker())); server = inProcessServerBuilder.build().start(); } catch (IOException unexpected) { throw new RuntimeException(unexpected); @@ -101,4 +102,67 @@ public Server getServer() { public ManagedChannel getChannel() { return channel; } + + /** + * This interceptor is needed for testing RESOURCE_EXHAUSTED error handling because in-process + * gRPC server doesn't check and cannot be configured to check message size. + */ + public static class MessageSizeChecker implements ServerInterceptor { + private final int maxMessageSize; + + public MessageSizeChecker() { + this(4 * 1024 * 1024); // matching gRPC's default 4MB + } + + public MessageSizeChecker(int maxMessageSize) { + this.maxMessageSize = maxMessageSize; + } + + @Override + public ServerCall.Listener interceptCall( + ServerCall call, Metadata headers, ServerCallHandler next) { + call.request(1); + return new Listener<>(call, headers, next); + } + + private class Listener extends ForwardingServerCallListener { + private final ServerCall call; + private final Metadata headers; + private final ServerCallHandler next; + private ServerCall.Listener delegate; + private boolean delegateSet; + + public Listener( + ServerCall call, Metadata headers, ServerCallHandler next) { + this.call = call; + this.headers = headers; + this.next = next; + delegate = new ServerCall.Listener() {}; + delegateSet = false; + } + + @Override + protected ServerCall.Listener delegate() { + return delegate; + } + + @Override + public void onMessage(ReqT message) { + int size = ((MessageLite) message).getSerializedSize(); + if (size > maxMessageSize) { + call.close( + Status.RESOURCE_EXHAUSTED.withDescription( + String.format( + "grpc: received message larger than max (%d vs. 
%d)", size, maxMessageSize)), + new Metadata()); + } else { + if (!delegateSet) { + delegateSet = true; + delegate = next.startCall(call, headers); + } + super.onMessage(message); + } + } + } + } } diff --git a/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java b/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java index c8ac464caf..46e71d7b1d 100644 --- a/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java +++ b/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java @@ -389,4 +389,49 @@ public void testResourceExhaustedFailure() throws InterruptedException { "We should retry RESOURCE_EXHAUSTED failures using congestionInitialInterval.", elapsedTime >= 2000); } + + @Test + public void testMessageLargerThanMaxFailureAsync() throws InterruptedException { + RpcRetryOptions options = + RpcRetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(1000)) + .setMaximumInterval(Duration.ofMillis(1000)) + .setMaximumJitterCoefficient(0) + .validateBuildWithDefaults(); + + for (String description : + new String[] { + "grpc: received message larger than max (2000 vs. 1000)", + "grpc: message after decompression larger than max (2000 vs. 1000)", + "grpc: received message after decompression larger than max (2000 vs. 
1000)", + }) { + long start = System.currentTimeMillis(); + final AtomicInteger attempts = new AtomicInteger(); + ExecutionException e = + assertThrows( + ExecutionException.class, + () -> + new GrpcAsyncRetryer<>( + scheduledExecutor, + () -> { + if (attempts.incrementAndGet() > 1) + fail( + "We should not retry on RESOURCE_EXHAUSTED with description: " + + description); + CompletableFuture result = new CompletableFuture<>(); + result.completeExceptionally( + new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription(description))); + return result; + }, + new GrpcRetryer.GrpcRetryerOptions(options, null), + GetSystemInfoResponse.Capabilities.getDefaultInstance()) + .retry() + .get()); + assertTrue(e.getCause() instanceof StatusRuntimeException); + Status status = ((StatusRuntimeException) e.getCause()).getStatus(); + assertEquals(Status.Code.RESOURCE_EXHAUSTED, status.getCode()); + assertEquals(description, status.getDescription()); + } + } } diff --git a/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcSyncRetryerTest.java b/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcSyncRetryerTest.java index 0ad531625b..fe5abefd7e 100644 --- a/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcSyncRetryerTest.java +++ b/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcSyncRetryerTest.java @@ -324,4 +324,42 @@ public void testCongestionAndJitterAreNotMandatory() { assertEquals(CONGESTION_INITIAL_INTERVAL, options.getCongestionInitialInterval()); assertEquals(MAXIMUM_JITTER_COEFFICIENT, options.getMaximumJitterCoefficient(), 0.01); } + + @Test + public void testMessageLargerThanMaxFailure() { + RpcRetryOptions options = + RpcRetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(1000)) + .setMaximumInterval(Duration.ofMillis(1000)) + .setMaximumJitterCoefficient(0) + .validateBuildWithDefaults(); + + for (String description : + new String[] { + "grpc: received message 
larger than max (2000 vs. 1000)", + "grpc: message after decompression larger than max (2000 vs. 1000)", + "grpc: received message after decompression larger than max (2000 vs. 1000)", + }) { + long start = System.currentTimeMillis(); + final AtomicInteger attempts = new AtomicInteger(); + StatusRuntimeException e = + assertThrows( + StatusRuntimeException.class, + () -> + DEFAULT_SYNC_RETRYER.retry( + () -> { + if (attempts.incrementAndGet() > 1) { + fail( + "We should not retry on RESOURCE_EXHAUSTED with description: " + + description); + } + throw new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription(description)); + }, + new GrpcRetryer.GrpcRetryerOptions(options, null), + GetSystemInfoResponse.Capabilities.getDefaultInstance())); + assertEquals(Status.Code.RESOURCE_EXHAUSTED, e.getStatus().getCode()); + assertEquals(description, e.getStatus().getDescription()); + } + } } From 87af03666ad422cc387d218ba65aaadd34631ac3 Mon Sep 17 00:00:00 2001 From: Maciej Dudkowski Date: Tue, 22 Jul 2025 10:36:29 -0400 Subject: [PATCH 2/6] Fixed GrpcRetryer tests --- .../io/temporal/internal/retryer/GrpcAsyncRetryerTest.java | 6 ++---- .../io/temporal/internal/retryer/GrpcSyncRetryerTest.java | 7 +++---- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java b/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java index 46e71d7b1d..fb1acda084 100644 --- a/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java +++ b/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java @@ -428,10 +428,8 @@ public void testMessageLargerThanMaxFailureAsync() throws InterruptedException { GetSystemInfoResponse.Capabilities.getDefaultInstance()) .retry() .get()); - assertTrue(e.getCause() instanceof StatusRuntimeException); - Status status = ((StatusRuntimeException) 
e.getCause()).getStatus(); - assertEquals(Status.Code.RESOURCE_EXHAUSTED, status.getCode()); - assertEquals(description, status.getDescription()); + assertTrue(e.getCause() instanceof GrpcMessageTooLargeException); + assertEquals(Status.Code.RESOURCE_EXHAUSTED + ": " + description, e.getMessage()); } } } diff --git a/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcSyncRetryerTest.java b/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcSyncRetryerTest.java index fe5abefd7e..2d43a84b45 100644 --- a/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcSyncRetryerTest.java +++ b/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcSyncRetryerTest.java @@ -342,9 +342,9 @@ public void testMessageLargerThanMaxFailure() { }) { long start = System.currentTimeMillis(); final AtomicInteger attempts = new AtomicInteger(); - StatusRuntimeException e = + GrpcMessageTooLargeException e = assertThrows( - StatusRuntimeException.class, + GrpcMessageTooLargeException.class, () -> DEFAULT_SYNC_RETRYER.retry( () -> { @@ -358,8 +358,7 @@ public void testMessageLargerThanMaxFailure() { }, new GrpcRetryer.GrpcRetryerOptions(options, null), GetSystemInfoResponse.Capabilities.getDefaultInstance())); - assertEquals(Status.Code.RESOURCE_EXHAUSTED, e.getStatus().getCode()); - assertEquals(description, e.getStatus().getDescription()); + assertEquals(Status.Code.RESOURCE_EXHAUSTED + ": " + description, e.getMessage()); } } } From a4e0fee3c5269f0135259fcd9cfb8a5c59909c18 Mon Sep 17 00:00:00 2001 From: Maciej Dudkowski Date: Tue, 22 Jul 2025 13:10:12 -0400 Subject: [PATCH 3/6] Fixed GrpcAsyncRetryerTest failure --- .../java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java 
b/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java index fb1acda084..1bbea4057c 100644 --- a/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java +++ b/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java @@ -429,7 +429,7 @@ public void testMessageLargerThanMaxFailureAsync() throws InterruptedException { .retry() .get()); assertTrue(e.getCause() instanceof GrpcMessageTooLargeException); - assertEquals(Status.Code.RESOURCE_EXHAUSTED + ": " + description, e.getMessage()); + assertEquals(Status.Code.RESOURCE_EXHAUSTED + ": " + description, e.getCause().getMessage()); } } } From 44b1e900f8b029eae0fd94cd660c08b96a8c516b Mon Sep 17 00:00:00 2001 From: Maciej Dudkowski Date: Tue, 22 Jul 2025 14:35:24 -0400 Subject: [PATCH 4/6] Rebased with master --- .../src/main/java/io/temporal/failure/ApplicationFailure.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/failure/ApplicationFailure.java b/temporal-sdk/src/main/java/io/temporal/failure/ApplicationFailure.java index 0aba428138..91d496bf49 100644 --- a/temporal-sdk/src/main/java/io/temporal/failure/ApplicationFailure.java +++ b/temporal-sdk/src/main/java/io/temporal/failure/ApplicationFailure.java @@ -236,9 +236,7 @@ public static final class Builder { private Duration nextRetryDelay; private ApplicationErrorCategory category; - private Builder() { - category = ApplicationErrorCategory.UNSPECIFIED; - } + private Builder() {} private Builder(ApplicationFailure options) { if (options == null) { From 6444df37100e16f36f04887e4bec7a366e05b051 Mon Sep 17 00:00:00 2001 From: Maciej Dudkowski Date: Mon, 28 Jul 2025 12:39:28 -0400 Subject: [PATCH 5/6] Handling message too large error when sending query response. Various other changes. 
--- .../internal/worker/WorkflowWorker.java | 73 +++++--- .../io/temporal/testUtils/LoggerUtils.java | 41 +++++ .../workflow/GrpcMessageTooLargeTest.java | 169 ++++++++++++++---- .../retryer/GrpcMessageTooLargeException.java | 33 +++- .../internal/retryer/GrpcRetryerUtils.java | 12 +- .../retryer/GrpcAsyncRetryerTest.java | 2 +- .../internal/retryer/GrpcSyncRetryerTest.java | 2 +- 7 files changed, 268 insertions(+), 64 deletions(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/testUtils/LoggerUtils.java diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java index c1bd7ce3c2..8b2e3e1384 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java @@ -9,7 +9,9 @@ import com.uber.m3.tally.Scope; import com.uber.m3.tally.Stopwatch; import com.uber.m3.util.ImmutableMap; +import io.grpc.StatusRuntimeException; import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.QueryResultType; import io.temporal.api.enums.v1.TaskQueueKind; import io.temporal.api.enums.v1.WorkflowTaskFailedCause; import io.temporal.api.failure.v1.Failure; @@ -405,8 +407,30 @@ public void handle(WorkflowTask task) throws Exception { RespondQueryTaskCompletedRequest queryCompleted = result.getQueryCompleted(); if (queryCompleted != null) { - sendDirectQueryCompletedResponse( - currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope); + try { + sendDirectQueryCompletedResponse( + currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope); + } catch (StatusRuntimeException e) { + GrpcMessageTooLargeException tooLargeException = + GrpcMessageTooLargeException.tryWrap(e); + if (tooLargeException == null) { + throw e; + } + Failure failure = + grpcMessageTooLargeFailure( + workflowExecution.getWorkflowId(), + 
tooLargeException, + "Failed to send query response"); + RespondQueryTaskCompletedRequest.Builder queryFailedBuilder = + RespondQueryTaskCompletedRequest.newBuilder() + .setTaskToken(currentTask.getTaskToken()) + .setNamespace(namespace) + .setCompletedType(QueryResultType.QUERY_RESULT_TYPE_FAILED) + .setErrorMessage(failure.getMessage()) + .setFailure(failure); + sendDirectQueryCompletedResponse( + currentTask.getTaskToken(), queryFailedBuilder, workflowTypeScope); + } } else { try { if (taskCompleted != null) { @@ -443,33 +467,28 @@ public void handle(WorkflowTask task) throws Exception { workflowTypeScope); } } catch (GrpcMessageTooLargeException e) { + // Only fail workflow task on the first attempt, subsequent failures of the same + // workflow task should timeout. + if (currentTask.getAttempt() > 1) { + throw e; + } + releaseReason = SlotReleaseReason.error(e); handleReportingFailure( e, currentTask, result, workflowExecution, workflowTypeScope); - // replacing failure cause for metrics purposes + // setting/replacing failure cause for metrics purposes taskFailedCause = WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE; - String message = + String messagePrefix = String.format( - "Failed to send workflow task %s: %s", - taskFailed == null ? "completion" : "failure", e.getMessage()); - ApplicationFailure applicationFailure = - ApplicationFailure.newBuilder() - .setMessage(message) - .setType("GrpcMessageTooLargeException") - .setNonRetryable(true) - .build(); - Failure failure = - options - .getDataConverter() - .withContext( - new WorkflowSerializationContext( - namespace, workflowExecution.getWorkflowId())) - .exceptionToFailure(applicationFailure); + "Failed to send workflow task %s", + taskFailed == null ? 
"completion" : "failure"); RespondWorkflowTaskFailedRequest.Builder taskFailedBuilder = RespondWorkflowTaskFailedRequest.newBuilder() - .setFailure(failure) + .setFailure( + grpcMessageTooLargeFailure( + workflowExecution.getWorkflowId(), e, messagePrefix)) .setCause( WorkflowTaskFailedCause .WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE); @@ -671,5 +690,19 @@ private void handleReportingFailure( cache.invalidate( workflowExecution, workflowTypeScope, "Failed result reporting to the server", e); } + + private Failure grpcMessageTooLargeFailure( + String workflowId, GrpcMessageTooLargeException e, String messagePrefix) { + ApplicationFailure applicationFailure = + ApplicationFailure.newBuilder() + .setMessage(messagePrefix + ": " + e.getMessage()) + .setType(GrpcMessageTooLargeException.class.getSimpleName()) + .build(); + applicationFailure.setStackTrace(new StackTraceElement[0]); // don't serialize stack trace + return options + .getDataConverter() + .withContext(new WorkflowSerializationContext(namespace, workflowId)) + .exceptionToFailure(applicationFailure); + } } } diff --git a/temporal-sdk/src/test/java/io/temporal/testUtils/LoggerUtils.java b/temporal-sdk/src/test/java/io/temporal/testUtils/LoggerUtils.java new file mode 100644 index 0000000000..95afa7f13b --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/testUtils/LoggerUtils.java @@ -0,0 +1,41 @@ +package io.temporal.testUtils; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.slf4j.LoggerFactory; + +public class LoggerUtils { + public static SilenceLoggers silenceLoggers(Class... classes) { + return new SilenceLoggers(classes); + } + + public static class SilenceLoggers implements AutoCloseable { + private final List loggers; + List oldLogLevels; + + public SilenceLoggers(Class... 
classes) { + loggers = + Arrays.stream(classes) + .map(LoggerFactory::getLogger) + .filter(Logger.class::isInstance) + .map(Logger.class::cast) + .collect(Collectors.toList()); + oldLogLevels = new ArrayList<>(); + for (Logger logger : loggers) { + oldLogLevels.add(logger.getLevel()); + logger.setLevel(Level.OFF); + } + } + + @Override + public void close() { + for (int i = 0; i < loggers.size(); i++) { + loggers.get(i).setLevel(oldLogLevels.get(i)); + } + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/GrpcMessageTooLargeTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/GrpcMessageTooLargeTest.java index 30c577ba0e..09de219aae 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/GrpcMessageTooLargeTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/GrpcMessageTooLargeTest.java @@ -7,22 +7,23 @@ import io.temporal.api.enums.v1.EventType; import io.temporal.api.enums.v1.WorkflowTaskFailedCause; import io.temporal.api.history.v1.HistoryEvent; -import io.temporal.client.WorkflowClient; -import io.temporal.client.WorkflowFailedException; -import io.temporal.client.WorkflowOptions; -import io.temporal.client.WorkflowServiceException; +import io.temporal.client.*; import io.temporal.failure.ApplicationFailure; import io.temporal.failure.TimeoutFailure; +import io.temporal.internal.replay.ReplayWorkflowTaskHandler; import io.temporal.internal.retryer.GrpcMessageTooLargeException; +import io.temporal.internal.worker.PollerOptions; +import io.temporal.testUtils.LoggerUtils; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.shared.TestActivities; -import io.temporal.workflow.shared.TestWorkflows; import java.time.Duration; import java.util.List; import org.junit.Rule; import org.junit.Test; public class GrpcMessageTooLargeTest { + private static final String QUERY_ERROR_MESSAGE = + "Failed to send query response: RESOURCE_EXHAUSTED: grpc: received message larger than max"; private static 
final String VERY_LARGE_DATA; static { @@ -36,16 +37,27 @@ public class GrpcMessageTooLargeTest { } @Rule - public SDKTestWorkflowRule testWorkflowRule = + public SDKTestWorkflowRule activityStartWorkflowRule = SDKTestWorkflowRule.newBuilder() - .setWorkflowTypes(TestWorkflowImpl.class) + .setWorkflowTypes(ActivityStartWorkflowImpl.class) .setActivityImplementations(new TestActivityImpl()) .build(); + @Rule + public SDKTestWorkflowRule failureWorkflowRule = + SDKTestWorkflowRule.newBuilder().setWorkflowTypes(FailureWorkflowImpl.class).build(); + + @Rule + public SDKTestWorkflowRule querySuccessWorkflowRule = + SDKTestWorkflowRule.newBuilder().setWorkflowTypes(QuerySuccessWorkflowImpl.class).build(); + + @Rule + public SDKTestWorkflowRule queryFailureWorkflowRule = + SDKTestWorkflowRule.newBuilder().setWorkflowTypes(QueryFailureWorkflowImpl.class).build(); + @Test public void workflowStartTooLarge() { - TestWorkflows.TestWorkflowStringArg workflow = - testWorkflowRule.newWorkflowStub(TestWorkflows.TestWorkflowStringArg.class); + TestWorkflow workflow = createWorkflowStub(TestWorkflow.class, activityStartWorkflowRule); WorkflowServiceException e = assertThrows( WorkflowServiceException.class, @@ -55,40 +67,137 @@ public void workflowStartTooLarge() { @Test public void activityStartTooLarge() { - WorkflowOptions options = - WorkflowOptions.newBuilder() - .setWorkflowRunTimeout(Duration.ofSeconds(1)) - .setTaskQueue(testWorkflowRule.getTaskQueue()) - .build(); - TestWorkflows.TestWorkflowStringArg workflow = - testWorkflowRule - .getWorkflowClient() - .newWorkflowStub(TestWorkflows.TestWorkflowStringArg.class, options); + TestWorkflow workflow = createWorkflowStub(TestWorkflow.class, activityStartWorkflowRule); WorkflowFailedException e = assertThrows(WorkflowFailedException.class, () -> workflow.execute("")); assertTrue(e.getCause() instanceof TimeoutFailure); + String workflowId = WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId(); + assertTrue( + 
activityStartWorkflowRule + .getHistoryEvents(workflowId, EventType.EVENT_TYPE_ACTIVITY_TASK_FAILED) + .isEmpty()); List events = - testWorkflowRule.getHistoryEvents( - e.getExecution().getWorkflowId(), EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED); - assertFalse(events.isEmpty()); - for (HistoryEvent event : events) { + activityStartWorkflowRule.getHistoryEvents( + workflowId, EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED); + assertEquals(1, events.size()); + assertEquals( + WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE, + events.get(0).getWorkflowTaskFailedEventAttributes().getCause()); + } + + @Test + public void workflowFailureTooLarge() { + // Avoiding logging exception with very large data + try (LoggerUtils.SilenceLoggers sl = + LoggerUtils.silenceLoggers(ReplayWorkflowTaskHandler.class, PollerOptions.class)) { + TestWorkflow workflow = createWorkflowStub(TestWorkflow.class, failureWorkflowRule); + + WorkflowFailedException e = + assertThrows(WorkflowFailedException.class, () -> workflow.execute("")); + + assertTrue(e.getCause() instanceof TimeoutFailure); + String workflowId = WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId(); + List events = + failureWorkflowRule.getHistoryEvents( + workflowId, EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED); + assertEquals(1, events.size()); assertEquals( WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE, - event.getWorkflowTaskFailedEventAttributes().getCause()); + events.get(0).getWorkflowTaskFailedEventAttributes().getCause()); + } + } + + @Test + public void queryResultTooLarge() { + TestWorkflowWithQuery workflow = + createWorkflowStub(TestWorkflowWithQuery.class, querySuccessWorkflowRule); + workflow.execute(); + + WorkflowQueryException e = assertThrows(WorkflowQueryException.class, workflow::query); + + assertNotNull(e.getCause()); + // The exception will not contain the original failure object, so instead of type check we're + // checking the message to ensure 
the correct error is being sent. + assertTrue(e.getCause().getMessage().contains(QUERY_ERROR_MESSAGE)); + } + + @Test + public void queryErrorTooLarge() { + TestWorkflowWithQuery workflow = + createWorkflowStub(TestWorkflowWithQuery.class, queryFailureWorkflowRule); + workflow.execute(); + + WorkflowQueryException e = assertThrows(WorkflowQueryException.class, workflow::query); + + assertNotNull(e.getCause()); + assertTrue(e.getCause().getMessage().contains(QUERY_ERROR_MESSAGE)); + } + + private static T createWorkflowStub(Class clazz, SDKTestWorkflowRule workflowRule) { + WorkflowOptions options = + WorkflowOptions.newBuilder() + .setWorkflowRunTimeout(Duration.ofSeconds(1)) + .setWorkflowTaskTimeout(Duration.ofMillis(250)) + .setTaskQueue(workflowRule.getTaskQueue()) + .build(); + return workflowRule.getWorkflowClient().newWorkflowStub(clazz, options); + } + + @WorkflowInterface + public interface TestWorkflow { + @WorkflowMethod + void execute(String arg); + } + + @WorkflowInterface + public interface TestWorkflowWithQuery { + @WorkflowMethod + void execute(); + + @QueryMethod + String query(); + } + + public static class ActivityStartWorkflowImpl implements TestWorkflow { + private final TestActivities.TestActivity1 activity = + Workflow.newActivityStub( + TestActivities.TestActivity1.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(1)) + .validateAndBuildWithDefaults()); + + @Override + public void execute(String arg) { + activity.execute(VERY_LARGE_DATA); } } - public static class TestWorkflowImpl implements TestWorkflows.TestWorkflowStringArg { + public static class FailureWorkflowImpl implements TestWorkflow { @Override public void execute(String arg) { - Workflow.newActivityStub( - TestActivities.TestActivity1.class, - ActivityOptions.newBuilder() - .setStartToCloseTimeout(Duration.ofSeconds(1)) - .validateAndBuildWithDefaults()) - .execute(VERY_LARGE_DATA); + throw new RuntimeException(VERY_LARGE_DATA); + } + } + + public 
static class QuerySuccessWorkflowImpl implements TestWorkflowWithQuery { + @Override + public void execute() {} + + @Override + public String query() { + return VERY_LARGE_DATA; + } + } + + public static class QueryFailureWorkflowImpl implements TestWorkflowWithQuery { + @Override + public void execute() {} + + @Override + public String query() { + throw new RuntimeException(VERY_LARGE_DATA); } } diff --git a/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcMessageTooLargeException.java b/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcMessageTooLargeException.java index bf0476153d..efe3c0c076 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcMessageTooLargeException.java +++ b/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcMessageTooLargeException.java @@ -1,7 +1,34 @@ package io.temporal.internal.retryer; -public class GrpcMessageTooLargeException extends RuntimeException { - public GrpcMessageTooLargeException(io.grpc.StatusRuntimeException cause) { - super(cause.getMessage(), cause); +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import javax.annotation.Nullable; + +/** + * Internal exception used to mark when StatusRuntimeException is caused by message being too large. + * Exceptions are only wrapped if {@link GrpcRetryer} was used, which is an implementation detail + * and not always the case - user code should catch {@link StatusRuntimeException}. 
+ */ +public class GrpcMessageTooLargeException extends StatusRuntimeException { + private GrpcMessageTooLargeException(Status status, @Nullable Metadata trailers) { + super(status, trailers); + } + + public static @Nullable GrpcMessageTooLargeException tryWrap(StatusRuntimeException exception) { + Status status = exception.getStatus(); + if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED + && status.getDescription() != null + && (status.getDescription().startsWith("grpc: received message larger than max") + || status + .getDescription() + .startsWith("grpc: message after decompression larger than max") + || status + .getDescription() + .startsWith("grpc: received message after decompression larger than max"))) { + return new GrpcMessageTooLargeException(status.withCause(exception), exception.getTrailers()); + } else { + return null; + } } } diff --git a/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcRetryerUtils.java b/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcRetryerUtils.java index 4cd6a8223c..695210555e 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcRetryerUtils.java +++ b/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcRetryerUtils.java @@ -58,15 +58,9 @@ class GrpcRetryerUtils { break; case RESOURCE_EXHAUSTED: // Retry RESOURCE_EXHAUSTED unless the max message size was exceeded - if (status.getDescription() != null - && (status.getDescription().startsWith("grpc: received message larger than max") - || status - .getDescription() - .startsWith("grpc: message after decompression larger than max") - || status - .getDescription() - .startsWith("grpc: received message after decompression larger than max"))) { - return new GrpcMessageTooLargeException(currentException); + GrpcMessageTooLargeException e = GrpcMessageTooLargeException.tryWrap(currentException); + if (e != null) { + return e; } break; default: diff --git 
a/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java b/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java index 1bbea4057c..bd7f780f4f 100644 --- a/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java +++ b/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcAsyncRetryerTest.java @@ -405,7 +405,6 @@ public void testMessageLargerThanMaxFailureAsync() throws InterruptedException { "grpc: message after decompression larger than max (2000 vs. 1000)", "grpc: received message after decompression larger than max (2000 vs. 1000)", }) { - long start = System.currentTimeMillis(); final AtomicInteger attempts = new AtomicInteger(); ExecutionException e = assertThrows( @@ -430,6 +429,7 @@ public void testMessageLargerThanMaxFailureAsync() throws InterruptedException { .get()); assertTrue(e.getCause() instanceof GrpcMessageTooLargeException); assertEquals(Status.Code.RESOURCE_EXHAUSTED + ": " + description, e.getCause().getMessage()); + assertTrue(e.getCause().getCause() instanceof StatusRuntimeException); } } } diff --git a/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcSyncRetryerTest.java b/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcSyncRetryerTest.java index 2d43a84b45..a797a4148d 100644 --- a/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcSyncRetryerTest.java +++ b/temporal-serviceclient/src/test/java/io/temporal/internal/retryer/GrpcSyncRetryerTest.java @@ -340,7 +340,6 @@ public void testMessageLargerThanMaxFailure() { "grpc: message after decompression larger than max (2000 vs. 1000)", "grpc: received message after decompression larger than max (2000 vs. 
1000)", }) { - long start = System.currentTimeMillis(); final AtomicInteger attempts = new AtomicInteger(); GrpcMessageTooLargeException e = assertThrows( @@ -359,6 +358,7 @@ public void testMessageLargerThanMaxFailure() { new GrpcRetryer.GrpcRetryerOptions(options, null), GetSystemInfoResponse.Capabilities.getDefaultInstance())); assertEquals(Status.Code.RESOURCE_EXHAUSTED + ": " + description, e.getMessage()); + assertTrue(e.getCause() instanceof StatusRuntimeException); } } } From 340154a122fd2c485c0d9b51163eda5f28d75cd4 Mon Sep 17 00:00:00 2001 From: Maciej Dudkowski Date: Tue, 29 Jul 2025 09:34:25 -0400 Subject: [PATCH 6/6] Reverted minor change in GrpcRetryerUtils that's no longer needed. --- .../java/io/temporal/internal/retryer/GrpcRetryerUtils.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcRetryerUtils.java b/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcRetryerUtils.java index 695210555e..12b135bb0d 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcRetryerUtils.java +++ b/temporal-serviceclient/src/main/java/io/temporal/internal/retryer/GrpcRetryerUtils.java @@ -29,9 +29,9 @@ class GrpcRetryerUtils { @Nonnull StatusRuntimeException currentException, @Nonnull RpcRetryOptions options, GetSystemInfoResponse.Capabilities serverCapabilities) { - Status status = currentException.getStatus(); + Status.Code code = currentException.getStatus().getCode(); - switch (status.getCode()) { + switch (code) { // CANCELLED and DEADLINE_EXCEEDED usually considered non-retryable in GRPC world, for // example: // https://github.com/grpc-ecosystem/go-grpc-middleware/blob/master/retry/retry.go#L287 @@ -65,7 +65,7 @@ class GrpcRetryerUtils { break; default: for (RpcRetryOptions.DoNotRetryItem pair : options.getDoNotRetry()) { - if (pair.getCode() == status.getCode() + if (pair.getCode() == code && (pair.getDetailsClass() == 
null || StatusUtils.hasFailure(currentException, pair.getDetailsClass()))) { return currentException;