Skip to content

Commit cbcf26c

Browse files
Fix unbalanced locks in test server for Nexus (#2341)
Fix unbalanced locks in test server for Nexus
1 parent 8782de3 commit cbcf26c

File tree

2 files changed

+20
-6
lines changed

2 files changed

+20
-6
lines changed

temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationTimeoutTest.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public static class TestNexus implements TestWorkflows.TestWorkflow1 {
6161
public String execute(String input) {
6262
NexusOperationOptions options =
6363
NexusOperationOptions.newBuilder()
64-
.setScheduleToCloseTimeout(Duration.ofSeconds(1))
64+
.setScheduleToCloseTimeout(Duration.ofSeconds(5))
6565
.build();
6666
NexusServiceOptions serviceOptions =
6767
NexusServiceOptions.newBuilder().setOperationOptions(options).build();
@@ -74,14 +74,22 @@ public String execute(String input) {
7474

7575
@ServiceImpl(service = TestNexusServices.TestNexusService1.class)
7676
public class TestNexusServiceImpl {
77+
int attempt = 0;
78+
7779
@OperationImpl
7880
public OperationHandler<String, String> operation() {
7981
// Implemented inline
8082
return OperationHandler.sync(
8183
(ctx, details, name) -> {
82-
// Simulate a long running operation
84+
// Fail the first attempt with a retry-able exception. This tests
85+
// the schedule-to-close timeout applies across attempts.
86+
attempt += 1;
87+
if (attempt == 1) {
88+
throw new RuntimeException("test exception");
89+
}
90+
// Simulate a long-running operation
8391
try {
84-
Thread.sleep(2000);
92+
Thread.sleep(6000);
8593
} catch (InterruptedException e) {
8694
throw new RuntimeException(e);
8795
}

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import io.temporal.internal.testservice.StateMachines.ActivityTaskData;
6363
import io.temporal.internal.testservice.StateMachines.CancelExternalData;
6464
import io.temporal.internal.testservice.StateMachines.ChildWorkflowData;
65+
import io.temporal.internal.testservice.StateMachines.NexusOperationData;
6566
import io.temporal.internal.testservice.StateMachines.SignalExternalData;
6667
import io.temporal.internal.testservice.StateMachines.State;
6768
import io.temporal.internal.testservice.StateMachines.TimerData;
@@ -782,14 +783,18 @@ private void processScheduleNexusOperation(
782783
nexusOperations.put(scheduleEventId, operation);
783784

784785
operation.action(Action.INITIATE, ctx, attr, workflowTaskCompletedId);
786+
// Record the current attempt of this request to be used in the timeout handler
787+
// of this request to make sure we are timing out the correct request.
788+
int attempt = operation.getData().getAttempt();
785789
ctx.addTimer(
786790
ProtobufTimeUtils.toJavaDuration(operation.getData().requestTimeout),
787-
() ->
788-
timeoutNexusRequest(
789-
scheduleEventId, "StartNexusOperation", operation.getData().getAttempt()),
791+
() -> timeoutNexusRequest(scheduleEventId, "StartNexusOperation", attempt),
790792
"StartNexusOperation request timeout");
791793
if (attr.hasScheduleToCloseTimeout()
792794
&& Durations.toMillis(attr.getScheduleToCloseTimeout()) > 0) {
795+
// ScheduleToCloseTimeout is the total time from the start of the operation to the end of the
796+
// operation
797+
// so the attempt is not relevant here.
793798
ctx.addTimer(
794799
ProtobufTimeUtils.toJavaDuration(attr.getScheduleToCloseTimeout()),
795800
() ->
@@ -978,6 +983,7 @@ private void processScheduleActivityTask(
978983
ActivityTaskScheduledEventAttributes scheduledEvent =
979984
activityStateMachine.getData().scheduledEvent;
980985
int attempt = activityStateMachine.getData().getAttempt();
986+
// TODO(quinn) If the first attempt fails, it is not clear this timer will work as expected
981987
ctx.addTimer(
982988
ProtobufTimeUtils.toJavaDuration(scheduledEvent.getScheduleToCloseTimeout()),
983989
() ->

0 commit comments

Comments
 (0)