Skip to content

Commit 7073375

Browse files
Add operation Id to callback headers (#2336)
Add operation Id to callback headers
1 parent 30f391f commit 7073375

File tree

2 files changed

+19
-2
lines changed

2 files changed

+19
-2
lines changed

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ ext {
3131
// Platforms
3232
grpcVersion = '1.54.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager
3333
jacksonVersion = '2.14.2' // [2.9.0,)
34-
nexusVersion = '0.3.0-alpha' // [0.1.0,)
34+
nexusVersion = '0.3.0-alpha'
3535
// we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though.
3636
micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.13.6' : '1.9.9' // [1.0.0,)
3737

temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@
2121
package io.temporal.internal.common;
2222

2323
import com.google.common.base.Defaults;
24+
import io.nexusrpc.Header;
2425
import io.temporal.api.common.v1.Callback;
2526
import io.temporal.api.enums.v1.TaskQueueKind;
2627
import io.temporal.api.taskqueue.v1.TaskQueue;
2728
import io.temporal.client.WorkflowOptions;
2829
import io.temporal.client.WorkflowStub;
2930
import io.temporal.internal.client.NexusStartWorkflowRequest;
3031
import java.util.Arrays;
32+
import java.util.Map;
33+
import java.util.TreeMap;
3134
import java.util.stream.Collectors;
3235
import org.slf4j.Logger;
3336
import org.slf4j.LoggerFactory;
@@ -78,6 +81,20 @@ public static WorkflowStub createNexusBoundStub(
7881
throw new IllegalArgumentException(
7982
"WorkflowId is expected to be set on WorkflowOptions when used with Nexus");
8083
}
84+
// Add the Nexus operation ID to the headers if it is not already present to support fabricating
85+
// a NexusOperationStarted event if the completion is received before the response to a
86+
// StartOperation request.
87+
Map<String, String> headers =
88+
request.getCallbackHeaders().entrySet().stream()
89+
.collect(
90+
Collectors.toMap(
91+
(k) -> k.getKey().toLowerCase(),
92+
Map.Entry::getValue,
93+
(a, b) -> a,
94+
() -> new TreeMap<>(String.CASE_INSENSITIVE_ORDER)));
95+
if (!headers.containsKey(Header.OPERATION_ID)) {
96+
headers.put(Header.OPERATION_ID.toLowerCase(), options.getWorkflowId());
97+
}
8198
WorkflowOptions.Builder nexusWorkflowOptions =
8299
WorkflowOptions.newBuilder(options)
83100
.setRequestId(request.getRequestId())
@@ -87,7 +104,7 @@ public static WorkflowStub createNexusBoundStub(
87104
.setNexus(
88105
Callback.Nexus.newBuilder()
89106
.setUrl(request.getCallbackUrl())
90-
.putAllHeader(request.getCallbackHeaders())
107+
.putAllHeader(headers)
91108
.build())
92109
.build()));
93110
if (options.getTaskQueue() == null) {

0 commit comments

Comments
 (0)