Skip to content

Commit d0cf3f3

Browse files
Add support for attaching to a running workflow (#2424)
Add support for attaching to a running workflow
1 parent 729e25e commit d0cf3f3

12 files changed

+718
-26
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.client;
22+
23+
import com.google.common.base.Preconditions;
24+
import io.temporal.common.Experimental;
25+
import java.util.Objects;
26+
27+
/**
28+
* OnConflictOptions specifies the actions to be taken when using the {@link
29+
* io.temporal.api.enums.v1.WorkflowIdConflictPolicy#WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING}
30+
*/
31+
@Experimental
32+
public class OnConflictOptions {
33+
public static OnConflictOptions.Builder newBuilder() {
34+
return new OnConflictOptions.Builder();
35+
}
36+
37+
public static OnConflictOptions.Builder newBuilder(OnConflictOptions options) {
38+
return new OnConflictOptions.Builder(options);
39+
}
40+
41+
public static OnConflictOptions getDefaultInstance() {
42+
return DEFAULT_INSTANCE;
43+
}
44+
45+
private static final OnConflictOptions DEFAULT_INSTANCE;
46+
47+
static {
48+
DEFAULT_INSTANCE = OnConflictOptions.newBuilder().build();
49+
}
50+
51+
private final boolean attachRequestId;
52+
private final boolean attachCompletionCallbacks;
53+
private final boolean attachLinks;
54+
55+
private OnConflictOptions(
56+
boolean attachRequestId, boolean attachCompletionCallbacks, boolean attachLinks) {
57+
this.attachRequestId = attachRequestId;
58+
this.attachCompletionCallbacks = attachCompletionCallbacks;
59+
this.attachLinks = attachLinks;
60+
}
61+
62+
public boolean isAttachRequestId() {
63+
return attachRequestId;
64+
}
65+
66+
public boolean isAttachCompletionCallbacks() {
67+
return attachCompletionCallbacks;
68+
}
69+
70+
public boolean isAttachLinks() {
71+
return attachLinks;
72+
}
73+
74+
@Override
75+
public boolean equals(Object o) {
76+
if (this == o) return true;
77+
if (o == null || getClass() != o.getClass()) return false;
78+
OnConflictOptions that = (OnConflictOptions) o;
79+
return attachRequestId == that.attachRequestId
80+
&& attachCompletionCallbacks == that.attachCompletionCallbacks
81+
&& attachLinks == that.attachLinks;
82+
}
83+
84+
@Override
85+
public int hashCode() {
86+
return Objects.hash(attachRequestId, attachCompletionCallbacks, attachLinks);
87+
}
88+
89+
@Override
90+
public String toString() {
91+
return "OnConflictOptions{"
92+
+ "attachRequestId="
93+
+ attachRequestId
94+
+ ", attachCompletionCallbacks="
95+
+ attachCompletionCallbacks
96+
+ ", attachLinks="
97+
+ attachLinks
98+
+ '}';
99+
}
100+
101+
public static final class Builder {
102+
private boolean attachRequestId;
103+
private boolean attachCompletionCallbacks;
104+
private boolean attachLinks;
105+
106+
public Builder(OnConflictOptions options) {
107+
this.attachRequestId = options.attachRequestId;
108+
this.attachCompletionCallbacks = options.attachCompletionCallbacks;
109+
this.attachLinks = options.attachLinks;
110+
}
111+
112+
public Builder() {}
113+
114+
/** Attaches the request ID to the running workflow. */
115+
public Builder setAttachRequestId(boolean attachRequestId) {
116+
this.attachRequestId = attachRequestId;
117+
return this;
118+
}
119+
120+
/**
121+
* Attaches the completion callbacks to the running workflow. If true, AttachRequestId must be
122+
* true.
123+
*/
124+
public Builder setAttachCompletionCallbacks(boolean attachCompletionCallbacks) {
125+
this.attachCompletionCallbacks = attachCompletionCallbacks;
126+
return this;
127+
}
128+
129+
/** Attaches the links to the WorkflowExecutionOptionsUpdatedEvent history event. */
130+
public Builder setAttachLinks(boolean attachLinks) {
131+
this.attachLinks = attachLinks;
132+
return this;
133+
}
134+
135+
public OnConflictOptions build() {
136+
if (attachCompletionCallbacks) {
137+
Preconditions.checkState(
138+
attachRequestId, "AttachRequestId must be true if AttachCompletionCallbacks is true");
139+
}
140+
return new OnConflictOptions(attachRequestId, attachCompletionCallbacks, attachLinks);
141+
}
142+
}
143+
}

temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java

+37-5
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public static WorkflowOptions merge(
8484
.setRequestId(o.getRequestId())
8585
.setCompletionCallbacks(o.getCompletionCallbacks())
8686
.setLinks(o.getLinks())
87+
.setOnConflictOptions(o.getOnConflictOptions())
8788
.validateBuildWithDefaults();
8889
}
8990

@@ -129,6 +130,8 @@ public static final class Builder {
129130

130131
private List<Link> links;
131132

133+
private OnConflictOptions onConflictOptions;
134+
132135
private Builder() {}
133136

134137
private Builder(WorkflowOptions options) {
@@ -155,6 +158,7 @@ private Builder(WorkflowOptions options) {
155158
this.requestId = options.requestId;
156159
this.completionCallbacks = options.completionCallbacks;
157160
this.links = options.links;
161+
this.onConflictOptions = options.onConflictOptions;
158162
}
159163

160164
/**
@@ -460,6 +464,20 @@ public Builder setLinks(List<Link> links) {
460464
return this;
461465
}
462466

467+
/**
468+
* Set workflow ID conflict options used in conjunction with conflict policy
469+
* WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING. If onConflictOptions is set and a workflow is
470+
* already running, the options specifies the actions to be taken on the running workflow. If
471+
* not set or use together with any other WorkflowIDConflictPolicy, this parameter is ignored.
472+
*
473+
* <p>WARNING: Not intended for User Code.
474+
*/
475+
@Experimental
476+
public Builder setOnConflictOptions(OnConflictOptions onConflictOptions) {
477+
this.onConflictOptions = onConflictOptions;
478+
return this;
479+
}
480+
463481
public WorkflowOptions build() {
464482
return new WorkflowOptions(
465483
workflowId,
@@ -481,7 +499,8 @@ public WorkflowOptions build() {
481499
staticDetails,
482500
requestId,
483501
completionCallbacks,
484-
links);
502+
links,
503+
onConflictOptions);
485504
}
486505

487506
/**
@@ -508,7 +527,8 @@ public WorkflowOptions validateBuildWithDefaults() {
508527
staticDetails,
509528
requestId,
510529
completionCallbacks,
511-
links);
530+
links,
531+
onConflictOptions);
512532
}
513533
}
514534

@@ -551,6 +571,7 @@ public WorkflowOptions validateBuildWithDefaults() {
551571
private final List<Callback> completionCallbacks;
552572

553573
private final List<Link> links;
574+
private final OnConflictOptions onConflictOptions;
554575

555576
private WorkflowOptions(
556577
String workflowId,
@@ -572,7 +593,8 @@ private WorkflowOptions(
572593
String staticDetails,
573594
String requestId,
574595
List<Callback> completionCallbacks,
575-
List<Link> links) {
596+
List<Link> links,
597+
OnConflictOptions onConflictOptions) {
576598
this.workflowId = workflowId;
577599
this.workflowIdReusePolicy = workflowIdReusePolicy;
578600
this.workflowRunTimeout = workflowRunTimeout;
@@ -593,6 +615,7 @@ private WorkflowOptions(
593615
this.requestId = requestId;
594616
this.completionCallbacks = completionCallbacks;
595617
this.links = links;
618+
this.onConflictOptions = onConflictOptions;
596619
}
597620

598621
public String getWorkflowId() {
@@ -689,6 +712,11 @@ public String getStaticDetails() {
689712
return staticDetails;
690713
}
691714

715+
@Experimental
716+
public @Nullable OnConflictOptions getOnConflictOptions() {
717+
return onConflictOptions;
718+
}
719+
692720
public Builder toBuilder() {
693721
return new Builder(this);
694722
}
@@ -717,7 +745,8 @@ public boolean equals(Object o) {
717745
&& Objects.equal(staticDetails, that.staticDetails)
718746
&& Objects.equal(requestId, that.requestId)
719747
&& Objects.equal(completionCallbacks, that.completionCallbacks)
720-
&& Objects.equal(links, that.links);
748+
&& Objects.equal(links, that.links)
749+
&& Objects.equal(onConflictOptions, that.onConflictOptions);
721750
}
722751

723752
@Override
@@ -742,7 +771,8 @@ public int hashCode() {
742771
staticDetails,
743772
requestId,
744773
completionCallbacks,
745-
links);
774+
links,
775+
onConflictOptions);
746776
}
747777

748778
@Override
@@ -791,6 +821,8 @@ public String toString() {
791821
+ completionCallbacks
792822
+ ", links="
793823
+ links
824+
+ ", onConflictOptions="
825+
+ onConflictOptions
794826
+ '}';
795827
}
796828
}

temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientRequestFactory.java

+11
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.temporal.api.enums.v1.HistoryEventFilterType;
3131
import io.temporal.api.sdk.v1.UserMetadata;
3232
import io.temporal.api.taskqueue.v1.TaskQueue;
33+
import io.temporal.api.workflow.v1.OnConflictOptions;
3334
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest;
3435
import io.temporal.api.workflowservice.v1.SignalWithStartWorkflowExecutionRequest;
3536
import io.temporal.api.workflowservice.v1.StartWorkflowExecutionRequest;
@@ -100,6 +101,16 @@ StartWorkflowExecutionRequest.Builder newStartWorkflowExecutionRequest(
100101
options.getLinks().forEach(request::addLinks);
101102
}
102103

104+
if (options.getOnConflictOptions() != null) {
105+
OnConflictOptions.Builder onConflictOptions =
106+
OnConflictOptions.newBuilder()
107+
.setAttachRequestId(options.getOnConflictOptions().isAttachRequestId())
108+
.setAttachLinks(options.getOnConflictOptions().isAttachLinks())
109+
.setAttachCompletionCallbacks(
110+
options.getOnConflictOptions().isAttachCompletionCallbacks());
111+
request.setOnConflictOptions(onConflictOptions);
112+
}
113+
103114
String taskQueue = options.getTaskQueue();
104115
if (taskQueue != null && !taskQueue.isEmpty()) {
105116
request.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue).build());

temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java

+7
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.temporal.api.common.v1.Callback;
2727
import io.temporal.api.enums.v1.TaskQueueKind;
2828
import io.temporal.api.taskqueue.v1.TaskQueue;
29+
import io.temporal.client.OnConflictOptions;
2930
import io.temporal.client.WorkflowOptions;
3031
import io.temporal.client.WorkflowStub;
3132
import io.temporal.common.metadata.POJOActivityMethodMetadata;
@@ -144,6 +145,12 @@ public static WorkflowStub createNexusBoundStub(
144145
.filter(link -> link != null)
145146
.collect(Collectors.toList()));
146147
}
148+
nexusWorkflowOptions.setOnConflictOptions(
149+
OnConflictOptions.newBuilder()
150+
.setAttachRequestId(true)
151+
.setAttachLinks(true)
152+
.setAttachCompletionCallbacks(true)
153+
.build());
147154
return stub.newInstance(nexusWorkflowOptions.build());
148155
}
149156

temporal-sdk/src/main/java/io/temporal/internal/statemachines/NexusOperationStateMachine.java

+19-17
Original file line numberDiff line numberDiff line change
@@ -186,46 +186,48 @@ private void cancelNexusOperationCommand() {
186186
}
187187

188188
private void notifyStarted() {
189-
if (!async) {
190-
if (currentEvent.getEventType() != EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED) {
191-
startedCallback.apply(Optional.empty(), null);
192-
} else {
193-
async = true;
194-
String operationToken =
195-
currentEvent.getNexusOperationStartedEventAttributes().getOperationToken();
196-
String operationId =
197-
currentEvent.getNexusOperationStartedEventAttributes().getOperationId();
198-
startedCallback.apply(
199-
Optional.of(operationToken.isEmpty() ? operationId : operationToken), null);
200-
}
201-
}
189+
async = true;
190+
String operationToken =
191+
currentEvent.getNexusOperationStartedEventAttributes().getOperationToken();
192+
// TODO(#2423) Remove support for operationId
193+
String operationId = currentEvent.getNexusOperationStartedEventAttributes().getOperationId();
194+
startedCallback.apply(
195+
Optional.of(operationToken.isEmpty() ? operationId : operationToken), null);
202196
}
203197

204198
private void notifyCompleted() {
205-
notifyStarted();
206199
NexusOperationCompletedEventAttributes attributes =
207200
currentEvent.getNexusOperationCompletedEventAttributes();
201+
if (!async) {
202+
startedCallback.apply(Optional.empty(), null);
203+
}
208204
completionCallback.apply(Optional.of(attributes.getResult()), null);
209205
}
210206

211207
private void notifyFailed() {
212-
notifyStarted();
213208
NexusOperationFailedEventAttributes attributes =
214209
currentEvent.getNexusOperationFailedEventAttributes();
210+
if (!async) {
211+
startedCallback.apply(Optional.empty(), attributes.getFailure());
212+
}
215213
completionCallback.apply(Optional.empty(), attributes.getFailure());
216214
}
217215

218216
private void notifyCanceled() {
219-
notifyStarted();
220217
NexusOperationCanceledEventAttributes attributes =
221218
currentEvent.getNexusOperationCanceledEventAttributes();
219+
if (!async) {
220+
startedCallback.apply(Optional.empty(), attributes.getFailure());
221+
}
222222
completionCallback.apply(Optional.empty(), attributes.getFailure());
223223
}
224224

225225
private void notifyTimedOut() {
226-
notifyStarted();
227226
NexusOperationTimedOutEventAttributes attributes =
228227
currentEvent.getNexusOperationTimedOutEventAttributes();
228+
if (!async) {
229+
startedCallback.apply(Optional.empty(), attributes.getFailure());
230+
}
229231
completionCallback.apply(Optional.empty(), attributes.getFailure());
230232
}
231233

temporal-sdk/src/main/java/io/temporal/nexus/WorkflowHandleFactory.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
public interface WorkflowHandleFactory<T, R> {
3434
/**
3535
* Invoked every operation start call and expected to return a workflow handle to a workflow stub
36-
* through the provided {@link WorkflowClient}.
36+
* created with the {@link WorkflowClient} provided by {@link
37+
* NexusOperationContext#getWorkflowClient()}.
3738
*/
3839
@Nullable
3940
WorkflowHandle<R> apply(OperationContext context, OperationStartDetails details, T input);

0 commit comments

Comments
 (0)