Skip to content

Commit 6814906

Browse files
Ensure heartbeat details aren't cleared (#2460)
1 parent cdd6497 commit 6814906

File tree

2 files changed

+115
-2
lines changed

2 files changed

+115
-2
lines changed

temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -1989,11 +1989,15 @@ private static State failActivityTask(
19891989
RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
19901990
if (request instanceof RespondActivityTaskFailedRequest) {
19911991
RespondActivityTaskFailedRequest req = (RespondActivityTaskFailedRequest) request;
1992-
data.heartbeatDetails = req.getLastHeartbeatDetails();
1992+
if (req.hasLastHeartbeatDetails()) {
1993+
data.heartbeatDetails = req.getLastHeartbeatDetails();
1994+
}
19931995
return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
19941996
} else if (request instanceof RespondActivityTaskFailedByIdRequest) {
19951997
RespondActivityTaskFailedByIdRequest req = (RespondActivityTaskFailedByIdRequest) request;
1996-
data.heartbeatDetails = req.getLastHeartbeatDetails();
1998+
if (req.hasLastHeartbeatDetails()) {
1999+
data.heartbeatDetails = req.getLastHeartbeatDetails();
2000+
}
19972001
return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
19982002
} else {
19992003
throw new IllegalArgumentException("Unknown request: " + request);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.testserver.functional.activity;
22+
23+
import static org.junit.Assert.assertEquals;
24+
import static org.junit.Assert.assertFalse;
25+
26+
import com.google.protobuf.ByteString;
27+
import io.temporal.activity.Activity;
28+
import io.temporal.activity.ActivityInfo;
29+
import io.temporal.activity.ActivityOptions;
30+
import io.temporal.api.common.v1.Payloads;
31+
import io.temporal.api.workflowservice.v1.RecordActivityTaskHeartbeatRequest;
32+
import io.temporal.common.RetryOptions;
33+
import io.temporal.common.converter.DefaultDataConverter;
34+
import io.temporal.failure.ActivityFailure;
35+
import io.temporal.testing.internal.SDKTestWorkflowRule;
36+
import io.temporal.testserver.functional.common.TestActivities;
37+
import io.temporal.testserver.functional.common.TestWorkflows;
38+
import io.temporal.workflow.Workflow;
39+
import java.time.Duration;
40+
import java.util.Optional;
41+
import java.util.concurrent.ConcurrentLinkedQueue;
42+
import org.junit.Rule;
43+
import org.junit.Test;
44+
45+
public class ActivityHeartbeat {
46+
private static final ConcurrentLinkedQueue<Optional<Payloads>> activityHeartbeats =
47+
new ConcurrentLinkedQueue<>();
48+
49+
@Rule
50+
public SDKTestWorkflowRule testWorkflowRule =
51+
SDKTestWorkflowRule.newBuilder()
52+
.setWorkflowTypes(TestWorkflow.class)
53+
.setActivityImplementations(new TestActivity())
54+
.build();
55+
56+
@Test
57+
public void testActivityHeartbeatNoLastHeartbeatDetails() {
58+
// Test that when last heartbeat details are not set on failure, the test server
59+
// clear the heartbeat details.
60+
String result =
61+
testWorkflowRule.newWorkflowStub(TestWorkflows.WorkflowReturnsString.class).execute();
62+
assertEquals("", result);
63+
assertEquals(2, activityHeartbeats.size());
64+
assertFalse(activityHeartbeats.poll().isPresent());
65+
assertEquals(
66+
"heartbeat details",
67+
DefaultDataConverter.STANDARD_INSTANCE.fromPayloads(
68+
0, activityHeartbeats.poll(), String.class, String.class));
69+
}
70+
71+
public static class TestActivity implements TestActivities.ActivityReturnsString {
72+
@Override
73+
public String execute() {
74+
ActivityInfo info = Activity.getExecutionContext().getInfo();
75+
activityHeartbeats.add(info.getHeartbeatDetails());
76+
// Heartbeat with the raw service stub to avoid the SDK keeping track of the heartbeat
77+
Activity.getExecutionContext()
78+
.getWorkflowClient()
79+
.getWorkflowServiceStubs()
80+
.blockingStub()
81+
.recordActivityTaskHeartbeat(
82+
RecordActivityTaskHeartbeatRequest.newBuilder()
83+
.setNamespace(info.getNamespace())
84+
.setTaskToken(ByteString.copyFrom(info.getTaskToken()))
85+
.setDetails(
86+
DefaultDataConverter.STANDARD_INSTANCE.toPayloads("heartbeat details").get())
87+
.build());
88+
throw new IllegalStateException("simulated failure");
89+
}
90+
}
91+
92+
public static class TestWorkflow implements TestWorkflows.WorkflowReturnsString {
93+
@Override
94+
public String execute() {
95+
ActivityOptions options =
96+
ActivityOptions.newBuilder()
97+
.setStartToCloseTimeout(Duration.ofSeconds(10))
98+
.setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(2).build())
99+
.build();
100+
101+
try {
102+
Workflow.newActivityStub(TestActivities.ActivityReturnsString.class, options).execute();
103+
} catch (ActivityFailure e) {
104+
// Expected
105+
}
106+
return "";
107+
}
108+
}
109+
}

0 commit comments

Comments
 (0)