Skip to content

Commit 0fd824f

Browse files
committed
Simplify code and error handling between ActivityWorker and LocalActivityWorker
Make LocalActivityWorker more robust to unexpected errors Issue #1262
1 parent 1b1c1d4 commit 0fd824f

22 files changed

+743
-608
lines changed

temporal-sdk/src/main/java/io/temporal/common/interceptors/Header.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@
2323
import io.temporal.api.common.v1.Payload;
2424
import java.util.HashMap;
2525
import java.util.Map;
26+
import javax.annotation.Nonnull;
2627

2728
public class Header {
2829
private final Map<String, Payload> values;
2930

30-
public Header(io.temporal.api.common.v1.Header header) {
31+
public Header(@Nonnull io.temporal.api.common.v1.Header header) {
3132
values = header.getFieldsMap();
3233
}
3334

temporal-sdk/src/main/java/io/temporal/failure/SimulatedTimeoutFailure.java

+5
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@
2323
import com.google.common.annotations.VisibleForTesting;
2424

2525
/** Internal do not use in application code. */
26+
// TODO this class leaked from testing package into the main codebase during a past fat finger
27+
// refactoring.
28+
// This exception is created currently in a place which should be never reachable.
29+
// This exception is not used by any testing code.
30+
// Will be removed in a separate PR
2631
@VisibleForTesting
2732
public final class SimulatedTimeoutFailure extends RuntimeException {
2833

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityInfoImpl.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,10 @@ public Optional<Payloads> getInput() {
145145
}
146146

147147
@Override
148-
public Header getHeader() {
149-
return response.getHeader();
148+
public Optional<Header> getHeader() {
149+
if (response.hasHeader()) {
150+
return Optional.of(response.getHeader());
151+
}
152+
return Optional.empty();
150153
}
151154
}

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityInfoInternal.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,5 @@ interface ActivityInfoInternal extends ActivityInfo {
4646
/**
4747
* @return header that is passed with the activity execution
4848
*/
49-
Header getHeader();
49+
Optional<Header> getHeader();
5050
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.activity;
22+
23+
import static io.temporal.internal.activity.ActivityTaskHandlerImpl.mapToActivityFailure;
24+
25+
import com.uber.m3.tally.Scope;
26+
import io.temporal.activity.ActivityExecutionContext;
27+
import io.temporal.activity.DynamicActivity;
28+
import io.temporal.api.common.v1.Payload;
29+
import io.temporal.api.common.v1.Payloads;
30+
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest;
31+
import io.temporal.client.ActivityCanceledException;
32+
import io.temporal.common.context.ContextPropagator;
33+
import io.temporal.common.converter.DataConverter;
34+
import io.temporal.common.converter.EncodedValues;
35+
import io.temporal.common.interceptors.ActivityInboundCallsInterceptor;
36+
import io.temporal.common.interceptors.ActivityInboundCallsInterceptor.ActivityOutput;
37+
import io.temporal.common.interceptors.Header;
38+
import io.temporal.common.interceptors.WorkerInterceptor;
39+
import io.temporal.internal.worker.ActivityTaskHandler;
40+
import io.temporal.serviceclient.CheckedExceptionWrapper;
41+
import java.lang.reflect.Method;
42+
import java.util.HashMap;
43+
import java.util.List;
44+
import java.util.Map;
45+
import java.util.Optional;
46+
import javax.annotation.Nonnull;
47+
import javax.annotation.Nullable;
48+
import org.slf4j.Logger;
49+
import org.slf4j.LoggerFactory;
50+
51+
final class ActivityTaskExecutors {
52+
static final Logger log = LoggerFactory.getLogger(ActivityTaskExecutor.class);
53+
54+
interface ActivityTaskExecutor {
55+
ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metricsScope);
56+
}
57+
58+
abstract static class BaseActivityTaskExecutor implements ActivityTaskExecutor {
59+
protected final DataConverter dataConverter;
60+
private final List<ContextPropagator> contextPropagators;
61+
private final WorkerInterceptor[] interceptors;
62+
private final ActivityExecutionContextFactory executionContextFactory;
63+
64+
public BaseActivityTaskExecutor(
65+
DataConverter dataConverter,
66+
List<ContextPropagator> contextPropagators,
67+
WorkerInterceptor[] interceptors,
68+
ActivityExecutionContextFactory executionContextFactory) {
69+
this.dataConverter = dataConverter;
70+
this.contextPropagators = contextPropagators;
71+
this.interceptors = interceptors;
72+
this.executionContextFactory = executionContextFactory;
73+
}
74+
75+
@Override
76+
public ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metricsScope) {
77+
ActivityExecutionContext context = executionContextFactory.createContext(info, metricsScope);
78+
Optional<Payloads> input = info.getInput();
79+
80+
try {
81+
info.getHeader()
82+
.ifPresent(value -> deserializeAndPopulateContext(value, contextPropagators));
83+
84+
ActivityInboundCallsInterceptor inboundCallsInterceptor = createRootInboundInterceptor();
85+
for (WorkerInterceptor interceptor : interceptors) {
86+
inboundCallsInterceptor = interceptor.interceptActivity(inboundCallsInterceptor);
87+
}
88+
inboundCallsInterceptor.init(context);
89+
90+
Object[] args = provideArgs(input);
91+
Header header =
92+
new Header(
93+
info.getHeader().orElse(io.temporal.api.common.v1.Header.getDefaultInstance()));
94+
ActivityOutput result =
95+
inboundCallsInterceptor.execute(
96+
new ActivityInboundCallsInterceptor.ActivityInput(header, args));
97+
if (context.isDoNotCompleteOnReturn()) {
98+
return new ActivityTaskHandler.Result(
99+
info.getActivityId(), null, null, null, null, context.isUseLocalManualCompletion());
100+
}
101+
102+
return this.constructSuccessfulResultValue(info, result);
103+
} catch (Throwable e) {
104+
Throwable ex = CheckedExceptionWrapper.unwrap(e);
105+
boolean local = info.isLocal();
106+
if (ex instanceof ActivityCanceledException) {
107+
log.info(
108+
"{} canceled. ActivityId={}, activityType={}, attempt={}",
109+
local ? "Local activity" : "Activity",
110+
info.getActivityId(),
111+
info.getActivityType(),
112+
info.getAttempt());
113+
} else {
114+
log.warn(
115+
"{} failure. ActivityId={}, activityType={}, attempt={}",
116+
local ? "Local activity" : "Activity",
117+
info.getActivityId(),
118+
info.getActivityType(),
119+
info.getAttempt(),
120+
ex);
121+
}
122+
123+
return mapToActivityFailure(ex, info.getActivityId(), metricsScope, local, dataConverter);
124+
}
125+
}
126+
127+
abstract ActivityInboundCallsInterceptor createRootInboundInterceptor();
128+
129+
abstract Object[] provideArgs(Optional<Payloads> input);
130+
131+
protected abstract ActivityTaskHandler.Result constructSuccessfulResultValue(
132+
ActivityInfoInternal info, ActivityOutput result);
133+
134+
ActivityTaskHandler.Result constructResultValue(
135+
ActivityInfoInternal info, @Nullable ActivityOutput result) {
136+
RespondActivityTaskCompletedRequest.Builder request =
137+
RespondActivityTaskCompletedRequest.newBuilder();
138+
if (result != null) {
139+
Optional<Payloads> serialized = dataConverter.toPayloads(result.getResult());
140+
serialized.ifPresent(request::setResult);
141+
}
142+
return new ActivityTaskHandler.Result(
143+
info.getActivityId(), request.build(), null, null, null, false);
144+
}
145+
146+
static void deserializeAndPopulateContext(
147+
@Nonnull io.temporal.api.common.v1.Header header,
148+
@Nullable List<ContextPropagator> contextPropagatorList) {
149+
if (contextPropagatorList == null || contextPropagatorList.isEmpty()) {
150+
return;
151+
}
152+
153+
Map<String, Payload> headerData = new HashMap<>(header.getFieldsMap());
154+
for (ContextPropagator propagator : contextPropagatorList) {
155+
propagator.setCurrentContext(propagator.deserializeContext(headerData));
156+
}
157+
}
158+
}
159+
160+
static class POJOActivityImplementation extends BaseActivityTaskExecutor {
161+
private final Method method;
162+
private final Object activity;
163+
164+
POJOActivityImplementation(
165+
Method interfaceMethod,
166+
Object activity,
167+
DataConverter dataConverter,
168+
List<ContextPropagator> contextPropagators,
169+
WorkerInterceptor[] interceptors,
170+
ActivityExecutionContextFactory executionContextFactory) {
171+
super(dataConverter, contextPropagators, interceptors, executionContextFactory);
172+
this.method = interfaceMethod;
173+
this.activity = activity;
174+
}
175+
176+
@Override
177+
ActivityInboundCallsInterceptor createRootInboundInterceptor() {
178+
return new RootActivityInboundCallsInterceptor.POJOActivityInboundCallsInterceptor(
179+
activity, method);
180+
}
181+
182+
@Override
183+
Object[] provideArgs(Optional<Payloads> input) {
184+
return DataConverter.arrayFromPayloads(
185+
dataConverter, input, method.getParameterTypes(), method.getGenericParameterTypes());
186+
}
187+
188+
@Override
189+
protected ActivityTaskHandler.Result constructSuccessfulResultValue(
190+
ActivityInfoInternal info, ActivityOutput result) {
191+
return constructResultValue(
192+
info,
193+
// if the expected result of the method is null, we don't publish result at all
194+
method.getReturnType() != Void.TYPE ? result : null);
195+
}
196+
}
197+
198+
static class DynamicActivityImplementation extends BaseActivityTaskExecutor {
199+
private final DynamicActivity activity;
200+
201+
DynamicActivityImplementation(
202+
DynamicActivity activity,
203+
DataConverter dataConverter,
204+
List<ContextPropagator> contextPropagators,
205+
WorkerInterceptor[] interceptors,
206+
ActivityExecutionContextFactory executionContextFactory) {
207+
super(dataConverter, contextPropagators, interceptors, executionContextFactory);
208+
this.activity = activity;
209+
}
210+
211+
@Override
212+
ActivityInboundCallsInterceptor createRootInboundInterceptor() {
213+
return new RootActivityInboundCallsInterceptor.DynamicActivityInboundCallsInterceptor(
214+
activity);
215+
}
216+
217+
@Override
218+
Object[] provideArgs(Optional<Payloads> input) {
219+
EncodedValues encodedValues = new EncodedValues(input, dataConverter);
220+
return new Object[] {encodedValues};
221+
}
222+
223+
@Override
224+
protected ActivityTaskHandler.Result constructSuccessfulResultValue(
225+
ActivityInfoInternal info, ActivityOutput result) {
226+
return constructResultValue(info, result);
227+
}
228+
}
229+
}

0 commit comments

Comments
 (0)