Skip to content

Fix NDE caused by removing Workflow.getVersion with a succeeding Work… #2370

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

package io.temporal.internal.history;

import io.temporal.api.command.v1.Command;
import io.temporal.api.command.v1.RecordMarkerCommandAttributes;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.enums.v1.CommandType;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.history.v1.MarkerRecordedEventAttributes;
Expand All @@ -42,6 +45,19 @@ public static boolean verifyMarkerName(HistoryEvent event, String markerName) {
return markerName.equals(attributes.getMarkerName());
}

/**
 * Checks whether a command is a record-marker command that carries the given marker name.
 *
 * @param command {@code Command} to inspect
 * @param markerName marker name the command is expected to carry
 * @return true only if the command type is {@code COMMAND_TYPE_RECORD_MARKER} and the marker name
 *     in its record-marker attributes equals {@code markerName}
 */
public static boolean verifyMarkerName(Command command, String markerName) {
  boolean isRecordMarker =
      CommandType.COMMAND_TYPE_RECORD_MARKER.equals(command.getCommandType());
  // The attributes are only consulted once the command type is known to be a marker.
  return isRecordMarker
      && markerName.equals(command.getRecordMarkerCommandAttributes().getMarkerName());
}

/**
* This method should be used to extract values from the marker persisted by the SDK itself. These
* values are converted using standard data converter to be always accessible by the SDK.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.internal.history;

import com.google.common.base.Preconditions;
import io.temporal.api.command.v1.Command;
import io.temporal.api.command.v1.RecordMarkerCommandAttributes;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.history.v1.HistoryEvent;
Expand Down Expand Up @@ -56,6 +57,14 @@ public static boolean hasVersionMarkerStructure(HistoryEvent event) {
return MarkerUtils.verifyMarkerName(event, MARKER_NAME);
}

/**
 * Command-side counterpart of the {@code HistoryEvent} overload: tells whether a command is a
 * marker command named {@code MARKER_NAME}.
 *
 * @param command {@code Command} to inspect
 * @return true if {@code MarkerUtils.verifyMarkerName} accepts the command as a marker carrying
 *     the version marker name
 */
public static boolean hasVersionMarkerStructure(Command command) {
  final boolean namedAsVersionMarker = MarkerUtils.verifyMarkerName(command, MARKER_NAME);
  return namedAsVersionMarker;
}

@Nullable
public static String getChangeId(MarkerRecordedEventAttributes markerAttributes) {
return MarkerUtils.getValueFromMarker(markerAttributes, MARKER_CHANGE_ID_KEY, String.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,9 @@ public static Command createRecordMarker(RecordMarkerCommandAttributes attribute
.setRecordMarkerCommandAttributes(attributes)
.build();
}

/**
 * Creates a record-marker command whose attributes contain nothing but the supplied marker name.
 *
 * @param markerName marker name to set on the command attributes
 * @return the command produced by {@code createRecordMarker} for those attributes
 */
public static Command createFakeMarkerCommand(String markerName) {
  RecordMarkerCommandAttributes nameOnlyAttributes =
      RecordMarkerCommandAttributes.newBuilder().setMarkerName(markerName).build();
  return createRecordMarker(nameOnlyAttributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package io.temporal.internal.statemachines;

import static io.temporal.internal.statemachines.StateMachineCommandUtils.createFakeMarkerCommand;
import static io.temporal.internal.sync.WorkflowInternal.DEFAULT_VERSION;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -200,7 +201,7 @@ public void handleWorkflowTaskStarted() {
}

/**
 * Registers a single fake record-marker command for this state machine.
 *
 * <p>The command is built with {@code VersionMarkerUtils.MARKER_NAME} rather than being the
 * generic, unnamed {@code StateMachineCommandUtils.RECORD_MARKER_FAKE_COMMAND}: {@code
 * VersionMarkerUtils.hasVersionMarkerStructure(Command)} identifies version-marker commands by
 * their marker name, so an unnamed fake command would not be recognized as one. Exactly one
 * command is added per call.
 */
void createFakeCommand() {
  addCommand(createFakeMarkerCommand(VersionMarkerUtils.MARKER_NAME));
}

private void validateVersionAndThrow(boolean preloaded) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,26 @@ private void handleCommandEvent(HistoryEvent event) {
continue;
}

// This checks if the next event is a version marker, but the next command is not a version
// marker. This can happen if a getVersion call was removed.
if (VersionMarkerUtils.hasVersionMarkerStructure(event)
&& !VersionMarkerUtils.hasVersionMarkerStructure(command.getCommand())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I understand what's happening here. So is !VersionMarkerUtils.hasVersionMarkerStructure(command.getCommand()) basically saying isFakeVersionMarker?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this code deserves a comment; I will add it. VersionMarkerUtils.hasVersionMarkerStructure(command.getCommand()) checks whether the command comes from a Workflow.getVersion call, so the negated form is true when it does not. What I am checking here is: if the history event is a version marker but the command isn't, then this event can't match the command, so it must be a version marker left behind by a removed getVersion call.

if (handleNonMatchingVersionMarker(event)) {
// this event is a version marker for removed getVersion call.
// Handle the version marker as unmatched and return even if there are no commands to match
// it against.
return;
} else {
throw new NonDeterministicException(
"Event "
+ event.getEventId()
+ " of type "
+ event.getEventType()
+ " does not"
+ " match command type "
+ command.getCommandType());
}
}
// Note that handleEvent can cause a command cancellation in case of
// 1. MutableSideEffect
// 2. Version State Machine during replay cancels the command and enters SKIPPED state
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow.versionTests;

import static org.junit.Assert.*;

import io.temporal.activity.LocalActivityOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.worker.WorkerOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.shared.TestActivities;
import io.temporal.workflow.shared.TestActivities.TestActivitiesImpl;
import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1;
import io.temporal.workflow.unsafe.WorkflowUnsafe;
import java.time.Duration;
import org.junit.Rule;
import org.junit.Test;

/**
 * Verifies that a workflow still completes when a {@code Workflow.getVersion} call that was made
 * during the original execution is absent on replay and is directly followed by another operation:
 * a side effect, a mutable side effect, a different {@code getVersion} call, or a local activity.
 */
public class GetVersionRemovalBeforeMarkerTest {
  /**
   * Set by the workflow implementation when it observes that it is being replayed. Reset before
   * every workflow execution so that one test cannot satisfy another test's assertion. Declared
   * volatile because it is presumably written on a worker thread and read on the JUnit thread.
   */
  private static volatile boolean hasReplayed;

  @Rule
  public SDKTestWorkflowRule testWorkflowRule =
      SDKTestWorkflowRule.newBuilder()
          .setWorkflowTypes(TestGetVersionRemovalWorkflowImpl.class)
          .setActivityImplementations(new TestActivitiesImpl())
          // Forcing a replay. Full history arrived from a normal queue causing a replay.
          .setWorkerOptions(
              WorkerOptions.newBuilder()
                  .setStickyQueueScheduleToStartTimeout(Duration.ZERO)
                  .build())
          .build();

  @Test
  public void testSideEffectAfterGetVersion() {
    executeAndAssertReplayed("SideEffect", "side effect");
  }

  @Test
  public void testMutableSideEffectAfterGetVersion() {
    executeAndAssertReplayed("MutableSideEffect", "mutable side effect");
  }

  @Test
  public void testGetVersionAfterGetVersion() {
    executeAndAssertReplayed("GetVersion", "6");
  }

  @Test
  public void testLocalActivityAfterGetVersion() {
    executeAndAssertReplayed("LocalActivity", "activity");
  }

  /**
   * Runs the workflow for the given action and checks both that a replay was observed during this
   * particular execution and that the workflow returned the expected result.
   *
   * @param action selects which operation the workflow performs after the removed getVersion call
   * @param expectedResult value the workflow is expected to return
   */
  private void executeAndAssertReplayed(String action, String expectedResult) {
    // Clear any value left behind by a previously executed test in the same JVM.
    hasReplayed = false;
    TestWorkflow1 workflowStub =
        testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
    String result = workflowStub.execute(action);
    assertTrue(hasReplayed);
    assertEquals(expectedResult, result);
  }

  public static class TestGetVersionRemovalWorkflowImpl implements TestWorkflow1 {
    private final TestActivities.VariousTestActivities activities =
        Workflow.newLocalActivityStub(
            TestActivities.VariousTestActivities.class,
            LocalActivityOptions.newBuilder()
                .setStartToCloseTimeout(Duration.ofSeconds(5))
                .build());

    @Override
    public String execute(String action) {
      // Simulate removing a getVersion call: the call is only made when the code is not
      // replaying, so on replay the workflow code no longer issues it.
      if (!WorkflowUnsafe.isReplaying()) {
        int version = Workflow.getVersion("changeId", 1, 2);
        assertEquals(2, version);
      } else {
        hasReplayed = true;
      }
      String result = "";
      if (action.equals("SideEffect")) {
        result = Workflow.sideEffect(String.class, () -> "side effect");
      } else if (action.equals("MutableSideEffect")) {
        result =
            Workflow.mutableSideEffect(
                "mutable-side-effect-i",
                String.class,
                (a, b) -> !a.equals(b),
                () -> "mutable side effect");
      } else if (action.equals("GetVersion")) {
        int v = Workflow.getVersion("otherChangeId", 5, 6);
        result = String.valueOf(v);
      } else if (action.equals("LocalActivity")) {
        result = activities.activity();
      }
      // Sleep to trigger at least one more workflow task
      Workflow.sleep(Duration.ofSeconds(1));
      return result;
    }
  }
}
Loading