Do not auto-retry gRPC-message-size-too-large errors #2604

Open · wants to merge 6 commits into master
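
Background for the change: when reporting a workflow task result (or a direct query result) produces a gRPC "message size too large" error, retrying the exact same request is very unlikely to ever succeed, so the SDK should stop auto-retrying it and surface a descriptive failure instead. The sketch below is illustrative only and is not code from this PR; the helper name and the description-string check are assumptions, while the PR itself centralizes detection in GrpcMessageTooLargeException.tryWrap (used in the diff below).

import io.grpc.Status;
import io.grpc.StatusRuntimeException;

final class MessageSizeErrors {
  private MessageSizeErrors() {}

  // Rough heuristic: gRPC reports oversized inbound/outbound messages as
  // RESOURCE_EXHAUSTED; the description text varies between gRPC versions,
  // so a production check needs to be more careful than this sketch.
  static boolean looksLikeMessageTooLarge(StatusRuntimeException e) {
    Status status = e.getStatus();
    String description = status.getDescription();
    return status.getCode() == Status.Code.RESOURCE_EXHAUSTED
        && description != null
        && (description.contains("exceeds maximum")
            || description.contains("message too large"));
  }
}
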
@@ -9,12 +9,18 @@
 import com.uber.m3.tally.Scope;
 import com.uber.m3.tally.Stopwatch;
 import com.uber.m3.util.ImmutableMap;
+import io.grpc.StatusRuntimeException;
 import io.temporal.api.common.v1.WorkflowExecution;
+import io.temporal.api.enums.v1.QueryResultType;
 import io.temporal.api.enums.v1.TaskQueueKind;
 import io.temporal.api.enums.v1.WorkflowTaskFailedCause;
+import io.temporal.api.failure.v1.Failure;
 import io.temporal.api.workflowservice.v1.*;
+import io.temporal.failure.ApplicationFailure;
 import io.temporal.internal.logging.LoggerTag;
+import io.temporal.internal.retryer.GrpcMessageTooLargeException;
 import io.temporal.internal.retryer.GrpcRetryer;
+import io.temporal.payload.context.WorkflowSerializationContext;
 import io.temporal.serviceclient.MetricsTag;
 import io.temporal.serviceclient.RpcRetryOptions;
 import io.temporal.serviceclient.WorkflowServiceStubs;
@@ -394,73 +400,125 @@ public void handle(WorkflowTask task) throws Exception {
           PollWorkflowTaskQueueResponse currentTask = nextWFTResponse.get();
           nextWFTResponse = Optional.empty();
           WorkflowTaskHandler.Result result = handleTask(currentTask, workflowTypeScope);
+          WorkflowTaskFailedCause taskFailedCause = null;
           try {
             RespondWorkflowTaskCompletedRequest taskCompleted = result.getTaskCompleted();
             RespondWorkflowTaskFailedRequest taskFailed = result.getTaskFailed();
             RespondQueryTaskCompletedRequest queryCompleted = result.getQueryCompleted();

-            if (taskCompleted != null) {
-              RespondWorkflowTaskCompletedRequest.Builder requestBuilder =
-                  taskCompleted.toBuilder();
-              try (EagerActivitySlotsReservation activitySlotsReservation =
-                  new EagerActivitySlotsReservation(eagerActivityDispatcher)) {
-                activitySlotsReservation.applyToRequest(requestBuilder);
-                RespondWorkflowTaskCompletedResponse response =
-                    sendTaskCompleted(
-                        currentTask.getTaskToken(),
-                        requestBuilder,
-                        result.getRequestRetryOptions(),
-                        workflowTypeScope);
-                // If we were processing a speculative WFT the server may instruct us that the task
-                // was dropped by resetting our event ID.
-                long resetEventId = response.getResetHistoryEventId();
-                if (resetEventId != 0) {
-                  result.getResetEventIdHandle().apply(resetEventId);
+            if (queryCompleted != null) {
+              try {
+                sendDirectQueryCompletedResponse(
+                    currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope);
+              } catch (StatusRuntimeException e) {
+                GrpcMessageTooLargeException tooLargeException =
+                    GrpcMessageTooLargeException.tryWrap(e);
+                if (tooLargeException == null) {
+                  throw e;
                 }
-                nextWFTResponse =
-                    response.hasWorkflowTask()
-                        ? Optional.of(response.getWorkflowTask())
-                        : Optional.empty();
-                // TODO we don't have to do this under the runId lock
-                activitySlotsReservation.handleResponse(response);
+                Failure failure =
+                    grpcMessageTooLargeFailure(
+                        workflowExecution.getWorkflowId(),
+                        tooLargeException,
+                        "Failed to send query response");
+                RespondQueryTaskCompletedRequest.Builder queryFailedBuilder =
+                    RespondQueryTaskCompletedRequest.newBuilder()
+                        .setTaskToken(currentTask.getTaskToken())
+                        .setNamespace(namespace)
+                        .setCompletedType(QueryResultType.QUERY_RESULT_TYPE_FAILED)
+                        .setErrorMessage(failure.getMessage())
+                        .setFailure(failure);

Contributor:
Sorry I missed this in the initial review; we should be failing the workflow task here.

Contributor:
Talked offline; in Java we seem to always use RespondQueryTaskCompletedRequest for queries, even if there was something that would normally fail the workflow task.

+                sendDirectQueryCompletedResponse(
+                    currentTask.getTaskToken(), queryFailedBuilder, workflowTypeScope);
               }
+            } else {
+              try {
+                if (taskCompleted != null) {
+                  RespondWorkflowTaskCompletedRequest.Builder requestBuilder =
+                      taskCompleted.toBuilder();
+                  try (EagerActivitySlotsReservation activitySlotsReservation =
+                      new EagerActivitySlotsReservation(eagerActivityDispatcher)) {
+                    activitySlotsReservation.applyToRequest(requestBuilder);
+                    RespondWorkflowTaskCompletedResponse response =
+                        sendTaskCompleted(
+                            currentTask.getTaskToken(),
+                            requestBuilder,
+                            result.getRequestRetryOptions(),
+                            workflowTypeScope);
+                    // If we were processing a speculative WFT the server may instruct us that the
+                    // task was dropped by resetting our event ID.
+                    long resetEventId = response.getResetHistoryEventId();
+                    if (resetEventId != 0) {
+                      result.getResetEventIdHandle().apply(resetEventId);
+                    }
+                    nextWFTResponse =
+                        response.hasWorkflowTask()
+                            ? Optional.of(response.getWorkflowTask())
+                            : Optional.empty();
+                    // TODO we don't have to do this under the runId lock
+                    activitySlotsReservation.handleResponse(response);
+                  }
+                } else if (taskFailed != null) {
+                  taskFailedCause = taskFailed.getCause();
+                  sendTaskFailed(
+                      currentTask.getTaskToken(),
+                      taskFailed.toBuilder(),
+                      result.getRequestRetryOptions(),
+                      workflowTypeScope);
+                }
+              } catch (GrpcMessageTooLargeException e) {
+                // Only fail the workflow task on the first attempt; subsequent failures of the
+                // same workflow task should time out.
+                if (currentTask.getAttempt() > 1) {
+                  throw e;
+                }
+
+                releaseReason = SlotReleaseReason.error(e);
+                handleReportingFailure(
+                    e, currentTask, result, workflowExecution, workflowTypeScope);
+                // setting/replacing failure cause for metrics purposes
+                taskFailedCause =
+                    WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE;
+
+                String messagePrefix =
+                    String.format(
+                        "Failed to send workflow task %s",
+                        taskFailed == null ? "completion" : "failure");
+                RespondWorkflowTaskFailedRequest.Builder taskFailedBuilder =
+                    RespondWorkflowTaskFailedRequest.newBuilder()
+                        .setFailure(
+                            grpcMessageTooLargeFailure(
+                                workflowExecution.getWorkflowId(), e, messagePrefix))
+                        .setCause(
+                            WorkflowTaskFailedCause
+                                .WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE);
+                sendTaskFailed(
+                    currentTask.getTaskToken(),
+                    taskFailedBuilder,
+                    result.getRequestRetryOptions(),
+                    workflowTypeScope);
+              }
-            } else if (taskFailed != null) {
-              sendTaskFailed(
-                  currentTask.getTaskToken(),
-                  taskFailed.toBuilder(),
-                  result.getRequestRetryOptions(),
-                  workflowTypeScope);
-            } else if (queryCompleted != null) {
-              sendDirectQueryCompletedResponse(
-                  currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope);
             }
           } catch (Exception e) {
-            logExceptionDuringResultReporting(e, currentTask, result);
             releaseReason = SlotReleaseReason.error(e);
-            // if we failed to report the workflow task completion back to the server,
-            // our cached version of the workflow may be more advanced than the server is aware of.
-            // We should discard this execution and perform a clean replay based on what server
-            // knows next time.
-            cache.invalidate(
-                workflowExecution, workflowTypeScope, "Failed result reporting to the server", e);
+            handleReportingFailure(e, currentTask, result, workflowExecution, workflowTypeScope);
             throw e;
           }

-          if (result.getTaskFailed() != null) {
-            Scope workflowTaskFailureScope = workflowTypeScope;
-            if (result
-                .getTaskFailed()
-                .getCause()
-                .equals(
-                    WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR)) {
-              workflowTaskFailureScope =
-                  workflowTaskFailureScope.tagged(
-                      ImmutableMap.of(TASK_FAILURE_TYPE, "NonDeterminismError"));
-            } else {
-              workflowTaskFailureScope =
-                  workflowTaskFailureScope.tagged(
-                      ImmutableMap.of(TASK_FAILURE_TYPE, "WorkflowError"));
+          if (taskFailedCause != null) {
+            String taskFailureType;
+            switch (taskFailedCause) {
+              case WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR:
+                taskFailureType = "NonDeterminismError";
+                break;
+              case WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE:
+                taskFailureType = "GrpcMessageTooLarge";
+                break;
+              default:
+                taskFailureType = "WorkflowError";
             }
+            Scope workflowTaskFailureScope =
+                workflowTypeScope.tagged(ImmutableMap.of(TASK_FAILURE_TYPE, taskFailureType));
             // we don't trigger the counter in case of the legacy query
             // (which never has taskFailed set)
             workflowTaskFailureScope
@@ -617,5 +675,34 @@ private void logExceptionDuringResultReporting(
             e);
       }
     }
+
+    private void handleReportingFailure(
+        Exception e,
+        PollWorkflowTaskQueueResponse currentTask,
+        WorkflowTaskHandler.Result result,
+        WorkflowExecution workflowExecution,
+        Scope workflowTypeScope) {
+      logExceptionDuringResultReporting(e, currentTask, result);
+      // if we failed to report the workflow task completion back to the server,
+      // our cached version of the workflow may be more advanced than the server is aware of.
+      // We should discard this execution and perform a clean replay based on what server
+      // knows next time.
+      cache.invalidate(
+          workflowExecution, workflowTypeScope, "Failed result reporting to the server", e);
+    }
+
+    private Failure grpcMessageTooLargeFailure(
+        String workflowId, GrpcMessageTooLargeException e, String messagePrefix) {
+      ApplicationFailure applicationFailure =
+          ApplicationFailure.newBuilder()
+              .setMessage(messagePrefix + ": " + e.getMessage())
+              .setType(GrpcMessageTooLargeException.class.getSimpleName())
+              .build();
+      applicationFailure.setStackTrace(new StackTraceElement[0]); // don't serialize stack trace
+      return options
+          .getDataConverter()
+          .withContext(new WorkflowSerializationContext(namespace, workflowId))
+          .exceptionToFailure(applicationFailure);
+    }
   }
 }
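
To make the new behavior concrete, here is a hypothetical repro sketch; the workflow name and payload size are illustrative assumptions and not part of this PR. A workflow that completes with a result too large for a single gRPC message previously had its completion call auto-retried even though it could not succeed; with this change the worker fails the task once with cause WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE and an ApplicationFailure built by grpcMessageTooLargeFailure, and subsequent attempts of the same task are left to time out.

import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

// Hypothetical workflow used only to show when the new code path triggers.
@WorkflowInterface
public interface HugeResultWorkflow {
  @WorkflowMethod
  byte[] run();
}

class HugeResultWorkflowImpl implements HugeResultWorkflow {
  @Override
  public byte[] run() {
    // Returning ~10 MB pushes RespondWorkflowTaskCompleted over a typical gRPC
    // message size limit (the exact limit depends on client/server configuration),
    // so reporting the result fails with a message-too-large status.
    return new byte[10 * 1024 * 1024];
  }
}
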
New file: io/temporal/testUtils/LoggerUtils.java
@@ -0,0 +1,41 @@
package io.temporal.testUtils;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.LoggerFactory;

public class LoggerUtils {
  public static SilenceLoggers silenceLoggers(Class<?>... classes) {
    return new SilenceLoggers(classes);
  }

  public static class SilenceLoggers implements AutoCloseable {
    private final List<Logger> loggers;
    List<Level> oldLogLevels;

    public SilenceLoggers(Class<?>... classes) {
      loggers =
          Arrays.stream(classes)
              .map(LoggerFactory::getLogger)
              .filter(Logger.class::isInstance)
              .map(Logger.class::cast)
              .collect(Collectors.toList());
      oldLogLevels = new ArrayList<>();
      for (Logger logger : loggers) {
        oldLogLevels.add(logger.getLevel());
        logger.setLevel(Level.OFF);
      }
    }

    @Override
    public void close() {
      for (int i = 0; i < loggers.size(); i++) {
        loggers.get(i).setLevel(oldLogLevels.get(i));
      }
    }
  }
}
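
Typical usage of this new test helper (the class literal below is a placeholder for whichever logback-backed logger a test needs to silence):

// Temporarily silence expected error logging and restore previous levels afterwards.
try (LoggerUtils.SilenceLoggers ignored =
    LoggerUtils.silenceLoggers(SomeNoisyComponent.class)) {
  // run code that is expected to log errors, e.g. a workflow task whose
  // response exceeds the gRPC message size limit
}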