From 9c81a10d1f9edbacfcea3abd80b73eccf82fc73f Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Tue, 3 Jun 2025 08:35:45 -0700 Subject: [PATCH] Throw clear error if workflow stub is misused --- .../internal/sync/ActivityStubImpl.java | 8 + .../internal/sync/ChildWorkflowStubImpl.java | 10 + .../sync/ExternalWorkflowStubImpl.java | 9 + .../internal/sync/LocalActivityStubImpl.java | 8 + .../internal/sync/NexusServiceStubImpl.java | 10 + .../io/temporal/workflow/StubReuseTest.java | 318 ++++++++++++++++++ 6 files changed, 363 insertions(+) create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/StubReuseTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityStubImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityStubImpl.java index 8ed9e62be3..cfef301701 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityStubImpl.java @@ -13,6 +13,13 @@ final class ActivityStubImpl extends ActivityStubBase { private final WorkflowOutboundCallsInterceptor activityExecutor; private final Functions.Proc assertReadOnly; + private void assertSameWorkflow() { + if (activityExecutor != WorkflowInternal.getWorkflowOutboundInterceptor()) { + throw new IllegalStateException( + "Activity stub belongs to a different workflow. Create a new stub for each workflow instance."); + } + } + static ActivityStub newInstance( ActivityOptions options, WorkflowOutboundCallsInterceptor activityExecutor, @@ -34,6 +41,7 @@ static ActivityStub newInstance( @Override public Promise executeAsync( String activityName, Class resultClass, Type resultType, Object... args) { + assertSameWorkflow(); this.assertReadOnly.apply(); return activityExecutor .executeActivity( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/ChildWorkflowStubImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/ChildWorkflowStubImpl.java index 8d38d7cab1..693ada0246 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/ChildWorkflowStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/ChildWorkflowStubImpl.java @@ -18,6 +18,13 @@ class ChildWorkflowStubImpl implements ChildWorkflowStub { private final CompletablePromise execution; private final Functions.Proc1 assertReadOnly; + private void assertSameWorkflow() { + if (outboundCallsInterceptor != WorkflowInternal.getWorkflowOutboundInterceptor()) { + throw new IllegalStateException( + "Child workflow stub belongs to a different workflow. Create a new stub for each workflow instance."); + } + } + ChildWorkflowStubImpl( String workflowType, ChildWorkflowOptions options, @@ -60,6 +67,7 @@ public R execute(Class resultClass, Object... args) { @Override public R execute(Class resultClass, Type resultType, Object... args) { + assertSameWorkflow(); assertReadOnly.apply("schedule child workflow"); Promise result = executeAsync(resultClass, resultType, args); if (AsyncInternal.isAsync()) { @@ -83,6 +91,7 @@ public Promise executeAsync(Class resultClass, Object... args) { @Override public Promise executeAsync(Class resultClass, Type resultType, Object... args) { + assertSameWorkflow(); assertReadOnly.apply("schedule child workflow"); ChildWorkflowOutput result = outboundCallsInterceptor.executeChildWorkflow( @@ -100,6 +109,7 @@ public Promise executeAsync(Class resultClass, Type resultType, Object @Override public void signal(String signalName, Object... args) { + assertSameWorkflow(); assertReadOnly.apply("signal workflow"); Promise signaled = outboundCallsInterceptor diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/ExternalWorkflowStubImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/ExternalWorkflowStubImpl.java index c268726613..bf465e6386 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/ExternalWorkflowStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/ExternalWorkflowStubImpl.java @@ -13,6 +13,13 @@ class ExternalWorkflowStubImpl implements ExternalWorkflowStub { private final WorkflowExecution execution; private Functions.Proc1 assertReadOnly; + private void assertSameWorkflow() { + if (outboundCallsInterceptor != WorkflowInternal.getWorkflowOutboundInterceptor()) { + throw new IllegalStateException( + "External workflow stub belongs to a different workflow. Create a new stub for each workflow instance."); + } + } + public ExternalWorkflowStubImpl( WorkflowExecution execution, WorkflowOutboundCallsInterceptor outboundCallsInterceptor, @@ -29,6 +36,7 @@ public WorkflowExecution getExecution() { @Override public void signal(String signalName, Object... args) { + assertSameWorkflow(); assertReadOnly.apply("signal external workflow"); Promise signaled = outboundCallsInterceptor @@ -52,6 +60,7 @@ public void signal(String signalName, Object... args) { @Override public void cancel() { + assertSameWorkflow(); assertReadOnly.apply("cancel external workflow"); Promise cancelRequested = outboundCallsInterceptor diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityStubImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityStubImpl.java index 6744c26cde..577c423946 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityStubImpl.java @@ -13,6 +13,13 @@ class LocalActivityStubImpl extends ActivityStubBase { private final WorkflowOutboundCallsInterceptor activityExecutor; private final Functions.Proc assertReadOnly; + private void assertSameWorkflow() { + if (activityExecutor != WorkflowInternal.getWorkflowOutboundInterceptor()) { + throw new IllegalStateException( + "Local Activity stub belongs to a different workflow. Create a new stub for each workflow instance."); + } + } + static ActivityStub newInstance( LocalActivityOptions options, WorkflowOutboundCallsInterceptor activityExecutor, @@ -34,6 +41,7 @@ private LocalActivityStubImpl( @Override public Promise executeAsync( String activityName, Class resultClass, Type resultType, Object... args) { + assertSameWorkflow(); this.assertReadOnly.apply(); return activityExecutor .executeLocalActivity( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/NexusServiceStubImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/NexusServiceStubImpl.java index 821b92f075..782f47076f 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/NexusServiceStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/NexusServiceStubImpl.java @@ -13,6 +13,13 @@ public class NexusServiceStubImpl implements NexusServiceStub { private final WorkflowOutboundCallsInterceptor outboundCallsInterceptor; private final Functions.Proc1 assertReadOnly; + private void assertSameWorkflow() { + if (outboundCallsInterceptor != WorkflowInternal.getWorkflowOutboundInterceptor()) { + throw new IllegalStateException( + "Nexus service stub belongs to a different workflow. Create a new stub for each workflow instance."); + } + } + public NexusServiceStubImpl( String name, NexusServiceOptions options, @@ -31,6 +38,7 @@ public R execute(String operationName, Class resultClass, Object arg) { @Override public R execute(String operationName, Class resultClass, Type resultType, Object arg) { + assertSameWorkflow(); assertReadOnly.apply("execute nexus operation"); Promise result = executeAsync(operationName, resultClass, resultType, arg); if (AsyncInternal.isAsync()) { @@ -55,6 +63,7 @@ public Promise executeAsync(String operationName, Class resultClass, O @Override public Promise executeAsync( String operationName, Class resultClass, Type resultType, Object arg) { + assertSameWorkflow(); assertReadOnly.apply("execute nexus operation"); NexusOperationOptions mergedOptions = NexusOperationOptions.newBuilder(options.getOperationOptions()) @@ -82,6 +91,7 @@ public NexusOperationHandle start(String operationName, Class resultCl @Override public NexusOperationHandle start( String operationName, Class resultClass, Type resultType, Object arg) { + assertSameWorkflow(); assertReadOnly.apply("schedule nexus operation"); NexusOperationOptions mergedOptions = NexusOperationOptions.newBuilder(options.getOperationOptions()) diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/StubReuseTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/StubReuseTest.java new file mode 100644 index 0000000000..2b3a8a0d4f --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/StubReuseTest.java @@ -0,0 +1,318 @@ +package io.temporal.workflow; + +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.activity.ActivityOptions; +import io.temporal.activity.LocalActivityOptions; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowFailedException; +import io.temporal.client.WorkflowOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.shared.TestActivities; +import io.temporal.workflow.shared.TestNexusServices; +import java.time.Duration; +import java.util.UUID; +import org.junit.Rule; +import org.junit.Test; + +public class StubReuseTest { + static TestActivities.TestActivity1 storedActivity; + static TestActivities.TestActivity1 storedLocalActivity; + static SimpleChildWorkflow storedChild; + static TargetWorkflow storedExternal; + static TestNexusServices.TestNexusService1 storedNexus; + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes( + StoreActivityWorkflowImpl.class, + UseActivityWorkflowImpl.class, + StoreLocalActivityWorkflowImpl.class, + UseLocalActivityWorkflowImpl.class, + StoreChildWorkflowWorkflowImpl.class, + UseChildWorkflowWorkflowImpl.class, + TargetWorkflowImpl.class, + StoreExternalWorkflowImpl.class, + UseExternalWorkflowImpl.class, + StoreNexusServiceWorkflowImpl.class, + UseNexusServiceWorkflowImpl.class, + SimpleChildWorkflowImpl.class) + .setActivityImplementations(new SimpleActivityImpl()) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @Test + public void activityStubReuseFails() { + StoreActivityWorkflow store = + testWorkflowRule.newWorkflowStubTimeoutOptions(StoreActivityWorkflow.class); + store.execute(testWorkflowRule.getTaskQueue()); + UseActivityWorkflow use = + testWorkflowRule.newWorkflowStubTimeoutOptions(UseActivityWorkflow.class); + WorkflowFailedException e = + assertThrows(WorkflowFailedException.class, () -> use.execute("unused")); + assertTrue(e.getCause().getMessage().contains("belongs to a different workflow")); + } + + @Test + public void localActivityStubReuseFails() { + StoreLocalActivityWorkflow store = + testWorkflowRule.newWorkflowStubTimeoutOptions(StoreLocalActivityWorkflow.class); + store.execute(testWorkflowRule.getTaskQueue()); + UseLocalActivityWorkflow use = + testWorkflowRule.newWorkflowStubTimeoutOptions(UseLocalActivityWorkflow.class); + WorkflowFailedException e = + assertThrows(WorkflowFailedException.class, () -> use.execute("unused")); + assertTrue(e.getCause().getMessage().contains("belongs to a different workflow")); + } + + @Test + public void childWorkflowStubReuseFails() { + StoreChildWorkflowWorkflow store = + testWorkflowRule.newWorkflowStubTimeoutOptions(StoreChildWorkflowWorkflow.class); + store.execute(testWorkflowRule.getTaskQueue()); + UseChildWorkflowWorkflow use = + testWorkflowRule.newWorkflowStubTimeoutOptions(UseChildWorkflowWorkflow.class); + WorkflowFailedException e = + assertThrows(WorkflowFailedException.class, () -> use.execute("input")); + assertTrue(e.getCause().getMessage().contains("belongs to a different workflow")); + } + + @Test + public void externalWorkflowStubReuseFails() { + WorkflowOptions opts = + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setWorkflowId("target-" + UUID.randomUUID()) + .build(); + TargetWorkflow target = + testWorkflowRule.getWorkflowClient().newWorkflowStub(TargetWorkflow.class, opts); + WorkflowClient.start(target::execute); + + StoreExternalWorkflow store = + testWorkflowRule.newWorkflowStubTimeoutOptions(StoreExternalWorkflow.class); + store.execute(opts.getWorkflowId()); + UseExternalWorkflow use = + testWorkflowRule.newWorkflowStubTimeoutOptions(UseExternalWorkflow.class); + WorkflowFailedException e = + assertThrows(WorkflowFailedException.class, () -> use.execute("unused")); + assertTrue(e.getCause().getMessage().contains("belongs to a different workflow")); + target.unblock(); + } + + @Test + public void nexusServiceStubReuseFails() { + StoreNexusServiceWorkflow store = + testWorkflowRule.newWorkflowStubTimeoutOptions(StoreNexusServiceWorkflow.class); + store.execute(testWorkflowRule.getTaskQueue()); + UseNexusServiceWorkflow use = + testWorkflowRule.newWorkflowStubTimeoutOptions(UseNexusServiceWorkflow.class); + WorkflowFailedException e = + assertThrows(WorkflowFailedException.class, () -> use.execute("input")); + assertTrue(e.getCause().getMessage().contains("belongs to a different workflow")); + } + + @WorkflowInterface + public interface StoreActivityWorkflow { + @WorkflowMethod(name = "StoreActivity") + String execute(String input); + } + + public static class StoreActivityWorkflowImpl implements StoreActivityWorkflow { + @Override + public String execute(String input) { + storedActivity = + Workflow.newActivityStub( + TestActivities.TestActivity1.class, + ActivityOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(5)) + .build()); + return ""; + } + } + + @WorkflowInterface + public interface UseActivityWorkflow { + @WorkflowMethod(name = "UseActivity") + String execute(String input); + } + + public static class UseActivityWorkflowImpl implements UseActivityWorkflow { + @Override + public String execute(String input) { + return storedActivity.execute(input); + } + } + + @WorkflowInterface + public interface StoreLocalActivityWorkflow { + @WorkflowMethod(name = "StoreLocalActivity") + String execute(String input); + } + + public static class StoreLocalActivityWorkflowImpl implements StoreLocalActivityWorkflow { + @Override + public String execute(String input) { + storedLocalActivity = + Workflow.newLocalActivityStub( + TestActivities.TestActivity1.class, + LocalActivityOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(5)) + .build()); + return ""; + } + } + + @WorkflowInterface + public interface UseLocalActivityWorkflow { + @WorkflowMethod(name = "UseLocalActivity") + String execute(String input); + } + + public static class UseLocalActivityWorkflowImpl implements UseLocalActivityWorkflow { + @Override + public String execute(String input) { + return storedLocalActivity.execute(input); + } + } + + @WorkflowInterface + public interface StoreChildWorkflowWorkflow { + @WorkflowMethod(name = "StoreChild") + String execute(String input); + } + + public static class StoreChildWorkflowWorkflowImpl implements StoreChildWorkflowWorkflow { + @Override + public String execute(String input) { + storedChild = Workflow.newChildWorkflowStub(SimpleChildWorkflow.class); + return ""; + } + } + + @WorkflowInterface + public interface UseChildWorkflowWorkflow { + @WorkflowMethod(name = "UseChild") + String execute(String input); + } + + public static class UseChildWorkflowWorkflowImpl implements UseChildWorkflowWorkflow { + @Override + public String execute(String input) { + return storedChild.run(input); + } + } + + @WorkflowInterface + public interface StoreExternalWorkflow { + @WorkflowMethod(name = "StoreExternal") + String execute(String workflowId); + } + + public static class StoreExternalWorkflowImpl implements StoreExternalWorkflow { + @Override + public String execute(String workflowId) { + WorkflowExecution exec = WorkflowExecution.newBuilder().setWorkflowId(workflowId).build(); + storedExternal = Workflow.newExternalWorkflowStub(TargetWorkflow.class, exec); + return ""; + } + } + + @WorkflowInterface + public interface UseExternalWorkflow { + @WorkflowMethod(name = "UseExternal") + String execute(String input); + } + + public static class UseExternalWorkflowImpl implements UseExternalWorkflow { + @Override + public String execute(String input) { + storedExternal.unblock(); + return ""; + } + } + + @WorkflowInterface + public interface StoreNexusServiceWorkflow { + @WorkflowMethod(name = "StoreNexus") + String execute(String input); + } + + public static class StoreNexusServiceWorkflowImpl implements StoreNexusServiceWorkflow { + @Override + public String execute(String input) { + storedNexus = Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class); + return ""; + } + } + + @WorkflowInterface + public interface UseNexusServiceWorkflow { + @WorkflowMethod(name = "UseNexus") + String execute(String input); + } + + public static class UseNexusServiceWorkflowImpl implements UseNexusServiceWorkflow { + @Override + public String execute(String input) { + return storedNexus.operation(input); + } + } + + @WorkflowInterface + public interface SimpleChildWorkflow { + @WorkflowMethod + String run(String input); + } + + public static class SimpleChildWorkflowImpl implements SimpleChildWorkflow { + @Override + public String run(String input) { + return input; + } + } + + @WorkflowInterface + public interface TargetWorkflow { + @WorkflowMethod + String execute(); + + @SignalMethod + void unblock(); + } + + public static class TargetWorkflowImpl implements TargetWorkflow { + private final CompletablePromise done = Workflow.newPromise(); + + @Override + public String execute() { + Workflow.await(done::isCompleted); + return ""; + } + + @Override + public void unblock() { + done.complete(null); + } + } + + public static class SimpleActivityImpl implements TestActivities.TestActivity1 { + @Override + public String execute(String input) { + return input; + } + } + + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) + public static class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return OperationHandler.sync((ctx, details, input) -> input); + } + } +}