Skip to content

Commit b8c4b7b

Browse files
Fix proto decoding in a Nexus Operation (#2281)
Fix proto decoding in a Nexus Operation
1 parent 93e30d7 commit b8c4b7b

File tree

8 files changed

+329
-7
lines changed

8 files changed

+329
-7
lines changed

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ ext {
3232
// Platforms
3333
grpcVersion = '1.54.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager
3434
jacksonVersion = '2.14.2' // [2.9.0,)
35-
nexusVersion = '0.2.0-alpha'
35+
nexusVersion = '0.2.1-alpha'
3636
// we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though.
3737
micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.13.6' : '1.9.9' // [1.0.0,)
3838

temporal-sdk/src/main/java/io/temporal/internal/nexus/PayloadSerializer.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.nexusrpc.Serializer;
2525
import io.temporal.api.common.v1.Payload;
2626
import io.temporal.common.converter.DataConverter;
27+
import java.lang.reflect.ParameterizedType;
2728
import java.lang.reflect.Type;
2829
import java.util.Optional;
2930
import javax.annotation.Nullable;
@@ -52,7 +53,14 @@ public Content serialize(@Nullable Object o) {
5253
public @Nullable Object deserialize(Content content, Type type) {
5354
try {
5455
Payload payload = Payload.parseFrom(content.getData());
55-
return dataConverter.fromPayload(payload, type.getClass(), type);
56+
if ((type instanceof Class)) {
57+
return dataConverter.fromPayload(payload, (Class<?>) type, type);
58+
} else if (type instanceof ParameterizedType) {
59+
return dataConverter.fromPayload(
60+
payload, (Class<?>) ((ParameterizedType) type).getRawType(), type);
61+
} else {
62+
throw new IllegalArgumentException("Unsupported type: " + type);
63+
}
5664
} catch (InvalidProtocolBufferException e) {
5765
throw new RuntimeException(e);
5866
}

temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperation.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public OperationStartResult<R> start(
7373
io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink);
7474
try {
7575
OperationStartResult.Builder<R> result =
76-
OperationStartResult.<R>newBuilder().setAsyncOperationId(workflowExec.getWorkflowId());
76+
OperationStartResult.<R>newAsyncBuilder(workflowExec.getWorkflowId());
7777
if (nexusLink != null) {
7878
result.addLink(nexusProtoLinkToLink(nexusLink));
7979
}

temporal-sdk/src/test/java/io/temporal/internal/nexus/PayloadSerializerTest.java

+39-1
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,13 @@
2020

2121
package io.temporal.internal.nexus;
2222

23+
import com.google.common.reflect.TypeToken;
24+
import io.temporal.api.common.v1.WorkflowExecution;
2325
import io.temporal.common.converter.DataConverter;
2426
import io.temporal.common.converter.DefaultDataConverter;
27+
import io.temporal.common.converter.EncodedValuesTest;
28+
import java.util.Collections;
29+
import java.util.Map;
2530
import org.junit.Assert;
2631
import org.junit.Test;
2732

@@ -39,7 +44,40 @@ public void testPayload() {
3944
@Test
4045
public void testNull() {
4146
PayloadSerializer.Content content = payloadSerializer.serialize(null);
42-
payloadSerializer.deserialize(content, String.class);
4347
Assert.assertEquals(null, payloadSerializer.deserialize(content, String.class));
4448
}
49+
50+
@Test
51+
public void testInteger() {
52+
PayloadSerializer.Content content = payloadSerializer.serialize(1);
53+
Assert.assertEquals(1, payloadSerializer.deserialize(content, Integer.class));
54+
}
55+
56+
@Test
57+
public void testArray() {
58+
String[] cars = {"test", "nexus", "serialization"};
59+
PayloadSerializer.Content content = payloadSerializer.serialize(cars);
60+
Assert.assertArrayEquals(
61+
cars, (String[]) payloadSerializer.deserialize(content, String[].class));
62+
}
63+
64+
@Test
65+
public void testHashMap() {
66+
Map<String, EncodedValuesTest.Pair> map =
67+
Collections.singletonMap("key", new EncodedValuesTest.Pair(1, "hello"));
68+
PayloadSerializer.Content content = payloadSerializer.serialize(map);
69+
Map<String, EncodedValuesTest.Pair> newMap =
70+
(Map<String, EncodedValuesTest.Pair>)
71+
payloadSerializer.deserialize(
72+
content, (new TypeToken<Map<String, EncodedValuesTest.Pair>>() {}).getType());
73+
Assert.assertTrue(newMap.get("key") instanceof EncodedValuesTest.Pair);
74+
}
75+
76+
@Test
77+
public void testProto() {
78+
WorkflowExecution exec =
79+
WorkflowExecution.newBuilder().setWorkflowId("id").setRunId("runId").build();
80+
PayloadSerializer.Content content = payloadSerializer.serialize(exec);
81+
Assert.assertEquals(exec, payloadSerializer.deserialize(content, WorkflowExecution.class));
82+
}
4583
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow.nexus;
22+
23+
import io.nexusrpc.Operation;
24+
import io.nexusrpc.Service;
25+
import io.nexusrpc.handler.OperationHandler;
26+
import io.nexusrpc.handler.OperationImpl;
27+
import io.nexusrpc.handler.ServiceImpl;
28+
import io.temporal.common.converter.EncodedValuesTest;
29+
import io.temporal.nexus.WorkflowClientOperationHandlers;
30+
import io.temporal.testing.internal.SDKTestWorkflowRule;
31+
import io.temporal.workflow.NexusServiceOptions;
32+
import io.temporal.workflow.Workflow;
33+
import io.temporal.workflow.shared.TestWorkflows;
34+
import java.util.Collections;
35+
import java.util.List;
36+
import org.junit.Assert;
37+
import org.junit.ClassRule;
38+
import org.junit.Test;
39+
40+
// Test an operation that takes and returns a List type with a non-primitive element type
41+
public class GenericListOperationTest {
42+
@ClassRule
43+
public static SDKTestWorkflowRule testWorkflowRule =
44+
SDKTestWorkflowRule.newBuilder()
45+
.setWorkflowTypes(TestNexus.class)
46+
.setNexusServiceImplementation(new TestNexusServiceImpl())
47+
.build();
48+
49+
@Test
50+
public void testOperation() {
51+
TestWorkflows.TestWorkflow1 workflowStub =
52+
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class);
53+
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
54+
Assert.assertEquals("hello", result);
55+
}
56+
57+
public static class TestNexus implements TestWorkflows.TestWorkflow1 {
58+
@Override
59+
public String execute(String input) {
60+
NexusServiceOptions serviceOptions =
61+
NexusServiceOptions.newBuilder()
62+
.setEndpoint(testWorkflowRule.getNexusEndpoint().getSpec().getName())
63+
.build();
64+
// Try to call with the typed stub
65+
TestNexusService serviceStub =
66+
Workflow.newNexusServiceStub(TestNexusService.class, serviceOptions);
67+
List<EncodedValuesTest.Pair> arg =
68+
Collections.singletonList(new EncodedValuesTest.Pair(1, "hello"));
69+
return serviceStub.operation(arg).get(0).getS();
70+
}
71+
}
72+
73+
@Service
74+
public interface TestNexusService {
75+
@Operation
76+
List<EncodedValuesTest.Pair> operation(List<EncodedValuesTest.Pair> input);
77+
}
78+
79+
@ServiceImpl(service = TestNexusService.class)
80+
public static class TestNexusServiceImpl {
81+
@OperationImpl
82+
public OperationHandler<List<EncodedValuesTest.Pair>, List<EncodedValuesTest.Pair>>
83+
operation() {
84+
return WorkflowClientOperationHandlers.sync(
85+
(context, details, client, input) -> {
86+
return input;
87+
});
88+
}
89+
}
90+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow.nexus;
22+
23+
import io.nexusrpc.Operation;
24+
import io.nexusrpc.Service;
25+
import io.nexusrpc.handler.OperationHandler;
26+
import io.nexusrpc.handler.OperationImpl;
27+
import io.nexusrpc.handler.ServiceImpl;
28+
import io.temporal.api.common.v1.WorkflowExecution;
29+
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
30+
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
31+
import io.temporal.nexus.WorkflowClientOperationHandlers;
32+
import io.temporal.testing.internal.SDKTestWorkflowRule;
33+
import io.temporal.workflow.*;
34+
import io.temporal.workflow.shared.TestWorkflows;
35+
import org.junit.Assert;
36+
import org.junit.Rule;
37+
import org.junit.Test;
38+
39+
// Test an operation that takes and returns a protobuf message
40+
public class ProtoOperationTest {
41+
@Rule
42+
public SDKTestWorkflowRule testWorkflowRule =
43+
SDKTestWorkflowRule.newBuilder()
44+
.setWorkflowTypes(TestNexus.class)
45+
.setNexusServiceImplementation(new TestNexusServiceImpl())
46+
.build();
47+
48+
@Test
49+
public void testDescribeWorkflowOperation() {
50+
TestWorkflows.TestWorkflow1 workflowStub =
51+
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class);
52+
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
53+
Assert.assertEquals(testWorkflowRule.getTaskQueue(), result);
54+
}
55+
56+
public static class TestNexus implements TestWorkflows.TestWorkflow1 {
57+
@Override
58+
public String execute(String input) {
59+
TestNexusService serviceStub = Workflow.newNexusServiceStub(TestNexusService.class);
60+
61+
WorkflowExecution exec =
62+
WorkflowExecution.newBuilder()
63+
.setWorkflowId(Workflow.getInfo().getWorkflowId())
64+
.setRunId(Workflow.getInfo().getRunId())
65+
.build();
66+
return serviceStub
67+
.describeWorkflow(
68+
DescribeWorkflowExecutionRequest.newBuilder()
69+
.setNamespace(Workflow.getInfo().getNamespace())
70+
.setExecution(exec)
71+
.build())
72+
.getExecutionConfig()
73+
.getTaskQueue()
74+
.getName();
75+
}
76+
}
77+
78+
@Service
79+
public interface TestNexusService {
80+
@Operation
81+
DescribeWorkflowExecutionResponse describeWorkflow(DescribeWorkflowExecutionRequest input);
82+
}
83+
84+
@ServiceImpl(service = TestNexusService.class)
85+
public class TestNexusServiceImpl {
86+
@OperationImpl
87+
public OperationHandler<DescribeWorkflowExecutionRequest, DescribeWorkflowExecutionResponse>
88+
describeWorkflow() {
89+
return WorkflowClientOperationHandlers.sync(
90+
(context, details, client, input) ->
91+
client.getWorkflowServiceStubs().blockingStub().describeWorkflowExecution(input));
92+
}
93+
}
94+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow.nexus;
22+
23+
import io.nexusrpc.Operation;
24+
import io.nexusrpc.Service;
25+
import io.nexusrpc.handler.OperationHandler;
26+
import io.nexusrpc.handler.OperationImpl;
27+
import io.nexusrpc.handler.ServiceImpl;
28+
import io.temporal.nexus.WorkflowClientOperationHandlers;
29+
import io.temporal.testing.internal.SDKTestWorkflowRule;
30+
import io.temporal.workflow.NexusServiceOptions;
31+
import io.temporal.workflow.NexusServiceStub;
32+
import io.temporal.workflow.Workflow;
33+
import io.temporal.workflow.shared.TestWorkflows;
34+
import org.junit.Assert;
35+
import org.junit.ClassRule;
36+
import org.junit.Test;
37+
38+
// Test an operation that takes and returns a void type
39+
public class VoidOperationTest {
40+
@ClassRule
41+
public static SDKTestWorkflowRule testWorkflowRule =
42+
SDKTestWorkflowRule.newBuilder()
43+
.setWorkflowTypes(TestNexus.class)
44+
.setNexusServiceImplementation(new TestNexusServiceImpl())
45+
.build();
46+
47+
@Test
48+
public void testVoidOperation() {
49+
TestWorkflows.TestWorkflow1 workflowStub =
50+
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class);
51+
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
52+
Assert.assertEquals("success", result);
53+
}
54+
55+
public static class TestNexus implements TestWorkflows.TestWorkflow1 {
56+
@Override
57+
public String execute(String input) {
58+
NexusServiceOptions serviceOptions =
59+
NexusServiceOptions.newBuilder()
60+
.setEndpoint(testWorkflowRule.getNexusEndpoint().getSpec().getName())
61+
.build();
62+
// Try to call with the typed stub
63+
TestNexusService serviceStub =
64+
Workflow.newNexusServiceStub(TestNexusService.class, serviceOptions);
65+
serviceStub.noop();
66+
// Try to call with an untyped stub
67+
NexusServiceStub untypedServiceStub =
68+
Workflow.newUntypedNexusServiceStub("TestNexusService", serviceOptions);
69+
untypedServiceStub.execute("noop", Void.class, null);
70+
untypedServiceStub.execute("noop", Void.class, Void.class, null);
71+
return "success";
72+
}
73+
}
74+
75+
@Service
76+
public interface TestNexusService {
77+
@Operation
78+
void noop();
79+
}
80+
81+
@ServiceImpl(service = TestNexusService.class)
82+
public static class TestNexusServiceImpl {
83+
@OperationImpl
84+
public OperationHandler<Void, Void> noop() {
85+
return WorkflowClientOperationHandlers.sync((context, details, client, input) -> null);
86+
}
87+
}
88+
}

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -3180,9 +3180,13 @@ private static PendingNexusOperationInfo constructPendingNexusOperationInfo(
31803180
.setScheduledEventId(data.scheduledEventId)
31813181
.setScheduleToCloseTimeout(data.scheduledEvent.getScheduleToCloseTimeout())
31823182
.setState(convertNexusOperationState(sm.getState(), data))
3183-
.setAttempt(data.getAttempt())
3184-
.setLastAttemptCompleteTime(data.lastAttemptCompleteTime)
3185-
.setNextAttemptScheduleTime(data.nextAttemptScheduleTime);
3183+
.setAttempt(data.getAttempt());
3184+
if (data.lastAttemptCompleteTime != null) {
3185+
builder.setLastAttemptCompleteTime(data.lastAttemptCompleteTime);
3186+
}
3187+
if (data.nextAttemptScheduleTime != null) {
3188+
builder.setNextAttemptScheduleTime(data.nextAttemptScheduleTime);
3189+
}
31863190

31873191
data.retryState.getPreviousRunFailure().ifPresent(builder::setLastAttemptFailure);
31883192

0 commit comments

Comments
 (0)