Skip to content

Commit 9a896ee

Browse files
committed
Simplify code and errors handling between ActivityWorker and LocalActivityWorker
Make LocalActivityWorker more robust to unexpected errors Issue #1262
1 parent 1b1c1d4 commit 9a896ee

20 files changed

+747
-576
lines changed

temporal-sdk/src/main/java/io/temporal/common/interceptors/Header.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@
2323
import io.temporal.api.common.v1.Payload;
2424
import java.util.HashMap;
2525
import java.util.Map;
26+
import javax.annotation.Nonnull;
2627

2728
public class Header {
2829
private final Map<String, Payload> values;
2930

30-
public Header(io.temporal.api.common.v1.Header header) {
31+
public Header(@Nonnull io.temporal.api.common.v1.Header header) {
3132
values = header.getFieldsMap();
3233
}
3334

temporal-sdk/src/main/java/io/temporal/failure/SimulatedTimeoutFailure.java

+5
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@
2323
import com.google.common.annotations.VisibleForTesting;
2424

2525
/** Internal do not use in application code. */
26+
// TODO this class leaked from testing package into the main codebase during a past fat finger
27+
// refactoring.
28+
// This exception is created currently in a place which should be never reachable.
29+
// This exception is not used by any testing code.
30+
// Will be removed in a separate PR
2631
@VisibleForTesting
2732
public final class SimulatedTimeoutFailure extends RuntimeException {
2833

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityInfoImpl.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,10 @@ public Optional<Payloads> getInput() {
145145
}
146146

147147
@Override
148-
public Header getHeader() {
149-
return response.getHeader();
148+
public Optional<Header> getHeader() {
149+
if (response.hasHeader()) {
150+
return Optional.of(response.getHeader());
151+
}
152+
return Optional.empty();
150153
}
151154
}

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityInfoInternal.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,5 @@ interface ActivityInfoInternal extends ActivityInfo {
4646
/**
4747
* @return header that is passed with the activity execution
4848
*/
49-
Header getHeader();
49+
Optional<Header> getHeader();
5050
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.activity;
22+
23+
import static io.temporal.internal.activity.ActivityTaskHandlerImpl.mapToActivityFailure;
24+
25+
import com.uber.m3.tally.Scope;
26+
import io.temporal.activity.ActivityExecutionContext;
27+
import io.temporal.activity.DynamicActivity;
28+
import io.temporal.api.common.v1.Payload;
29+
import io.temporal.api.common.v1.Payloads;
30+
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest;
31+
import io.temporal.client.ActivityCanceledException;
32+
import io.temporal.common.context.ContextPropagator;
33+
import io.temporal.common.converter.DataConverter;
34+
import io.temporal.common.converter.EncodedValues;
35+
import io.temporal.common.interceptors.ActivityInboundCallsInterceptor;
36+
import io.temporal.common.interceptors.ActivityInboundCallsInterceptor.ActivityOutput;
37+
import io.temporal.common.interceptors.Header;
38+
import io.temporal.common.interceptors.WorkerInterceptor;
39+
import io.temporal.internal.worker.ActivityTaskHandler;
40+
import io.temporal.serviceclient.CheckedExceptionWrapper;
41+
import java.lang.reflect.Method;
42+
import java.util.HashMap;
43+
import java.util.List;
44+
import java.util.Map;
45+
import java.util.Optional;
46+
import javax.annotation.Nonnull;
47+
import javax.annotation.Nullable;
48+
import org.slf4j.Logger;
49+
import org.slf4j.LoggerFactory;
50+
51+
final class ActivityTaskExecutors {
52+
static final Logger log = LoggerFactory.getLogger(ActivityTaskExecutor.class);
53+
54+
interface ActivityTaskExecutor {
55+
ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metricsScope);
56+
}
57+
58+
abstract static class BaseActivityTaskExecutor implements ActivityTaskExecutor {
59+
protected final DataConverter dataConverter;
60+
private final List<ContextPropagator> contextPropagators;
61+
private final WorkerInterceptor[] interceptors;
62+
private final ActivityExecutionContextFactory executionContextFactory;
63+
64+
public BaseActivityTaskExecutor(
65+
DataConverter dataConverter,
66+
List<ContextPropagator> contextPropagators,
67+
WorkerInterceptor[] interceptors,
68+
ActivityExecutionContextFactory executionContextFactory) {
69+
this.dataConverter = dataConverter;
70+
this.contextPropagators = contextPropagators;
71+
this.interceptors = interceptors;
72+
this.executionContextFactory = executionContextFactory;
73+
}
74+
75+
@Override
76+
public ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metricsScope) {
77+
ActivityExecutionContext context = executionContextFactory.createContext(info, metricsScope);
78+
Optional<Payloads> input = info.getInput();
79+
80+
try {
81+
info.getHeader()
82+
.ifPresent(value -> deserializeAndPopulateContext(value, contextPropagators));
83+
84+
ActivityInboundCallsInterceptor inboundCallsInterceptor = createRootInboundInterceptor();
85+
for (WorkerInterceptor interceptor : interceptors) {
86+
inboundCallsInterceptor = interceptor.interceptActivity(inboundCallsInterceptor);
87+
}
88+
inboundCallsInterceptor.init(context);
89+
90+
Object[] args = provideArgs(input);
91+
Header header =
92+
new Header(
93+
info.getHeader().orElse(io.temporal.api.common.v1.Header.getDefaultInstance()));
94+
ActivityOutput result =
95+
inboundCallsInterceptor.execute(
96+
new ActivityInboundCallsInterceptor.ActivityInput(header, args));
97+
if (context.isDoNotCompleteOnReturn()) {
98+
return new ActivityTaskHandler.Result(
99+
info.getActivityId(), null, null, null, null, context.isUseLocalManualCompletion());
100+
}
101+
102+
return this.constructSuccessfulResultValue(info, result);
103+
} catch (Throwable e) {
104+
Throwable ex = CheckedExceptionWrapper.unwrap(e);
105+
boolean local = info.isLocal();
106+
if (ex instanceof ActivityCanceledException) {
107+
log.info(
108+
"{} canceled. ActivityId={}, activityType={}, attempt={}",
109+
local ? "Local activity" : "Activity",
110+
info.getActivityId(),
111+
info.getActivityType(),
112+
info.getAttempt());
113+
} else {
114+
log.warn(
115+
"{} failure. ActivityId={}, activityType={}, attempt={}",
116+
local ? "Local activity" : "Activity",
117+
info.getActivityId(),
118+
info.getActivityType(),
119+
info.getAttempt(),
120+
ex);
121+
}
122+
123+
if (e instanceof Error) {
124+
// we shouldn't silently convert Errors into activity failures, we have to propagate it.
125+
// it will still be converted into activity failures and reported by the workers
126+
throw e;
127+
}
128+
return mapToActivityFailure(ex, info.getActivityId(), metricsScope, local, dataConverter);
129+
}
130+
}
131+
132+
abstract ActivityInboundCallsInterceptor createRootInboundInterceptor();
133+
134+
abstract Object[] provideArgs(Optional<Payloads> input);
135+
136+
protected abstract ActivityTaskHandler.Result constructSuccessfulResultValue(
137+
ActivityInfoInternal info, ActivityOutput result);
138+
139+
ActivityTaskHandler.Result constructResultValue(
140+
ActivityInfoInternal info, @Nullable ActivityOutput result) {
141+
RespondActivityTaskCompletedRequest.Builder request =
142+
RespondActivityTaskCompletedRequest.newBuilder();
143+
if (result != null) {
144+
Optional<Payloads> serialized = dataConverter.toPayloads(result.getResult());
145+
serialized.ifPresent(request::setResult);
146+
}
147+
return new ActivityTaskHandler.Result(
148+
info.getActivityId(), request.build(), null, null, null, false);
149+
}
150+
151+
static void deserializeAndPopulateContext(
152+
@Nonnull io.temporal.api.common.v1.Header header,
153+
@Nullable List<ContextPropagator> contextPropagatorList) {
154+
if (contextPropagatorList == null || contextPropagatorList.isEmpty()) {
155+
return;
156+
}
157+
158+
Map<String, Payload> headerData = new HashMap<>(header.getFieldsMap());
159+
for (ContextPropagator propagator : contextPropagatorList) {
160+
propagator.setCurrentContext(propagator.deserializeContext(headerData));
161+
}
162+
}
163+
}
164+
165+
static class POJOActivityImplementation extends BaseActivityTaskExecutor {
166+
private final Method method;
167+
private final Object activity;
168+
169+
POJOActivityImplementation(
170+
Method interfaceMethod,
171+
Object activity,
172+
DataConverter dataConverter,
173+
List<ContextPropagator> contextPropagators,
174+
WorkerInterceptor[] interceptors,
175+
ActivityExecutionContextFactory executionContextFactory) {
176+
super(dataConverter, contextPropagators, interceptors, executionContextFactory);
177+
this.method = interfaceMethod;
178+
this.activity = activity;
179+
}
180+
181+
@Override
182+
ActivityInboundCallsInterceptor createRootInboundInterceptor() {
183+
return new RootActivityInboundCallsInterceptor.POJOActivityInboundCallsInterceptor(
184+
activity, method);
185+
}
186+
187+
@Override
188+
Object[] provideArgs(Optional<Payloads> input) {
189+
return DataConverter.arrayFromPayloads(
190+
dataConverter, input, method.getParameterTypes(), method.getGenericParameterTypes());
191+
}
192+
193+
@Override
194+
protected ActivityTaskHandler.Result constructSuccessfulResultValue(
195+
ActivityInfoInternal info, ActivityOutput result) {
196+
return constructResultValue(
197+
info,
198+
// if the expected result of the method is null, we don't publish result at all
199+
method.getReturnType() != Void.TYPE ? result : null);
200+
}
201+
}
202+
203+
static class DynamicActivityImplementation extends BaseActivityTaskExecutor {
204+
private final DynamicActivity activity;
205+
206+
DynamicActivityImplementation(
207+
DynamicActivity activity,
208+
DataConverter dataConverter,
209+
List<ContextPropagator> contextPropagators,
210+
WorkerInterceptor[] interceptors,
211+
ActivityExecutionContextFactory executionContextFactory) {
212+
super(dataConverter, contextPropagators, interceptors, executionContextFactory);
213+
this.activity = activity;
214+
}
215+
216+
@Override
217+
ActivityInboundCallsInterceptor createRootInboundInterceptor() {
218+
return new RootActivityInboundCallsInterceptor.DynamicActivityInboundCallsInterceptor(
219+
activity);
220+
}
221+
222+
@Override
223+
Object[] provideArgs(Optional<Payloads> input) {
224+
EncodedValues encodedValues = new EncodedValues(input, dataConverter);
225+
return new Object[] {encodedValues};
226+
}
227+
228+
@Override
229+
protected ActivityTaskHandler.Result constructSuccessfulResultValue(
230+
ActivityInfoInternal info, ActivityOutput result) {
231+
return constructResultValue(info, result);
232+
}
233+
}
234+
}

0 commit comments

Comments
 (0)