diff --git a/temporal-sdk/src/main/java/io/temporal/client/ActivityPausedException.java b/temporal-sdk/src/main/java/io/temporal/client/ActivityPausedException.java index a8b2b72ba4..c3c3839d1f 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/ActivityPausedException.java +++ b/temporal-sdk/src/main/java/io/temporal/client/ActivityPausedException.java @@ -1,12 +1,14 @@ package io.temporal.client; import io.temporal.activity.ActivityInfo; +import io.temporal.common.Experimental; /*** * Indicates that the activity was paused by the user. * *

Catching this exception directly is discouraged and catching the parent class {@link ActivityCompletionException} is recommended instead.
*/ +@Experimental public final class ActivityPausedException extends ActivityCompletionException { public ActivityPausedException(ActivityInfo info) { super(info); diff --git a/temporal-sdk/src/main/java/io/temporal/client/ActivityResetException.java b/temporal-sdk/src/main/java/io/temporal/client/ActivityResetException.java new file mode 100644 index 0000000000..c2c51037ca --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/ActivityResetException.java @@ -0,0 +1,20 @@ +package io.temporal.client; + +import io.temporal.activity.ActivityInfo; +import io.temporal.common.Experimental; + +/*** + * Indicates that the activity attempt was reset by the user. + * + *

Catching this exception directly is discouraged and catching the parent class {@link ActivityCompletionException} is recommended instead.
+ */ +@Experimental +public final class ActivityResetException extends ActivityCompletionException { + public ActivityResetException(ActivityInfo info) { + super(info); + } + + public ActivityResetException() { + super(); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java index add22280fa..0ad49f9485 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java @@ -226,6 +226,8 @@ private void sendHeartbeatRequest(Object details) { metricsScope); if (status.getCancelRequested()) { lastException = new ActivityCanceledException(info); + } else if (status.getActivityReset()) { + lastException = new ActivityResetException(info); } else if (status.getActivityPaused()) { lastException = new ActivityPausedException(info); } else { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/external/ManualActivityCompletionClientImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/client/external/ManualActivityCompletionClientImpl.java index f48589c6f1..0e68b107b5 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/external/ManualActivityCompletionClientImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/external/ManualActivityCompletionClientImpl.java @@ -11,9 +11,7 @@ import io.temporal.api.common.v1.Payloads; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.workflowservice.v1.*; -import io.temporal.client.ActivityCanceledException; -import io.temporal.client.ActivityCompletionFailureException; -import io.temporal.client.ActivityNotExistsException; +import io.temporal.client.*; import io.temporal.common.converter.DataConverter; import io.temporal.failure.CanceledFailure; import io.temporal.internal.client.ActivityClientHelper; @@ -190,6 +188,10 @@ public void recordHeartbeat(@Nullable Object details) throws CanceledFailure { metricsScope); if (status.getCancelRequested()) { throw new ActivityCanceledException(); + } else if (status.getActivityReset()) { + throw new ActivityResetException(); + } else if (status.getActivityPaused()) { + throw new ActivityPausedException(); } } else { RecordActivityTaskHeartbeatByIdResponse status = @@ -203,6 +205,10 @@ public void recordHeartbeat(@Nullable Object details) throws CanceledFailure { metricsScope); if (status.getCancelRequested()) { throw new ActivityCanceledException(); + } else if (status.getActivityReset()) { + throw new ActivityResetException(); + } else if (status.getActivityPaused()) { + throw new ActivityPausedException(); } } } catch (Exception e) { diff --git a/temporal-sdk/src/test/java/io/temporal/activity/ActivityResetTest.java b/temporal-sdk/src/test/java/io/temporal/activity/ActivityResetTest.java new file mode 100644 index 0000000000..670892aaa1 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/activity/ActivityResetTest.java @@ -0,0 +1,113 @@ +package io.temporal.activity; + +import static org.junit.Assume.assumeTrue; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.workflow.v1.PendingActivityInfo; +import io.temporal.api.workflowservice.v1.ResetActivityRequest; +import io.temporal.client.ActivityResetException; +import io.temporal.client.WorkflowStub; +import io.temporal.common.converter.GlobalDataConverter; +import io.temporal.testing.internal.SDKTestOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.Async; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.shared.TestActivities; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class ActivityResetTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestWorkflowImpl.class) + .setActivityImplementations(new HeartBeatingActivityImpl()) + .build(); + + @Test + public void activityReset() { + assumeTrue( + "Test Server doesn't support activity pause", SDKTestWorkflowRule.useExternalService); + + TestWorkflows.TestWorkflowReturnString workflow = + testWorkflowRule.newWorkflowStub(TestWorkflows.TestWorkflowReturnString.class); + Assert.assertEquals("I am stopped after reset", workflow.execute()); + Assert.assertEquals( + 1, + WorkflowStub.fromTyped(workflow) + .describe() + .getRawDescription() + .getPendingActivitiesCount()); + PendingActivityInfo activityInfo = + WorkflowStub.fromTyped(workflow).describe().getRawDescription().getPendingActivities(0); + Assert.assertEquals( + "1", + GlobalDataConverter.get() + .fromPayload( + activityInfo.getHeartbeatDetails().getPayloads(0), String.class, String.class)); + } + + public static class TestWorkflowImpl implements TestWorkflows.TestWorkflowReturnString { + + private final TestActivities.TestActivity1 activities = + Workflow.newActivityStub( + TestActivities.TestActivity1.class, + SDKTestOptions.newActivityOptions20sScheduleToClose()); + + @Override + public String execute() { + Async.function(activities::execute, ""); + Workflow.sleep(Duration.ofSeconds(1)); + return activities.execute("CompleteOnPause"); + } + } + + public static class HeartBeatingActivityImpl implements TestActivities.TestActivity1 { + private final AtomicInteger resetCounter = new AtomicInteger(0); + + @Override + public String execute(String arg) { + ActivityInfo info = Activity.getExecutionContext().getInfo(); + // Have the activity pause itself + Activity.getExecutionContext() + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .resetActivity( + ResetActivityRequest.newBuilder() + .setNamespace(info.getNamespace()) + .setExecution( + WorkflowExecution.newBuilder() + .setWorkflowId(info.getWorkflowId()) + .setRunId(info.getRunId()) + .build()) + .setId(info.getActivityId()) + .build()); + while (true) { + try { + Thread.sleep(1000); + // Check if the activity has been reset, and the activity info shows we are on the 1st + // attempt. + if (resetCounter.get() >= 1 + && Activity.getExecutionContext().getInfo().getAttempt() == 1) { + return "I am stopped after reset"; + } + // Heartbeat and verify that the correct exception is thrown + Activity.getExecutionContext().heartbeat("1"); + } catch (ActivityResetException pe) { + // Counter is incremented to track the number of resets + resetCounter.addAndGet(1); + // This will fail the attempt, and the activity will be retried. + throw pe; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } +}