Skip to content

Throw clear error if workflow stub is misused #2543

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ final class ActivityStubImpl extends ActivityStubBase {
private final WorkflowOutboundCallsInterceptor activityExecutor;
private final Functions.Proc assertReadOnly;

private void assertSameWorkflow() {
if (activityExecutor != WorkflowInternal.getWorkflowOutboundInterceptor()) {
throw new IllegalStateException(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happened before this code was added? Did things work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the linked issue, it would be some cryptic error message

        io.temporal.failure.ApplicationFailure: message='Operation allowed only while eventLoop is running', type='java.lang.IllegalStateException', nonRetryable=false

"Activity stub belongs to a different workflow. Create a new stub for each workflow instance.");
}
}

static ActivityStub newInstance(
ActivityOptions options,
WorkflowOutboundCallsInterceptor activityExecutor,
Expand All @@ -34,6 +41,7 @@ static ActivityStub newInstance(
@Override
public <R> Promise<R> executeAsync(
String activityName, Class<R> resultClass, Type resultType, Object... args) {
assertSameWorkflow();
this.assertReadOnly.apply();
return activityExecutor
.executeActivity(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ class ChildWorkflowStubImpl implements ChildWorkflowStub {
private final CompletablePromise<WorkflowExecution> execution;
private final Functions.Proc1<String> assertReadOnly;

private void assertSameWorkflow() {
if (outboundCallsInterceptor != WorkflowInternal.getWorkflowOutboundInterceptor()) {
throw new IllegalStateException(
"Child workflow stub belongs to a different workflow. Create a new stub for each workflow instance.");
}
}

ChildWorkflowStubImpl(
String workflowType,
ChildWorkflowOptions options,
Expand Down Expand Up @@ -60,6 +67,7 @@ public <R> R execute(Class<R> resultClass, Object... args) {

@Override
public <R> R execute(Class<R> resultClass, Type resultType, Object... args) {
assertSameWorkflow();
assertReadOnly.apply("schedule child workflow");
Promise<R> result = executeAsync(resultClass, resultType, args);
if (AsyncInternal.isAsync()) {
Expand All @@ -83,6 +91,7 @@ public <R> Promise<R> executeAsync(Class<R> resultClass, Object... args) {

@Override
public <R> Promise<R> executeAsync(Class<R> resultClass, Type resultType, Object... args) {
assertSameWorkflow();
assertReadOnly.apply("schedule child workflow");
ChildWorkflowOutput<R> result =
outboundCallsInterceptor.executeChildWorkflow(
Expand All @@ -100,6 +109,7 @@ public <R> Promise<R> executeAsync(Class<R> resultClass, Type resultType, Object

@Override
public void signal(String signalName, Object... args) {
assertSameWorkflow();
assertReadOnly.apply("signal workflow");
Promise<Void> signaled =
outboundCallsInterceptor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ class ExternalWorkflowStubImpl implements ExternalWorkflowStub {
private final WorkflowExecution execution;
private Functions.Proc1<String> assertReadOnly;

private void assertSameWorkflow() {
if (outboundCallsInterceptor != WorkflowInternal.getWorkflowOutboundInterceptor()) {
throw new IllegalStateException(
"External workflow stub belongs to a different workflow. Create a new stub for each workflow instance.");
}
}

public ExternalWorkflowStubImpl(
WorkflowExecution execution,
WorkflowOutboundCallsInterceptor outboundCallsInterceptor,
Expand All @@ -29,6 +36,7 @@ public WorkflowExecution getExecution() {

@Override
public void signal(String signalName, Object... args) {
assertSameWorkflow();
assertReadOnly.apply("signal external workflow");
Promise<Void> signaled =
outboundCallsInterceptor
Expand All @@ -52,6 +60,7 @@ public void signal(String signalName, Object... args) {

@Override
public void cancel() {
assertSameWorkflow();
assertReadOnly.apply("cancel external workflow");
Promise<Void> cancelRequested =
outboundCallsInterceptor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ class LocalActivityStubImpl extends ActivityStubBase {
private final WorkflowOutboundCallsInterceptor activityExecutor;
private final Functions.Proc assertReadOnly;

private void assertSameWorkflow() {
if (activityExecutor != WorkflowInternal.getWorkflowOutboundInterceptor()) {
throw new IllegalStateException(
"Local Activity stub belongs to a different workflow. Create a new stub for each workflow instance.");
}
}

static ActivityStub newInstance(
LocalActivityOptions options,
WorkflowOutboundCallsInterceptor activityExecutor,
Expand All @@ -34,6 +41,7 @@ private LocalActivityStubImpl(
@Override
public <R> Promise<R> executeAsync(
String activityName, Class<R> resultClass, Type resultType, Object... args) {
assertSameWorkflow();
this.assertReadOnly.apply();
return activityExecutor
.executeLocalActivity(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ public class NexusServiceStubImpl implements NexusServiceStub {
private final WorkflowOutboundCallsInterceptor outboundCallsInterceptor;
private final Functions.Proc1<String> assertReadOnly;

private void assertSameWorkflow() {
if (outboundCallsInterceptor != WorkflowInternal.getWorkflowOutboundInterceptor()) {
throw new IllegalStateException(
"Nexus service stub belongs to a different workflow. Create a new stub for each workflow instance.");
}
}

public NexusServiceStubImpl(
String name,
NexusServiceOptions options,
Expand All @@ -31,6 +38,7 @@ public <R> R execute(String operationName, Class<R> resultClass, Object arg) {

@Override
public <R> R execute(String operationName, Class<R> resultClass, Type resultType, Object arg) {
assertSameWorkflow();
assertReadOnly.apply("execute nexus operation");
Promise<R> result = executeAsync(operationName, resultClass, resultType, arg);
if (AsyncInternal.isAsync()) {
Expand All @@ -55,6 +63,7 @@ public <R> Promise<R> executeAsync(String operationName, Class<R> resultClass, O
@Override
public <R> Promise<R> executeAsync(
String operationName, Class<R> resultClass, Type resultType, Object arg) {
assertSameWorkflow();
assertReadOnly.apply("execute nexus operation");
NexusOperationOptions mergedOptions =
NexusOperationOptions.newBuilder(options.getOperationOptions())
Expand Down Expand Up @@ -82,6 +91,7 @@ public <R> NexusOperationHandle<R> start(String operationName, Class<R> resultCl
@Override
public <R> NexusOperationHandle<R> start(
String operationName, Class<R> resultClass, Type resultType, Object arg) {
assertSameWorkflow();
assertReadOnly.apply("schedule nexus operation");
NexusOperationOptions mergedOptions =
NexusOperationOptions.newBuilder(options.getOperationOptions())
Expand Down
Loading
Loading