Skip to content

Commit 9a8894a

Browse files
Set LastHeartbeatDetails on activity failure (#2354)
1 parent ff333ca commit 9a8894a

12 files changed

+160
-13
lines changed

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactory.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@
2121
package io.temporal.internal.activity;
2222

2323
import com.uber.m3.tally.Scope;
24-
import io.temporal.activity.ActivityExecutionContext;
2524

2625
public interface ActivityExecutionContextFactory {
27-
ActivityExecutionContext createContext(ActivityInfoInternal info, Scope metricsScope);
26+
InternalActivityExecutionContext createContext(ActivityInfoInternal info, Scope metricsScope);
2827
}

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextFactoryImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
package io.temporal.internal.activity;
2222

2323
import com.uber.m3.tally.Scope;
24-
import io.temporal.activity.ActivityExecutionContext;
2524
import io.temporal.common.converter.DataConverter;
2625
import io.temporal.internal.client.external.ManualActivityCompletionClientFactory;
2726
import io.temporal.serviceclient.WorkflowServiceStubs;
@@ -61,7 +60,8 @@ public ActivityExecutionContextFactoryImpl(
6160
}
6261

6362
@Override
64-
public ActivityExecutionContext createContext(ActivityInfoInternal info, Scope metricsScope) {
63+
public InternalActivityExecutionContext createContext(
64+
ActivityInfoInternal info, Scope metricsScope) {
6565
return new ActivityExecutionContextImpl(
6666
service,
6767
namespace,

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityExecutionContextImpl.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
* @see ActivityExecutionContext
4646
*/
4747
@ThreadSafe
48-
class ActivityExecutionContextImpl implements ActivityExecutionContext {
48+
class ActivityExecutionContextImpl implements InternalActivityExecutionContext {
4949
private final Lock lock = new ReentrantLock();
5050
private final ManualActivityCompletionClientFactory manualCompletionClientFactory;
5151
private final Functions.Proc completionHandle;
@@ -165,4 +165,9 @@ public Scope getMetricsScope() {
165165
public ActivityInfo getInfo() {
166166
return info;
167167
}
168+
169+
@Override
170+
public Object getLastHeartbeatValue() {
171+
return heartbeatContext.getLastHeartbeatDetails();
172+
}
168173
}

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskExecutors.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import static io.temporal.internal.activity.ActivityTaskHandlerImpl.mapToActivityFailure;
2424

2525
import com.uber.m3.tally.Scope;
26-
import io.temporal.activity.ActivityExecutionContext;
2726
import io.temporal.activity.ActivityInfo;
2827
import io.temporal.activity.DynamicActivity;
2928
import io.temporal.api.common.v1.Payload;
@@ -76,7 +75,8 @@ public BaseActivityTaskExecutor(
7675

7776
@Override
7877
public ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metricsScope) {
79-
ActivityExecutionContext context = executionContextFactory.createContext(info, metricsScope);
78+
InternalActivityExecutionContext context =
79+
executionContextFactory.createContext(info, metricsScope);
8080
ActivityInfo activityInfo = context.getInfo();
8181
ActivitySerializationContext serializationContext =
8282
new ActivitySerializationContext(
@@ -133,7 +133,12 @@ public ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metri
133133
}
134134

135135
return mapToActivityFailure(
136-
ex, info.getActivityId(), metricsScope, local, dataConverterWithActivityContext);
136+
ex,
137+
info.getActivityId(),
138+
context.getLastHeartbeatValue(),
139+
metricsScope,
140+
local,
141+
dataConverterWithActivityContext);
137142
}
138143
}
139144

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskHandlerImpl.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,12 @@ public Result handle(ActivityTask activityTask, Scope metricsScope, boolean loca
137137
+ knownTypes);
138138
} catch (Exception exception) {
139139
return mapToActivityFailure(
140-
exception, pollResponse.getActivityId(), metricsScope, localActivity, dataConverter);
140+
exception,
141+
pollResponse.getActivityId(),
142+
null,
143+
metricsScope,
144+
localActivity,
145+
dataConverter);
141146
}
142147
}
143148

@@ -186,6 +191,7 @@ private void registerActivityImplementation(Object activity) {
186191
static ActivityTaskHandler.Result mapToActivityFailure(
187192
Throwable exception,
188193
String activityId,
194+
@Nullable Object lastHeartbeatDetails,
189195
Scope metricsScope,
190196
boolean isLocalActivity,
191197
DataConverter dataConverter) {
@@ -212,6 +218,9 @@ static ActivityTaskHandler.Result mapToActivityFailure(
212218
Failure failure = dataConverter.exceptionToFailure(exception);
213219
RespondActivityTaskFailedRequest.Builder result =
214220
RespondActivityTaskFailedRequest.newBuilder().setFailure(failure);
221+
if (lastHeartbeatDetails != null) {
222+
dataConverter.toPayloads(lastHeartbeatDetails).ifPresent(result::setLastHeartbeatDetails);
223+
}
215224
return new ActivityTaskHandler.Result(
216225
activityId,
217226
null,

temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContext.java

+2
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,6 @@ interface HeartbeatContext {
3535
* @see io.temporal.activity.ActivityExecutionContext#getHeartbeatDetails(Class, Type)
3636
*/
3737
<V> Optional<V> getHeartbeatDetails(Class<V> detailsClass, Type detailsGenericType);
38+
39+
Object getLastHeartbeatDetails();
3840
}

temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java

+13
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,19 @@ public <V> Optional<V> getHeartbeatDetails(Class<V> detailsClass, Type detailsGe
150150
}
151151
}
152152

153+
@Override
154+
public Object getLastHeartbeatDetails() {
155+
lock.lock();
156+
try {
157+
if (receivedAHeartbeat) {
158+
return this.lastDetails;
159+
}
160+
return null;
161+
} finally {
162+
lock.unlock();
163+
}
164+
}
165+
153166
private void doHeartBeatLocked(Object details) {
154167
long nextHeartbeatDelay;
155168
try {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.activity;
22+
23+
import io.temporal.activity.ActivityExecutionContext;
24+
25+
/**
26+
* Internal context object passed to an Activity implementation, providing more internal details
27+
* than the user facing {@link ActivityExecutionContext}.
28+
*/
29+
public interface InternalActivityExecutionContext extends ActivityExecutionContext {
30+
/** Get the latest value of {@link ActivityExecutionContext#heartbeat(Object)}. */
31+
Object getLastHeartbeatValue();
32+
}

temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextFactoryImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@
2121
package io.temporal.internal.activity;
2222

2323
import com.uber.m3.tally.Scope;
24-
import io.temporal.activity.ActivityExecutionContext;
2524

2625
public class LocalActivityExecutionContextFactoryImpl implements ActivityExecutionContextFactory {
2726

2827
public LocalActivityExecutionContextFactoryImpl() {}
2928

3029
@Override
31-
public ActivityExecutionContext createContext(ActivityInfoInternal info, Scope metricsScope) {
30+
public InternalActivityExecutionContext createContext(
31+
ActivityInfoInternal info, Scope metricsScope) {
3232
return new LocalActivityExecutionContextImpl(info, metricsScope);
3333
}
3434
}

temporal-sdk/src/main/java/io/temporal/internal/activity/LocalActivityExecutionContextImpl.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,13 @@
2121
package io.temporal.internal.activity;
2222

2323
import com.uber.m3.tally.Scope;
24-
import io.temporal.activity.ActivityExecutionContext;
2524
import io.temporal.activity.ActivityInfo;
2625
import io.temporal.activity.ManualActivityCompletionClient;
2726
import io.temporal.client.ActivityCompletionException;
2827
import java.lang.reflect.Type;
2928
import java.util.Optional;
3029

31-
class LocalActivityExecutionContextImpl implements ActivityExecutionContext {
30+
class LocalActivityExecutionContextImpl implements InternalActivityExecutionContext {
3231
private final ActivityInfo info;
3332
private final Scope metricsScope;
3433

@@ -88,4 +87,9 @@ public ManualActivityCompletionClient useLocalManualCompletion() {
8887
public Scope getMetricsScope() {
8988
return metricsScope;
9089
}
90+
91+
@Override
92+
public Object getLastHeartbeatValue() {
93+
return null;
94+
}
9195
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.activity;
22+
23+
import io.temporal.testing.internal.SDKTestOptions;
24+
import io.temporal.testing.internal.SDKTestWorkflowRule;
25+
import io.temporal.workflow.Workflow;
26+
import io.temporal.workflow.shared.TestActivities;
27+
import io.temporal.workflow.shared.TestWorkflows;
28+
import org.junit.Rule;
29+
import org.junit.Test;
30+
31+
public class ActivityHeartbeatSentOnFailureTest {
32+
33+
@Rule
34+
public SDKTestWorkflowRule testWorkflowRule =
35+
SDKTestWorkflowRule.newBuilder()
36+
.setWorkflowTypes(TestWorkflowImpl.class)
37+
.setActivityImplementations(new HeartBeatingActivityImpl())
38+
.build();
39+
40+
/** Tests that the last Activity#heartbeat value is sent if the activity fails. */
41+
@Test
42+
public void activityHeartbeatSentOnFailure() {
43+
TestWorkflows.NoArgsWorkflow workflow =
44+
testWorkflowRule.newWorkflowStub(TestWorkflows.NoArgsWorkflow.class);
45+
workflow.execute();
46+
}
47+
48+
public static class TestWorkflowImpl implements TestWorkflows.NoArgsWorkflow {
49+
50+
private final TestActivities.NoArgsActivity activities =
51+
Workflow.newActivityStub(
52+
TestActivities.NoArgsActivity.class,
53+
SDKTestOptions.newActivityOptions20sScheduleToClose());
54+
55+
@Override
56+
public void execute() {
57+
activities.execute();
58+
}
59+
}
60+
61+
public static class HeartBeatingActivityImpl implements TestActivities.NoArgsActivity {
62+
@Override
63+
public void execute() {
64+
// If the heartbeat details are "3", then we know that the last heartbeat was sent.
65+
if (Activity.getExecutionContext().getHeartbeatDetails(String.class).orElse("").equals("3")) {
66+
return;
67+
}
68+
// Send 3 heartbeats and then fail, expecting the last heartbeat to be sent
69+
// even though the activity fails and the last two attempts would normally be throttled.
70+
Activity.getExecutionContext().heartbeat("1");
71+
Activity.getExecutionContext().heartbeat("2");
72+
Activity.getExecutionContext().heartbeat("3");
73+
throw new RuntimeException("simulated failure");
74+
}
75+
}
76+
}

temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java

+2
Original file line numberDiff line numberDiff line change
@@ -1956,9 +1956,11 @@ private static State failActivityTask(
19561956
RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
19571957
if (request instanceof RespondActivityTaskFailedRequest) {
19581958
RespondActivityTaskFailedRequest req = (RespondActivityTaskFailedRequest) request;
1959+
data.heartbeatDetails = req.getLastHeartbeatDetails();
19591960
return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
19601961
} else if (request instanceof RespondActivityTaskFailedByIdRequest) {
19611962
RespondActivityTaskFailedByIdRequest req = (RespondActivityTaskFailedByIdRequest) request;
1963+
data.heartbeatDetails = req.getLastHeartbeatDetails();
19621964
return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
19631965
} else {
19641966
throw new IllegalArgumentException("Unknown request: " + request);

0 commit comments

Comments
 (0)