Skip to content

Commit d86fc86

Browse files
committed
Issue temporalio#1261 WIP
1 parent c5f0ebb commit d86fc86

25 files changed

+568
-224
lines changed

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ plugins {
1010
id 'org.cadixdev.licenser' version '0.6.1'
1111
id 'com.palantir.git-version' version "${palantirGitVersionVersion}" apply false
1212
id 'io.github.gradle-nexus.publish-plugin' version '1.1.0'
13-
id 'com.diffplug.spotless' version '6.11.0' apply false
13+
id 'com.diffplug.spotless' version '6.12.0' apply false
1414
id 'com.github.nbaztec.coveralls-jacoco' version "1.2.15" apply false
1515

1616
// id 'org.jetbrains.kotlin.jvm' version '1.4.32'
+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionUrl=https\://services.gradle.org/distributions/gradle-7.5.1-all.zip
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.history;
22+
23+
import com.fasterxml.jackson.annotation.JsonFormat;
24+
import com.fasterxml.jackson.annotation.JsonProperty;
25+
import java.time.Duration;
26+
import javax.annotation.Nullable;
27+
28+
/**
29+
* See <a
30+
* href="https://github.com/temporalio/sdk-core/blob/master/protos/local/temporal/sdk/core/external_data/external_data.proto#L12">Core
31+
* Data Structure</a>
32+
*/
33+
public class LocalActivityMarkerMetadata {
34+
// The time the LA was originally scheduled (wall clock time). This is used to track
35+
// schedule-to-close timeouts when timer-based backoffs are used.
36+
@JsonProperty(value = "firstSkd")
37+
private long originalScheduledTimestamp;
38+
39+
// The number of attempts at execution before we recorded this result. Typically starts at 1,
40+
// but it is possible to start at a higher number when backing off using a timer.
41+
@JsonProperty(value = "atpt")
42+
private int attempt;
43+
44+
// If set, this local activity conceptually is retrying after the specified backoff.
45+
// Implementation wise, they are really two different LA machines, but with the same type & input.
46+
// The retry starts with an attempt number > 1.
47+
@Nullable
48+
@JsonFormat(shape = JsonFormat.Shape.NUMBER_INT)
49+
@JsonProperty(value = "backoff")
50+
private Duration backoff;
51+
52+
public LocalActivityMarkerMetadata() {}
53+
54+
public LocalActivityMarkerMetadata(int attempt, long originalScheduledTimestamp) {
55+
this.attempt = attempt;
56+
this.originalScheduledTimestamp = originalScheduledTimestamp;
57+
}
58+
59+
public long getOriginalScheduledTimestamp() {
60+
return originalScheduledTimestamp;
61+
}
62+
63+
public void setOriginalScheduledTimestamp(long originalScheduledTimestamp) {
64+
this.originalScheduledTimestamp = originalScheduledTimestamp;
65+
}
66+
67+
public int getAttempt() {
68+
return attempt;
69+
}
70+
71+
public void setAttempt(int attempt) {
72+
this.attempt = attempt;
73+
}
74+
75+
@Nullable
76+
public Duration getBackoff() {
77+
return backoff;
78+
}
79+
80+
public void setBackoff(@Nullable Duration backoff) {
81+
this.backoff = backoff;
82+
}
83+
}

temporal-sdk/src/main/java/io/temporal/internal/history/LocalActivityMarkerUtils.java

+29-2
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,29 @@
2020

2121
package io.temporal.internal.history;
2222

23+
import io.temporal.api.common.v1.Payloads;
2324
import io.temporal.api.history.v1.HistoryEvent;
2425
import io.temporal.api.history.v1.MarkerRecordedEventAttributes;
26+
import java.util.Map;
2527
import javax.annotation.Nullable;
2628

2729
public class LocalActivityMarkerUtils {
30+
public static final String MARKER_NAME = "LocalActivity";
2831
public static final String MARKER_ACTIVITY_ID_KEY = "activityId";
2932
public static final String MARKER_ACTIVITY_TYPE_KEY = "type";
3033
public static final String MARKER_ACTIVITY_RESULT_KEY = "result";
3134
public static final String MARKER_ACTIVITY_INPUT_KEY = "input";
3235
public static final String MARKER_TIME_KEY = "time";
36+
public static final String MARKER_METADATA_KEY = "meta";
3337
// Deprecated in favor of result. Still present for backwards compatibility.
34-
public static final String MARKER_DATA_KEY = "data";
38+
private static final String MARKER_DATA_KEY = "data";
3539

3640
/**
3741
* @param event {@code HistoryEvent} to inspect
3842
* @return true if the event has a correct structure for a local activity
3943
*/
4044
public static boolean hasLocalActivityStructure(HistoryEvent event) {
41-
return MarkerUtils.verifyMarkerName(event, MarkerUtils.LOCAL_ACTIVITY_MARKER_NAME);
45+
return MarkerUtils.verifyMarkerName(event, MARKER_NAME);
4246
}
4347

4448
@Nullable
@@ -50,4 +54,27 @@ public static String getActivityId(MarkerRecordedEventAttributes markerAttribute
5054
public static String getActivityTypeName(MarkerRecordedEventAttributes markerAttributes) {
5155
return MarkerUtils.getValueFromMarker(markerAttributes, MARKER_ACTIVITY_TYPE_KEY, String.class);
5256
}
57+
58+
@Nullable
59+
public static Payloads getResult(MarkerRecordedEventAttributes markerAttributes) {
60+
Map<String, Payloads> detailsMap = markerAttributes.getDetailsMap();
61+
Payloads result = detailsMap.get(LocalActivityMarkerUtils.MARKER_ACTIVITY_RESULT_KEY);
62+
if (result == null) {
63+
// Support old histories that used "data" as a key for "result".
64+
result = detailsMap.get(LocalActivityMarkerUtils.MARKER_DATA_KEY);
65+
}
66+
return result;
67+
}
68+
69+
@Nullable
70+
public static Long getTime(MarkerRecordedEventAttributes markerAttributes) {
71+
return MarkerUtils.getValueFromMarker(markerAttributes, MARKER_TIME_KEY, Long.class);
72+
}
73+
74+
@Nullable
75+
public static LocalActivityMarkerMetadata getMetadata(
76+
MarkerRecordedEventAttributes markerAttributes) {
77+
return MarkerUtils.getValueFromMarker(
78+
markerAttributes, MARKER_METADATA_KEY, LocalActivityMarkerMetadata.class);
79+
}
5380
}

temporal-sdk/src/main/java/io/temporal/internal/history/MarkerUtils.java

-3
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@
2828
import java.util.Optional;
2929

3030
public class MarkerUtils {
31-
public static final String VERSION_MARKER_NAME = "Version";
32-
33-
public static final String LOCAL_ACTIVITY_MARKER_NAME = "LocalActivity";
3431

3532
/**
3633
* @param event {@code HistoryEvent} to inspect

temporal-sdk/src/main/java/io/temporal/internal/history/VersionMarkerUtils.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import javax.annotation.Nullable;
3232

3333
public class VersionMarkerUtils {
34+
public static final String MARKER_NAME = "Version";
3435
public static final String MARKER_CHANGE_ID_KEY = "changeId";
3536
public static final String MARKER_VERSION_KEY = "version";
3637

@@ -52,7 +53,7 @@ public static String tryGetChangeIdFromVersionMarkerEvent(HistoryEvent event) {
5253
* @return true if the event has a correct structure for a version marker
5354
*/
5455
public static boolean hasVersionMarkerStructure(HistoryEvent event) {
55-
return MarkerUtils.verifyMarkerName(event, MarkerUtils.VERSION_MARKER_NAME);
56+
return MarkerUtils.verifyMarkerName(event, MARKER_NAME);
5657
}
5758

5859
@Nullable
@@ -74,7 +75,7 @@ public static RecordMarkerCommandAttributes createMarkerAttributes(
7475
details.put(
7576
MARKER_VERSION_KEY, DefaultDataConverter.STANDARD_INSTANCE.toPayloads(version).get());
7677
return RecordMarkerCommandAttributes.newBuilder()
77-
.setMarkerName(MarkerUtils.VERSION_MARKER_NAME)
78+
.setMarkerName(MARKER_NAME)
7879
.putAllDetails(details)
7980
.build();
8081
}

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
3333
import io.temporal.internal.statemachines.ExecuteActivityParameters;
3434
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
35+
import io.temporal.internal.statemachines.LocalActivityCallback;
3536
import io.temporal.internal.statemachines.StartChildWorkflowExecutionParameters;
3637
import io.temporal.workflow.Functions;
3738
import io.temporal.workflow.Functions.Func;
@@ -115,8 +116,7 @@ Functions.Proc1<Exception> scheduleActivityTask(
115116
ExecuteActivityParameters parameters, Functions.Proc2<Optional<Payloads>, Failure> callback);
116117

117118
Functions.Proc scheduleLocalActivityTask(
118-
ExecuteLocalActivityParameters parameters,
119-
Functions.Proc2<Optional<Payloads>, Failure> callback);
119+
ExecuteLocalActivityParameters parameters, LocalActivityCallback callback);
120120

121121
/**
122122
* Start child workflow.
@@ -181,8 +181,8 @@ Functions.Proc1<RuntimeException> newTimer(
181181
* getting random number or new UUID. The only way to fail SideEffect is to throw {@link Error}
182182
* which causes workflow task failure. The workflow task after timeout is rescheduled and
183183
* re-executed giving SideEffect another chance to succeed. Use {@link
184-
* #scheduleLocalActivityTask(ExecuteLocalActivityParameters, Functions.Proc2)} for executing
185-
* operations that rely on non-global dependencies and can fail.
184+
* #scheduleLocalActivityTask(ExecuteLocalActivityParameters, LocalActivityCallback)} for
185+
* executing operations that rely on non-global dependencies and can fail.
186186
*
187187
* @param func function that is called once to return a value.
188188
* @param callback function that accepts the result of the side effect.

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java

+2-6
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,7 @@
3232
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
3333
import io.temporal.failure.CanceledFailure;
3434
import io.temporal.internal.common.ProtobufTimeUtils;
35-
import io.temporal.internal.statemachines.ExecuteActivityParameters;
36-
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
37-
import io.temporal.internal.statemachines.StartChildWorkflowExecutionParameters;
38-
import io.temporal.internal.statemachines.WorkflowStateMachines;
35+
import io.temporal.internal.statemachines.*;
3936
import io.temporal.internal.worker.SingleWorkerOptions;
4037
import io.temporal.workflow.Functions;
4138
import io.temporal.workflow.Functions.Func;
@@ -208,8 +205,7 @@ public Functions.Proc1<Exception> scheduleActivityTask(
208205

209206
@Override
210207
public Functions.Proc scheduleLocalActivityTask(
211-
ExecuteLocalActivityParameters parameters,
212-
Functions.Proc2<Optional<Payloads>, Failure> callback) {
208+
ExecuteLocalActivityParameters parameters, LocalActivityCallback callback) {
213209
return workflowStateMachines.scheduleLocalActivityTask(parameters, callback);
214210
}
215211

temporal-sdk/src/main/java/io/temporal/internal/statemachines/ExecuteLocalActivityParameters.java

+10-14
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@
2424
import io.temporal.api.common.v1.Payloads;
2525
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
2626
import io.temporal.internal.common.ProtobufTimeUtils;
27+
import io.temporal.workflow.Workflow;
2728
import java.time.Duration;
2829
import java.util.Objects;
2930
import javax.annotation.Nonnull;
3031
import javax.annotation.Nullable;
3132

3233
public class ExecuteLocalActivityParameters {
33-
public static final long NOT_SCHEDULED = -1;
3434

3535
// This builder doesn't have all the fields published yet (a specific attempt for example)
3636
// It contains only the fields known at the moment of scheduling from the workflow.
@@ -41,27 +41,27 @@ public class ExecuteLocalActivityParameters {
4141
private final Duration localRetryThreshold;
4242
private final boolean doNotIncludeArgumentsIntoMarker;
4343

44-
private @Nullable Duration scheduleToStartTimeout;
45-
4644
/**
47-
* Timestamp of the moment when the first attempt of this local activity was scheduled. Comes into
48-
* play when localRetryThreshold is reached. If {@link #NOT_SCHEDULED} then the first attempt was
49-
* not scheduled yet, and we are going to do it now locally. This timestamp is registered by the
50-
* worker performing the first attempt, so this mechanic needs reasonably synchronized worker
51-
* clocks.
45+
* This timestamp is a Workflow Time ({@link Workflow#currentTimeMillis()}) at the moment of
46+
* scheduling of the first attempt. Comes into play when localRetryThreshold is reached. This
47+
* mechanic requires reasonably synchronized worker clocks to work properly.
5248
*/
53-
private long originalScheduledTimestamp = NOT_SCHEDULED;
49+
private final long originalScheduledTimestamp;
50+
51+
private @Nullable Duration scheduleToStartTimeout;
5452

5553
public ExecuteLocalActivityParameters(
5654
@Nonnull PollActivityTaskQueueResponse.Builder activityTaskBuilder,
5755
@Nullable Duration scheduleToStartTimeout,
5856
boolean doNotIncludeArgumentsIntoMarker,
59-
Duration localRetryThreshold) {
57+
Duration localRetryThreshold,
58+
long originalScheduledTimestamp) {
6059
this.activityTaskBuilder = Objects.requireNonNull(activityTaskBuilder, "activityTaskBuilder");
6160
this.initialActivityTask = activityTaskBuilder.build();
6261
this.scheduleToStartTimeout = scheduleToStartTimeout;
6362
this.doNotIncludeArgumentsIntoMarker = doNotIncludeArgumentsIntoMarker;
6463
this.localRetryThreshold = localRetryThreshold;
64+
this.originalScheduledTimestamp = originalScheduledTimestamp;
6565
}
6666

6767
public String getActivityId() {
@@ -111,10 +111,6 @@ public Duration getLocalRetryThreshold() {
111111
return localRetryThreshold;
112112
}
113113

114-
public void setOriginalScheduledTimestamp(long scheduledTimestamp) {
115-
this.originalScheduledTimestamp = scheduledTimestamp;
116-
}
117-
118114
public long getOriginalScheduledTimestamp() {
119115
return originalScheduledTimestamp;
120116
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.statemachines;
22+
23+
import io.temporal.api.common.v1.Payloads;
24+
import io.temporal.api.failure.v1.Failure;
25+
import io.temporal.workflow.Functions;
26+
import java.time.Duration;
27+
import java.util.Optional;
28+
import javax.annotation.Nonnull;
29+
import javax.annotation.Nullable;
30+
31+
@FunctionalInterface
32+
public interface LocalActivityCallback
33+
extends Functions.Proc2<
34+
Optional<Payloads>, LocalActivityCallback.LocalActivityFailedException> {
35+
36+
@Override
37+
void apply(Optional<Payloads> successOutput, LocalActivityFailedException exception);
38+
39+
class LocalActivityFailedException extends RuntimeException {
40+
private final @Nonnull Failure failure;
41+
private final long originalScheduledTimestamp;
42+
private final int lastAttempt;
43+
/**
44+
* If this is not null, code that processes this exception will schedule a workflow timer to
45+
* continue retrying the execution
46+
*/
47+
private final @Nullable Duration backoff;
48+
49+
public LocalActivityFailedException(
50+
@Nonnull Failure failure,
51+
long originalScheduledTimestamp,
52+
int lastAttempt,
53+
@Nullable Duration backoff) {
54+
this.failure = failure;
55+
this.originalScheduledTimestamp = originalScheduledTimestamp;
56+
this.lastAttempt = lastAttempt;
57+
this.backoff = backoff;
58+
}
59+
60+
@Nonnull
61+
public Failure getFailure() {
62+
return failure;
63+
}
64+
65+
public long getOriginalScheduledTimestamp() {
66+
return originalScheduledTimestamp;
67+
}
68+
69+
public int getLastAttempt() {
70+
return lastAttempt;
71+
}
72+
73+
@Nullable
74+
public Duration getBackoff() {
75+
return backoff;
76+
}
77+
}
78+
}

0 commit comments

Comments
 (0)