Skip to content

Commit f93910b

Browse files
Make sure GetVersion never yields (#2376)
Make sure GetVersion never yields
1 parent ce90d24 commit f93910b

34 files changed

+504
-33
lines changed

temporal-sdk/src/main/java/io/temporal/internal/common/SdkFlag.java

+4
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ public enum SdkFlag {
3030
* Changes behavior of GetVersion to not yield if no previous call existed in history.
3131
*/
3232
SKIP_YIELD_ON_DEFAULT_VERSION(1),
33+
/*
34+
* Changes behavior of GetVersion to never yield.
35+
*/
36+
SKIP_YIELD_ON_VERSION(2),
3337
UNKNOWN(Integer.MAX_VALUE);
3438

3539
private final int value;

temporal-sdk/src/main/java/io/temporal/internal/common/SdkFlags.java

+11
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,17 @@ public boolean tryUseSdkFlag(SdkFlag flag) {
6868
}
6969
}
7070

71+
/**
72+
* @return True if this flag is set.
73+
*/
74+
public boolean checkSdkFlag(SdkFlag flag) {
75+
if (!supportSdkMetadata) {
76+
return false;
77+
}
78+
79+
return sdkFlags.contains(flag);
80+
}
81+
7182
/**
7283
* @return All flags set since the last call to takeNewSdkFlags.
7384
*/

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ void mutableSideEffect(
284284
* @param callback used to return version
285285
* @return True if the identifier is not present in history
286286
*/
287-
boolean getVersion(
287+
Integer getVersion(
288288
String changeId,
289289
int minSupported,
290290
int maxSupported,
@@ -417,6 +417,11 @@ boolean getVersion(
417417
*/
418418
boolean tryUseSdkFlag(SdkFlag flag);
419419

420+
/**
421+
* @return true if this flag is currently set.
422+
*/
423+
boolean checkSdkFlag(SdkFlag flag);
424+
420425
/**
421426
* @return The Build ID of the worker which executed the current Workflow Task. May be empty the
422427
* task was completed by a worker without a Build ID. If this worker is the one executing this

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,11 @@ public boolean tryUseSdkFlag(SdkFlag flag) {
260260
return workflowStateMachines.tryUseSdkFlag(flag);
261261
}
262262

263+
@Override
264+
public boolean checkSdkFlag(SdkFlag flag) {
265+
return workflowStateMachines.checkSdkFlag(flag);
266+
}
267+
263268
@Override
264269
public Optional<String> getCurrentBuildId() {
265270
String curTaskBID = workflowStateMachines.getCurrentTaskBuildId();
@@ -324,7 +329,7 @@ public void mutableSideEffect(
324329
}
325330

326331
@Override
327-
public boolean getVersion(
332+
public Integer getVersion(
328333
String changeId,
329334
int minSupported,
330335
int maxSupported,

temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ private VersionStateMachine(
379379
* @param callback used to return version
380380
* @return True if the identifier is not present in history
381381
*/
382-
public boolean getVersion(
382+
public Integer getVersion(
383383
int minSupported, int maxSupported, Functions.Proc2<Integer, RuntimeException> callback) {
384384
InvocationStateMachine ism = new InvocationStateMachine(minSupported, maxSupported, callback);
385385
ism.explicitEvent(ExplicitEvent.CHECK_EXECUTION_STATE);
@@ -390,7 +390,7 @@ public boolean getVersion(
390390
// This means either this version marker did not exist in the original execution or
391391
// the version marker did exist, but was in an earlier WFT. If the version marker was in a
392392
// previous WFT then the version field should have a value.
393-
return !(ism.getState() == VersionStateMachine.State.SKIPPED_REPLAYING && version == null);
393+
return version == null ? preloadedVersion : version;
394394
}
395395

396396
public void handleNonMatchingEvent(HistoryEvent event) {

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ enum HandleEventStatus {
5858
}
5959

6060
/** Initial set of SDK flags that will be set on all new workflow executions. */
61-
private static final List<SdkFlag> initialFlags =
62-
Collections.unmodifiableList(
63-
Collections.singletonList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION));
61+
@VisibleForTesting
62+
public static List<SdkFlag> initialFlags =
63+
Collections.unmodifiableList(Arrays.asList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION));
6464

6565
/**
6666
* EventId of the WorkflowTaskStarted event of the Workflow Task that was picked up by a worker
@@ -661,6 +661,13 @@ public boolean tryUseSdkFlag(SdkFlag flag) {
661661
return flags.tryUseSdkFlag(flag);
662662
}
663663

664+
/**
665+
* @return True if the SDK flag is set in the workflow execution
666+
*/
667+
public boolean checkSdkFlag(SdkFlag flag) {
668+
return flags.checkSdkFlag(flag);
669+
}
670+
664671
/**
665672
* @return Set of all new flags set since the last call
666673
*/
@@ -1074,7 +1081,7 @@ public void mutableSideEffect(
10741081
stateMachineSink);
10751082
}
10761083

1077-
public boolean getVersion(
1084+
public Integer getVersion(
10781085
String changeId,
10791086
int minSupported,
10801087
int maxSupported,

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

+24-2
Original file line numberDiff line numberDiff line change
@@ -1119,7 +1119,7 @@ private <R> R mutableSideEffectImpl(
11191119
@Override
11201120
public int getVersion(String changeId, int minSupported, int maxSupported) {
11211121
CompletablePromise<Integer> result = Workflow.newPromise();
1122-
boolean markerExists =
1122+
Integer versionToUse =
11231123
replayContext.getVersion(
11241124
changeId,
11251125
minSupported,
@@ -1140,12 +1140,34 @@ public int getVersion(String changeId, int minSupported, int maxSupported) {
11401140
* because it can lead to non-deterministic scheduling.
11411141
* */
11421142
if (replayContext.isReplaying()
1143-
&& !markerExists
1143+
&& versionToUse == null
11441144
&& replayContext.tryUseSdkFlag(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION)
11451145
&& minSupported == DEFAULT_VERSION) {
11461146
return DEFAULT_VERSION;
11471147
}
11481148

1149+
/*
1150+
* Previously the SDK would yield on the getVersion call to the scheduler. This is not ideal because it can lead to non-deterministic
1151+
* scheduling if the getVersion call was removed.
1152+
* */
1153+
if (replayContext.checkSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION)) {
1154+
// This can happen if we are replaying a workflow and encounter a getVersion call that did not
1155+
// exist on the original execution and the range does not include the default version.
1156+
if (versionToUse == null) {
1157+
versionToUse = DEFAULT_VERSION;
1158+
}
1159+
if (versionToUse < minSupported || versionToUse > maxSupported) {
1160+
throw new UnsupportedVersion(
1161+
new UnsupportedVersion.UnsupportedVersionException(
1162+
String.format(
1163+
"Version %d of changeId %s is not supported. Supported v is between %d and %d.",
1164+
versionToUse, changeId, minSupported, maxSupported)));
1165+
}
1166+
return versionToUse;
1167+
}
1168+
// Legacy behavior if SKIP_YIELD_ON_VERSION is not set. This means this thread will yield on the
1169+
// getVersion call.
1170+
// while it waits for the result.
11491171
try {
11501172
return result.get();
11511173
} catch (UnsupportedVersion.UnsupportedVersionException ex) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow.versionTests;
22+
23+
import io.temporal.internal.common.SdkFlag;
24+
import io.temporal.internal.statemachines.WorkflowStateMachines;
25+
import java.util.Arrays;
26+
import java.util.Collections;
27+
import org.junit.Before;
28+
import org.junit.runner.RunWith;
29+
import org.junit.runners.Parameterized;
30+
31+
@RunWith(Parameterized.class)
32+
public abstract class BaseVersionTest {
33+
34+
@Parameterized.Parameter public static boolean setVersioningFlag;
35+
36+
@Parameterized.Parameters()
37+
public static Object[] data() {
38+
return new Object[][] {{true}, {false}};
39+
}
40+
41+
@Before
42+
public void setup() {
43+
if (setVersioningFlag) {
44+
WorkflowStateMachines.initialFlags =
45+
Collections.unmodifiableList(
46+
Arrays.asList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION, SdkFlag.SKIP_YIELD_ON_VERSION));
47+
}
48+
}
49+
}

temporal-sdk/src/test/java/io/temporal/workflow/versionTests/DefaultVersionNotSupportedDuringReplayTest.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.temporal.workflow.shared.TestWorkflows.TestWorkflowReturnString;
2929
import io.temporal.workflow.unsafe.WorkflowUnsafe;
3030
import java.time.Duration;
31+
import org.junit.Assert;
3132
import org.junit.Rule;
3233
import org.junit.Test;
3334

@@ -36,7 +37,7 @@
3637
* replayed on a code version that doesn't support the {@link
3738
* io.temporal.workflow.Workflow#DEFAULT_VERSION} anymore
3839
*/
39-
public class DefaultVersionNotSupportedDuringReplayTest {
40+
public class DefaultVersionNotSupportedDuringReplayTest extends BaseVersionTest {
4041

4142
private static final Signal unsupportedVersionExceptionThrown = new Signal();
4243

@@ -64,6 +65,9 @@ public String execute() {
6465
try {
6566
Workflow.getVersion("test_change", 2, 3);
6667
} catch (UnsupportedVersion e) {
68+
Assert.assertEquals(
69+
"Version -1 of changeId test_change is not supported. Supported v is between 2 and 3.",
70+
e.getMessage());
6771
unsupportedVersionExceptionThrown.signal();
6872
throw e;
6973
}

temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionAddNewBeforeTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import org.slf4j.Logger;
3535
import org.slf4j.LoggerFactory;
3636

37-
public class GetVersionAddNewBeforeTest {
37+
public class GetVersionAddNewBeforeTest extends BaseVersionTest {
3838

3939
private static final Logger log = LoggerFactory.getLogger(GetVersionAddNewBeforeTest.class);
4040
private static int versionFoo;

temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionAfterScopeCancellationInMainWorkflowMethodTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535

3636
/** This test reproduces a clash in cancellation scopes with getVersion described here: */
3737
@Issue("https://github.com/temporalio/sdk-java/issues/648")
38-
public class GetVersionAfterScopeCancellationInMainWorkflowMethodTest {
38+
public class GetVersionAfterScopeCancellationInMainWorkflowMethodTest extends BaseVersionTest {
3939

4040
@Rule
4141
public SDKTestWorkflowRule testWorkflowRule =

temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionAfterScopeCancellationTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
* io.temporal.internal.statemachines.VersionStateMachineTest#testRecordAfterCommandCancellation}
4343
*/
4444
@Issue("https://github.com/temporalio/sdk-java/issues/615")
45-
public class GetVersionAfterScopeCancellationTest {
45+
public class GetVersionAfterScopeCancellationTest extends BaseVersionTest {
4646

4747
@Rule
4848
public SDKTestWorkflowRule testWorkflowRule =

temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionAndTimerTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.junit.Rule;
3434
import org.junit.Test;
3535

36-
public class GetVersionAndTimerTest {
36+
public class GetVersionAndTimerTest extends BaseVersionTest {
3737

3838
@Rule
3939
public SDKTestWorkflowRule testWorkflowRuleWithoutVersion =

temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionDefaultInSignalTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
import org.junit.Rule;
3636
import org.junit.Test;
3737

38-
public class GetVersionDefaultInSignalTest {
38+
public class GetVersionDefaultInSignalTest extends BaseVersionTest {
3939

4040
@Rule
4141
public SDKTestWorkflowRule testWorkflowRule =

temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInSignalOnReplayTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import org.junit.Rule;
3939
import org.junit.Test;
4040

41-
public class GetVersionInSignalOnReplayTest {
41+
public class GetVersionInSignalOnReplayTest extends BaseVersionTest {
4242
public static boolean hasReplayedSignal;
4343

4444
@Rule

temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionInSignalTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import org.junit.Rule;
3333
import org.junit.Test;
3434

35-
public class GetVersionInSignalTest {
35+
public class GetVersionInSignalTest extends BaseVersionTest {
3636

3737
@Rule
3838
public SDKTestWorkflowRule testWorkflowRule =

temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionMultipleCallsDefaultTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
import org.junit.Rule;
3636
import org.junit.Test;
3737

38-
public class GetVersionMultipleCallsDefaultTest {
38+
public class GetVersionMultipleCallsDefaultTest extends BaseVersionTest {
3939
@Rule
4040
public SDKTestWorkflowRule testWorkflowRule =
4141
SDKTestWorkflowRule.newBuilder()

temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionMultipleCallsTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import org.junit.Rule;
3535
import org.junit.Test;
3636

37-
public class GetVersionMultipleCallsTest {
37+
public class GetVersionMultipleCallsTest extends BaseVersionTest {
3838
@Rule
3939
public SDKTestWorkflowRule testWorkflowRule =
4040
SDKTestWorkflowRule.newBuilder()

0 commit comments

Comments
 (0)