Skip to content

Commit 963ea9f

Browse files
authored
Add sdk name/version to task completions when unset or changed (#2422)
1 parent f92b53c commit 963ea9f

File tree

12 files changed

+333
-23
lines changed

12 files changed

+333
-23
lines changed

temporal-sdk/src/main/java/io/temporal/internal/Config.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public final class Config {
2424
private Config() {}
2525

2626
/** Force new workflow task after workflow task timeout multiplied by this coefficient. */
27-
public static final double WORKFLOW_TAK_HEARTBEAT_COEFFICIENT = 4d / 5d;
27+
public static final double WORKFLOW_TASK_HEARTBEAT_COEFFICIENT = 4d / 5d;
2828

2929
/**
3030
* Limit how many eager activities can be requested by the SDK in one workflow task completion

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java

+18-10
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public WorkflowTaskResult handleWorkflowTask(
146146
Deadline.after(
147147
(long)
148148
(Durations.toNanos(startedEvent.getWorkflowTaskTimeout())
149-
* Config.WORKFLOW_TAK_HEARTBEAT_COEFFICIENT),
149+
* Config.WORKFLOW_TASK_HEARTBEAT_COEFFICIENT),
150150
TimeUnit.NANOSECONDS);
151151

152152
if (workflowTask.getPreviousStartedEventId()
@@ -180,15 +180,23 @@ public WorkflowTaskResult handleWorkflowTask(
180180
throw context.getWorkflowTaskFailure();
181181
}
182182
Map<String, WorkflowQueryResult> queryResults = executeQueries(workflowTask.getQueriesMap());
183-
return WorkflowTaskResult.newBuilder()
184-
.setCommands(commands)
185-
.setMessages(messages)
186-
.setQueryResults(queryResults)
187-
.setFinalCommand(context.isWorkflowMethodCompleted())
188-
.setForceWorkflowTask(localActivityTaskCount > 0 && !context.isWorkflowMethodCompleted())
189-
.setNonfirstLocalActivityAttempts(localActivityMeteringHelper.getNonfirstAttempts())
190-
.setSdkFlags(newSdkFlags)
191-
.build();
183+
WorkflowTaskResult.Builder result =
184+
WorkflowTaskResult.newBuilder()
185+
.setCommands(commands)
186+
.setMessages(messages)
187+
.setQueryResults(queryResults)
188+
.setFinalCommand(context.isWorkflowMethodCompleted())
189+
.setForceWorkflowTask(
190+
localActivityTaskCount > 0 && !context.isWorkflowMethodCompleted())
191+
.setNonfirstLocalActivityAttempts(localActivityMeteringHelper.getNonfirstAttempts())
192+
.setSdkFlags(newSdkFlags);
193+
if (workflowStateMachines.sdkNameToWrite() != null) {
194+
result.setWriteSdkName(workflowStateMachines.sdkNameToWrite());
195+
}
196+
if (workflowStateMachines.sdkVersionToWrite() != null) {
197+
result.setWriteSdkVersion(workflowStateMachines.sdkVersionToWrite());
198+
}
199+
return result.build();
192200
} finally {
193201
lock.unlock();
194202
}

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java

+15-6
Original file line numberDiff line numberDiff line change
@@ -247,12 +247,21 @@ private Result createCompletedWFTRequest(
247247
}
248248
completedRequest.setStickyAttributes(attributes);
249249
}
250-
if (!result.getSdkFlags().isEmpty()) {
251-
completedRequest =
252-
completedRequest.setSdkMetadata(
253-
WorkflowTaskCompletedMetadata.newBuilder()
254-
.addAllLangUsedFlags(result.getSdkFlags())
255-
.build());
250+
List<Integer> sdkFlags = result.getSdkFlags();
251+
String writeSdkName = result.getWriteSdkName();
252+
String writeSdkVersion = result.getWriteSdkVersion();
253+
if (!sdkFlags.isEmpty() || writeSdkName != null || writeSdkVersion != null) {
254+
WorkflowTaskCompletedMetadata.Builder md = WorkflowTaskCompletedMetadata.newBuilder();
255+
if (!sdkFlags.isEmpty()) {
256+
md.addAllLangUsedFlags(sdkFlags);
257+
}
258+
if (writeSdkName != null) {
259+
md.setSdkName(writeSdkName);
260+
}
261+
if (writeSdkVersion != null) {
262+
md.setSdkVersion(writeSdkVersion);
263+
}
264+
completedRequest.setSdkMetadata(md.build());
256265
}
257266
return new Result(
258267
workflowType,

temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowTaskResult.java

+30-2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public static final class Builder {
4141
private boolean forceWorkflowTask;
4242
private int nonfirstLocalActivityAttempts;
4343
private List<Integer> sdkFlags;
44+
private String writeSdkName;
45+
private String writeSdkVersion;
4446

4547
public Builder setCommands(List<Command> commands) {
4648
this.commands = commands;
@@ -77,6 +79,16 @@ public Builder setSdkFlags(List<Integer> sdkFlags) {
7779
return this;
7880
}
7981

82+
public Builder setWriteSdkName(String writeSdkName) {
83+
this.writeSdkName = writeSdkName;
84+
return this;
85+
}
86+
87+
public Builder setWriteSdkVersion(String writeSdkVersion) {
88+
this.writeSdkVersion = writeSdkVersion;
89+
return this;
90+
}
91+
8092
public WorkflowTaskResult build() {
8193
return new WorkflowTaskResult(
8294
commands == null ? Collections.emptyList() : commands,
@@ -85,7 +97,9 @@ public WorkflowTaskResult build() {
8597
finalCommand,
8698
forceWorkflowTask,
8799
nonfirstLocalActivityAttempts,
88-
sdkFlags == null ? Collections.emptyList() : sdkFlags);
100+
sdkFlags == null ? Collections.emptyList() : sdkFlags,
101+
writeSdkName,
102+
writeSdkVersion);
89103
}
90104
}
91105

@@ -96,6 +110,8 @@ public WorkflowTaskResult build() {
96110
private final boolean forceWorkflowTask;
97111
private final int nonfirstLocalActivityAttempts;
98112
private final List<Integer> sdkFlags;
113+
private final String writeSdkName;
114+
private final String writeSdkVersion;
99115

100116
private WorkflowTaskResult(
101117
List<Command> commands,
@@ -104,7 +120,9 @@ private WorkflowTaskResult(
104120
boolean finalCommand,
105121
boolean forceWorkflowTask,
106122
int nonfirstLocalActivityAttempts,
107-
List<Integer> sdkFlags) {
123+
List<Integer> sdkFlags,
124+
String writeSdkName,
125+
String writeSdkVersion) {
108126
this.commands = commands;
109127
this.messages = messages;
110128
this.nonfirstLocalActivityAttempts = nonfirstLocalActivityAttempts;
@@ -115,6 +133,8 @@ private WorkflowTaskResult(
115133
this.finalCommand = finalCommand;
116134
this.forceWorkflowTask = forceWorkflowTask;
117135
this.sdkFlags = sdkFlags;
136+
this.writeSdkName = writeSdkName;
137+
this.writeSdkVersion = writeSdkVersion;
118138
}
119139

120140
public List<Command> getCommands() {
@@ -145,4 +165,12 @@ public int getNonfirstLocalActivityAttempts() {
145165
public List<Integer> getSdkFlags() {
146166
return sdkFlags;
147167
}
168+
169+
public String getWriteSdkName() {
170+
return writeSdkName;
171+
}
172+
173+
public String getWriteSdkVersion() {
174+
return writeSdkVersion;
175+
}
148176
}

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

+30
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,13 @@
4343
import io.temporal.internal.history.VersionMarkerUtils;
4444
import io.temporal.internal.sync.WorkflowThread;
4545
import io.temporal.internal.worker.LocalActivityResult;
46+
import io.temporal.serviceclient.Version;
4647
import io.temporal.worker.NonDeterministicException;
4748
import io.temporal.workflow.ChildWorkflowCancellationType;
4849
import io.temporal.workflow.Functions;
4950
import java.nio.charset.StandardCharsets;
5051
import java.util.*;
52+
import javax.annotation.Nonnull;
5153
import javax.annotation.Nullable;
5254

5355
public final class WorkflowStateMachines {
@@ -179,6 +181,8 @@ enum HandleEventStatus {
179181
private final Set<String> acceptedUpdates = new HashSet<>();
180182

181183
private final SdkFlags flags;
184+
@Nonnull private String lastSeenSdkName = "";
185+
@Nonnull private String lastSeenSdkVersion = "";
182186

183187
public WorkflowStateMachines(
184188
StatesMachinesCallback callbacks, GetSystemInfoResponse.Capabilities capabilities) {
@@ -384,6 +388,12 @@ private void handleSingleEventLookahead(HistoryEvent event) {
384388
}
385389
flags.setSdkFlag(sdkFlag);
386390
}
391+
if (!Strings.isNullOrEmpty(completedEvent.getSdkMetadata().getSdkName())) {
392+
lastSeenSdkName = completedEvent.getSdkMetadata().getSdkName();
393+
}
394+
if (!Strings.isNullOrEmpty(completedEvent.getSdkMetadata().getSdkVersion())) {
395+
lastSeenSdkVersion = completedEvent.getSdkMetadata().getSdkVersion();
396+
}
387397
// Remove any finished update protocol state machines. We can't remove them on an event like
388398
// other state machines because a rejected update produces no event in history.
389399
protocolStateMachines.entrySet().removeIf(entry -> entry.getValue().isFinalState());
@@ -675,6 +685,26 @@ public EnumSet<SdkFlag> takeNewSdkFlags() {
675685
return flags.takeNewSdkFlags();
676686
}
677687

688+
/**
689+
* @return If we need to write the SDK name upon WFT completion, return it
690+
*/
691+
public String sdkNameToWrite() {
692+
if (!lastSeenSdkName.equals(Version.SDK_NAME)) {
693+
return Version.SDK_NAME;
694+
}
695+
return null;
696+
}
697+
698+
/**
699+
* @return If we need to write the SDK version upon WFT completion, return it
700+
*/
701+
public String sdkVersionToWrite() {
702+
if (!lastSeenSdkVersion.equals(Version.LIBRARY_VERSION)) {
703+
return Version.LIBRARY_VERSION;
704+
}
705+
return null;
706+
}
707+
678708
private void prepareCommands() {
679709
if (preparing) {
680710
return;

temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerTaskHandlerTests.java

+29
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import io.temporal.internal.worker.WorkflowExecutorCache;
4444
import io.temporal.internal.worker.WorkflowRunLockManager;
4545
import io.temporal.internal.worker.WorkflowTaskHandler;
46+
import io.temporal.serviceclient.Version;
4647
import io.temporal.serviceclient.WorkflowServiceStubs;
4748
import io.temporal.testUtils.HistoryUtils;
4849
import io.temporal.testing.internal.SDKTestWorkflowRule;
@@ -208,6 +209,34 @@ public void ifStickyExecutionAttributesAreSetThenWorkflowsAreCached() throws Thr
208209
assertEquals(Durations.fromSeconds(5), attributes.getScheduleToStartTimeout());
209210
}
210211

212+
@Test
213+
public void setsSdkNameAndVersionIfNotSetInHistory() throws Throwable {
214+
assumeFalse("skipping for docker tests", SDKTestWorkflowRule.useExternalService);
215+
216+
WorkflowExecutorCache cache =
217+
new WorkflowExecutorCache(10, new WorkflowRunLockManager(), new NoopScope());
218+
WorkflowTaskHandler taskHandler =
219+
new ReplayWorkflowTaskHandler(
220+
"namespace",
221+
setUpMockWorkflowFactory(),
222+
cache,
223+
SingleWorkerOptions.newBuilder().build(),
224+
InternalUtils.createStickyTaskQueue("sticky", "taskQueue"),
225+
Duration.ofSeconds(5),
226+
testWorkflowRule.getWorkflowServiceStubs(),
227+
null);
228+
229+
PollWorkflowTaskQueueResponse workflowTask =
230+
HistoryUtils.generateWorkflowTaskWithInitialHistory();
231+
232+
WorkflowTaskHandler.Result result = taskHandler.handleWorkflowTask(workflowTask);
233+
234+
assertTrue(result.isCompletionCommand());
235+
assertEquals(Version.SDK_NAME, result.getTaskCompleted().getSdkMetadata().getSdkName());
236+
assertEquals(
237+
Version.LIBRARY_VERSION, result.getTaskCompleted().getSdkMetadata().getSdkVersion());
238+
}
239+
211240
private ReplayWorkflowFactory setUpMockWorkflowFactory() throws Throwable {
212241
ReplayWorkflow mockWorkflow = mock(ReplayWorkflow.class);
213242
ReplayWorkflowFactory mockFactory = mock(ReplayWorkflowFactory.class);

temporal-sdk/src/test/java/io/temporal/internal/statemachines/TestHistoryBuilder.java

+4
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,10 @@ public long getPreviousStartedEventId() {
616616
return previousStartedEventId;
617617
}
618618

619+
public long getWorkflowTaskScheduledEventId() {
620+
return workflowTaskScheduledEventId;
621+
}
622+
619623
public long getWorkflowTaskStartedEventId() {
620624
return workflowTaskScheduledEventId + 1;
621625
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.statemachines;
22+
23+
import static org.junit.Assert.assertEquals;
24+
25+
import io.temporal.api.enums.v1.EventType;
26+
import io.temporal.api.history.v1.WorkflowTaskCompletedEventAttributes;
27+
import io.temporal.api.sdk.v1.WorkflowTaskCompletedMetadata;
28+
import io.temporal.serviceclient.Version;
29+
import java.util.Optional;
30+
import org.junit.Test;
31+
32+
public class WorkflowStateMachinesTest {
33+
private WorkflowStateMachines stateMachines;
34+
35+
private WorkflowStateMachines newStateMachines(TestEntityManagerListenerBase listener) {
36+
return new WorkflowStateMachines(listener, m -> {});
37+
}
38+
39+
private class TestActivityListener extends TestEntityManagerListenerBase {
40+
@Override
41+
public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
42+
builder.add(v -> stateMachines.completeWorkflow(Optional.empty()));
43+
}
44+
}
45+
46+
private void sdkNameAndVersionTest(
47+
String inputSdkVersion,
48+
String inputSdkName,
49+
String expectedSdkName,
50+
String expectedSdkVersion) {
51+
TestHistoryBuilder h = new TestHistoryBuilder();
52+
TestEntityManagerListenerBase listener = new TestActivityListener();
53+
stateMachines = newStateMachines(listener);
54+
55+
h.add(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED);
56+
h.addWorkflowTaskScheduledAndStarted();
57+
h.add(
58+
EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED,
59+
WorkflowTaskCompletedEventAttributes.newBuilder()
60+
.setScheduledEventId(h.getWorkflowTaskScheduledEventId())
61+
.setSdkMetadata(
62+
WorkflowTaskCompletedMetadata.newBuilder()
63+
.setSdkVersion(inputSdkVersion)
64+
.setSdkName(inputSdkName)));
65+
h.addWorkflowTaskScheduledAndStarted();
66+
assertEquals(2, h.getWorkflowTaskCount());
67+
68+
h.handleWorkflowTaskTakeCommands(stateMachines, 2);
69+
70+
assertEquals(expectedSdkName, stateMachines.sdkNameToWrite());
71+
assertEquals(expectedSdkVersion, stateMachines.sdkVersionToWrite());
72+
}
73+
74+
@Test
75+
public void testWritesSdkNameAndVersionWhenDifferent() {
76+
sdkNameAndVersionTest("hi", "skflajk", Version.SDK_NAME, Version.LIBRARY_VERSION);
77+
}
78+
79+
@Test
80+
public void doesNotWriteSdkNameAndVersionWhenSame() {
81+
sdkNameAndVersionTest(Version.LIBRARY_VERSION, Version.SDK_NAME, null, null);
82+
}
83+
84+
@Test
85+
public void writesOnlyNameIfChanged() {
86+
sdkNameAndVersionTest(Version.LIBRARY_VERSION, "sakflasjklf", Version.SDK_NAME, null);
87+
}
88+
89+
@Test
90+
public void writesOnlyVersionIfChanged() {
91+
sdkNameAndVersionTest("safklasjf", Version.SDK_NAME, null, Version.LIBRARY_VERSION);
92+
}
93+
}

0 commit comments

Comments
 (0)