Skip to content

Update Java SDK for Temporal Sever v1.26.2 #2357

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 12 commits into from
Jan 6, 2025
4 changes: 3 additions & 1 deletion docker/github/dynamicconfig/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ component.nexusoperations.callback.endpoint.template:
component.callbacks.allowedAddresses:
- value:
- Pattern: "localhost:7243"
AllowInsecure: true
AllowInsecure: true
system.refreshNexusEndpointsMinWait:
- value: 1ms
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,14 @@ private TemporalFailure failureToExceptionImpl(Failure failure, DataConverter da
}
case FAILUREINFO_NOT_SET:
default:
throw new IllegalArgumentException("Failure info not set");
// All unknown types are considered to be retryable ApplicationError.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked all other SDKs and the Java SDK was the only SDK that was throwing an exception here instead of converting to an application failure

return ApplicationFailure.newFromValues(
failure.getMessage(),
"",
false,
new EncodedValues(Optional.empty(), dataConverter),
cause,
null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ public Throwable wrapFailure(ActivityTask t, Throwable failure) {
failure);
}

// TODO: Suppress warning until the SDK supports deployment
@SuppressWarnings("deprecation")
private void sendReply(
ByteString taskToken, ActivityTaskHandler.Result response, Scope metricsScope) {
RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,8 @@ private WorkflowTaskHandler.Result handleTask(
}
}

// TODO: Suppress warning until the SDK supports deployment
@SuppressWarnings("deprecation")
private RespondWorkflowTaskCompletedResponse sendTaskCompleted(
ByteString taskToken,
RespondWorkflowTaskCompletedRequest.Builder taskCompleted,
Expand Down Expand Up @@ -514,6 +516,8 @@ private RespondWorkflowTaskCompletedResponse sendTaskCompleted(
grpcRetryOptions);
}

// TODO: Suppress warning until the SDK supports deployment
@SuppressWarnings("deprecation")
private void sendTaskFailed(
ByteString taskToken,
RespondWorkflowTaskFailedRequest.Builder taskFailed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.*;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;

/**
Expand All @@ -70,13 +69,11 @@
*/
@RunWith(JUnitParamsRunner.class)
public class ActivityTimeoutTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder().setDoNotStart(true).build();

// TODO This test takes longer than it should to complete because
// of the cached heartbeat that prevents a quick shutdown
public @Rule Timeout timeout = Timeout.seconds(15);
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder().setTestTimeoutSeconds(15).setDoNotStart(true).build();

/**
* An activity reaches startToClose timeout once, max retries are set to 1. o
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,6 @@ public void syncOperationImmediatelyCancelled() {
"operation canceled before it was started", canceledFailure.getOriginalMessage());
}

@Test
public void syncOperationCancelled() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this test since the Server dropped support for cancelling failing sync operations

TestWorkflows.TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class);
WorkflowFailedException exception =
Assert.assertThrows(WorkflowFailedException.class, () -> workflowStub.execute(""));
Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure);
NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause();
Assert.assertTrue(nexusFailure.getCause() instanceof CanceledFailure);
}

public static class TestNexus implements TestWorkflows.TestWorkflow1 {
@Override
public String execute(String input) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.temporal.client.WorkflowOptions;
import io.temporal.failure.ApplicationFailure;
import io.temporal.failure.NexusOperationFailure;
import io.temporal.failure.TerminatedFailure;
import io.temporal.nexus.WorkflowClientOperationHandlers;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.*;
Expand All @@ -54,10 +55,20 @@ public void terminateAsyncOperation() {
Assert.assertThrows(WorkflowFailedException.class, () -> workflowStub.execute(""));
Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure);
NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause();
Assert.assertTrue(nexusFailure.getCause() instanceof ApplicationFailure);
Assert.assertEquals(
"operation terminated",
((ApplicationFailure) nexusFailure.getCause()).getOriginalMessage());
// TODO(https://github.com/temporalio/sdk-java/issues/2358): Test server needs to be fixed to
// return the correct type
Assert.assertTrue(
nexusFailure.getCause() instanceof ApplicationFailure
|| nexusFailure.getCause() instanceof TerminatedFailure);
if (nexusFailure.getCause() instanceof ApplicationFailure) {
Assert.assertEquals(
"operation terminated",
((ApplicationFailure) nexusFailure.getCause()).getOriginalMessage());
} else {
Assert.assertEquals(
"operation terminated",
((TerminatedFailure) nexusFailure.getCause()).getOriginalMessage());
}
}

@Service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ static final class ActivityTaskData {
TestServiceRetryState retryState;
Duration nextBackoffInterval;
String identity;
Timestamp lastAttemptCompleteTime;

ActivityTaskData(
TestWorkflowStore store, StartWorkflowExecutionRequest startWorkflowExecutionRequest) {
Expand Down Expand Up @@ -2112,6 +2113,7 @@ private static RetryState attemptActivityRetry(
ctx.onCommit(
(historySize) -> {
data.retryState = nextAttempt;
data.lastAttemptCompleteTime = ctx.currentTime();
task.setAttempt(nextAttempt.getAttempt());
task.setCurrentAttemptScheduledTime(ctx.currentTime());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Timestamp;
Expand Down Expand Up @@ -86,6 +87,17 @@
import org.slf4j.LoggerFactory;

class TestWorkflowMutableStateImpl implements TestWorkflowMutableState {
static final Failure FAILED_UPDATE_ON_WF_COMPLETION =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a test confirming that this at least looks mostly like the real server's version? Unsure if Java has tests against actual server and test server with the same code, but I just want to make sure if/when this changes server side we can catch it here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I can do that

Failure.newBuilder()
.setMessage(
"Workflow Update failed because the Workflow completed before the Update completed.")
.setSource("Server")
.setApplicationFailureInfo(
ApplicationFailureInfo.newBuilder()
.setType("AcceptedUpdateCompletedWorkflow")
.setNonRetryable(true)
.build())
.build();

/**
* If the implementation throws an exception, changes accumulated in the RequestContext will not
Expand Down Expand Up @@ -541,6 +553,7 @@ public void completeWorkflowTask(
|| request.getForceCreateNewWorkflowTask())) {
scheduleWorkflowTask(ctx);
}

workflowTaskStateMachine.getData().bufferedEvents.clear();
Map<String, ConsistentQuery> queries = data.consistentQueryRequests;
Map<String, WorkflowQueryResult> queryResultsMap = request.getQueryResultsMap();
Expand Down Expand Up @@ -1671,6 +1684,27 @@ private void processWorkflowCompletionCallbacks(RequestContext ctx) {
return;
}

updates.forEach(
(k, updateStateMachine) -> {
if (!(updateStateMachine.getState() == StateMachines.State.COMPLETED
|| updateStateMachine.getState() == StateMachines.State.FAILED)) {
updateStateMachine.action(
Action.COMPLETE,
ctx,
Message.newBuilder()
.setBody(
Any.pack(
Response.newBuilder()
.setOutcome(
Outcome.newBuilder()
.setFailure(FAILED_UPDATE_ON_WF_COMPLETION)
.build())
.build()))
.build(),
completionEvent.get().getEventId());
}
});

for (Callback cb : startRequest.getCompletionCallbacksList()) {
if (!cb.hasNexus()) {
// test server only supports nexus callbacks currently
Expand Down Expand Up @@ -3101,6 +3135,10 @@ private static PendingActivityInfo constructPendingActivityInfo(
builder.setLastWorkerIdentity(activityTaskData.identity);
}

if (activityTaskData.lastAttemptCompleteTime != null) {
builder.setLastAttemptCompleteTime(activityTaskData.lastAttemptCompleteTime);
}

// Some ids are only present in the schedule event...
if (activityTaskData.scheduledEvent != null) {
populatePendingActivityInfoFromScheduledEvent(builder, activityTaskData.scheduledEvent);
Expand Down Expand Up @@ -3145,12 +3183,8 @@ private static void populatePendingActivityInfoFromScheduledEvent(

private static void populatePendingActivityInfoFromPollResponse(
PendingActivityInfo.Builder builder, PollActivityTaskQueueResponseOrBuilder task) {
// In golang, we set one but never both of these fields, depending on the activity state
if (builder.getState() == PendingActivityState.PENDING_ACTIVITY_STATE_SCHEDULED) {
builder.setScheduledTime(task.getScheduledTime());
} else {
builder.setLastStartedTime(task.getStartedTime());
}
builder.setScheduledTime(task.getScheduledTime());
builder.setLastStartedTime(task.getStartedTime());
}

private static void populatePendingActivityInfoFromHeartbeatDetails(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public void testSuccessfulActivity() throws InterruptedException {
.setMaximumAttempts(2)
// times should be present, but we can't know what the expected value is if this test is
// going to run against the real server.
.setScheduledTime(actual.getScheduledTime())
.setLastStartedTime(actual.getLastStartedTime())
.setExpirationTime(actual.getExpirationTime())
.build();
Expand Down Expand Up @@ -266,8 +267,10 @@ public void testFailedActivity() throws InterruptedException {
.setMaximumAttempts(2)
// times should be present, but we can't know what the expected value is if this test is
// going to run against the real server.
.setScheduledTime(actual.getScheduledTime())
.setLastStartedTime(actual.getLastStartedTime())
.setExpirationTime(actual.getExpirationTime())
.setLastAttemptCompleteTime(actual.getLastAttemptCompleteTime())
// this ends up being a dummy value, but if it weren't, we still wouldn't expect to know
// it.
.setLastWorkerIdentity(actual.getLastWorkerIdentity())
Expand Down Expand Up @@ -333,6 +336,7 @@ private void testKilledWorkflow(
.setMaximumAttempts(2)
// times should be present, but we can't know what the expected value is if this test is
// going to run against the real server.
.setScheduledTime(actual.getScheduledTime())
.setLastStartedTime(actual.getLastStartedTime())
.setExpirationTime(actual.getExpirationTime())
// this ends up being a dummy value, but if it weren't, we still wouldn't expect to know
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

package io.temporal.testserver.functional;

import static org.junit.Assume.assumeFalse;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.api.common.v1.Payloads;
Expand Down Expand Up @@ -548,7 +546,6 @@ public void updateAndPollByWorkflowId() {
@Test
public void getCompletedUpdateOfCompletedWorkflow() {
// Assert that we can get and poll a completed update from a completed workflow.
assumeFalse("Skipping as real server has a bug", SDKTestWorkflowRule.useExternalService);

WorkflowOptions options =
WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build();
Expand Down Expand Up @@ -593,7 +590,7 @@ public void getCompletedUpdateOfCompletedWorkflow() {

@Test
public void getIncompleteUpdateOfCompletedWorkflow() {
// Assert that we can't get an incomplete update of a completed workflow. Expect a NOT_FOUND
// Assert that the server fails an incomplete update if the workflow is completed.
WorkflowOptions options =
WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build();

Expand All @@ -617,48 +614,60 @@ public void getIncompleteUpdateOfCompletedWorkflow() {
workflowStub.signal();
workflowStub.execute();

StatusRuntimeException exception =
Assert.assertThrows(
StatusRuntimeException.class,
() ->
updateWorkflow(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
TestWorkflows.UpdateType.BLOCK));
Assert.assertEquals(Status.NOT_FOUND.getCode(), exception.getStatus().getCode());
exception =
Assert.assertThrows(
StatusRuntimeException.class,
() ->
updateWorkflow(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
TestWorkflows.UpdateType.BLOCK));
Assert.assertEquals(Status.NOT_FOUND.getCode(), exception.getStatus().getCode());
exception =
Assert.assertThrows(
StatusRuntimeException.class,
() ->
pollWorkflowUpdate(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED));
Assert.assertEquals(Status.NOT_FOUND.getCode(), exception.getStatus().getCode());
exception =
Assert.assertThrows(
StatusRuntimeException.class,
() ->
pollWorkflowUpdate(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED));
Assert.assertEquals(Status.NOT_FOUND.getCode(), exception.getStatus().getCode());
response =
updateWorkflow(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
TestWorkflows.UpdateType.BLOCK);
Assert.assertEquals(
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
response.getStage());
assertUpdateOutcomeIsAcceptedUpdateCompletedWorkflow(response.getOutcome());

response =
updateWorkflow(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
TestWorkflows.UpdateType.BLOCK);
Assert.assertEquals(
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
response.getStage());
assertUpdateOutcomeIsAcceptedUpdateCompletedWorkflow(response.getOutcome());

PollWorkflowExecutionUpdateResponse pollResponse =
pollWorkflowUpdate(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED);
Assert.assertEquals(
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
pollResponse.getStage());
assertUpdateOutcomeIsAcceptedUpdateCompletedWorkflow(pollResponse.getOutcome());

pollResponse =
pollWorkflowUpdate(
exec,
"updateId",
UpdateWorkflowExecutionLifecycleStage
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED);
Assert.assertEquals(
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
pollResponse.getStage());
assertUpdateOutcomeIsAcceptedUpdateCompletedWorkflow(pollResponse.getOutcome());
}

private void assertUpdateOutcomeIsAcceptedUpdateCompletedWorkflow(Outcome outcome) {
Assert.assertEquals(
"Workflow Update failed because the Workflow completed before the Update completed.",
outcome.getFailure().getMessage());
Assert.assertEquals(
"AcceptedUpdateCompletedWorkflow",
outcome.getFailure().getApplicationFailureInfo().getType());
}

private UpdateWorkflowExecutionResponse updateWorkflow(
Expand Down
Loading