diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/PollScaleReportHandle.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/PollScaleReportHandle.java index 2a89b66765..925c60e0bc 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/PollScaleReportHandle.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/PollScaleReportHandle.java @@ -36,6 +36,11 @@ public PollScaleReportHandle( public synchronized void report(T task, Throwable e) { if (e != null) { + // We want to avoid scaling down on errors if we have never seen a scaling decision + // since we might never scale up again. + if (!everSawScalingDecision) { + return; + } if ((e instanceof StatusRuntimeException)) { StatusRuntimeException statusRuntimeException = (StatusRuntimeException) e; if (statusRuntimeException.getStatus().getCode() == Status.Code.RESOURCE_EXHAUSTED) { diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/PollScaleReportHandleTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/PollScaleReportHandleTest.java index a6d896a4cf..7edc3f69f9 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/PollScaleReportHandleTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/PollScaleReportHandleTest.java @@ -12,11 +12,16 @@ public class PollScaleReportHandleTest { public void handleResourceExhaustedError() { // Mock dependencies Functions.Proc1 mockScaleCallback = Mockito.mock(Functions.Proc1.class); + ScalingTask mockTask = Mockito.mock(ScalingTask.class); + ScalingTask.ScalingDecision mockDecision = Mockito.mock(ScalingTask.ScalingDecision.class); + Mockito.when(mockTask.getScalingDecision()).thenReturn(mockDecision); + Mockito.when(mockDecision.getPollRequestDeltaSuggestion()).thenReturn(0); PollScaleReportHandle handle = new PollScaleReportHandle<>(1, 10, 8, mockScaleCallback); // Simulate RESOURCE_EXHAUSTED error StatusRuntimeException exception = new StatusRuntimeException(Status.RESOURCE_EXHAUSTED); + handle.report(mockTask, null); handle.report(null, exception); // Verify target poller count is halved and callback is invoked @@ -27,10 +32,15 @@ public void handleResourceExhaustedError() { public void handleGenericError() { // Mock dependencies Functions.Proc1 mockScaleCallback = Mockito.mock(Functions.Proc1.class); + ScalingTask mockTask = Mockito.mock(ScalingTask.class); + ScalingTask.ScalingDecision mockDecision = Mockito.mock(ScalingTask.ScalingDecision.class); + Mockito.when(mockTask.getScalingDecision()).thenReturn(mockDecision); + Mockito.when(mockDecision.getPollRequestDeltaSuggestion()).thenReturn(0); PollScaleReportHandle handle = new PollScaleReportHandle<>(1, 10, 5, mockScaleCallback); // Simulate a generic error + handle.report(mockTask, null); handle.report(null, new RuntimeException("Generic error")); // Verify target poller count is decremented and callback is invoked