Skip to content

Make getResult methods to throw TimeoutException instead of raw gRPC DEADLINE_EXCEEDED #1209

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 2 commits into from
May 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import com.google.protobuf.ByteString;
import io.grpc.Deadline;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.EventType;
Expand All @@ -33,26 +35,62 @@
import io.temporal.common.converter.DataConverter;
import io.temporal.internal.client.external.GenericWorkflowClient;
import io.temporal.internal.common.WorkflowExecutionUtils;
import io.temporal.serviceclient.CheckedExceptionWrapper;
import java.util.Optional;
import java.util.concurrent.*;
import javax.annotation.Nonnull;

/** This class encapsulates async long poll logic of {@link RootWorkflowClientInvoker} */
final class WorkflowClientLongPollAsyncHelper {

static CompletableFuture<Optional<Payloads>> getWorkflowExecutionResultAsync(
GenericWorkflowClient genericClient,
WorkflowClientRequestFactory workflowClientHelper,
WorkflowExecution workflowExecution,
@Nonnull WorkflowExecution workflowExecution,
Optional<String> workflowType,
long timeout,
TimeUnit unit,
DataConverter converter) {
Deadline longPollTimeoutDeadline = Deadline.after(timeout, unit);
return getInstanceCloseEventAsync(
genericClient, workflowClientHelper, workflowExecution, ByteString.EMPTY, timeout, unit)
.thenApply(
(closeEvent) ->
getResultFromCloseEvent(workflowExecution, workflowType, closeEvent, converter));
genericClient,
workflowClientHelper,
workflowExecution,
ByteString.EMPTY,
longPollTimeoutDeadline)
.handle(
(closeEvent, e) -> {
if (e == null) {
return getResultFromCloseEvent(
workflowExecution, workflowType, closeEvent, converter);
} else {
throw handleException(e, longPollTimeoutDeadline, workflowExecution, timeout, unit);
}
});
}

private static CompletionException handleException(
Throwable e,
Deadline longPollTimeoutDeadline,
@Nonnull WorkflowExecution workflowExecution,
long timeout,
TimeUnit unit) {
if (e instanceof CompletionException) {
Throwable cause = e.getCause();
if (longPollTimeoutDeadline.isExpired()
&& cause instanceof StatusRuntimeException
&& Status.Code.DEADLINE_EXCEEDED.equals(
((StatusRuntimeException) cause).getStatus().getCode())) {
// we want to form timeout exception only if the original deadline is indeed expired.
// Otherwise, we should rethrow a raw DEADLINE_EXCEEDED. throwing TimeoutException
// in this case will be highly misleading.
return new CompletionException(
WorkflowClientLongPollHelper.newTimeoutException(workflowExecution, timeout, unit));
} else {
return (CompletionException) e;
}
} else {
return new CompletionException(e);
}
}

/** Returns an instance closing event, potentially waiting for workflow to complete. */
Expand All @@ -61,34 +99,28 @@ private static CompletableFuture<HistoryEvent> getInstanceCloseEventAsync(
WorkflowClientRequestFactory workflowClientHelper,
final WorkflowExecution workflowExecution,
ByteString pageToken,
long timeout,
TimeUnit unit) {
Deadline longPollTimeoutDeadline) {
GetWorkflowExecutionHistoryRequest request =
workflowClientHelper.newHistoryLongPollRequest(workflowExecution, pageToken);
Deadline deadline = Deadline.after(timeout, unit);
CompletableFuture<GetWorkflowExecutionHistoryResponse> response =
genericClient.longPollHistoryAsync(request, deadline);
genericClient.longPollHistoryAsync(request, longPollTimeoutDeadline);
return response.thenComposeAsync(
(r) -> {
// TODO to fix https://github.com/temporalio/sdk-java/issues/1177 we need to process
// DEADLINE_EXCEEDED
// or an underlying TimeoutException
if (deadline.isExpired()) {
// TODO check that such throwing populates a stacktrace into TimeoutException. It likely
// doesn't.
// Instead a CompletionException should be used
throw CheckedExceptionWrapper.wrap(
WorkflowClientLongPollHelper.newTimeoutException(workflowExecution, timeout, unit));
}
History history = r.getHistory();
if (history.getEventsCount() == 0) {
// Empty poll returned
ByteString nextPageToken =
r.getNextPageToken().isEmpty() ? pageToken : r.getNextPageToken();
return getInstanceCloseEventAsync(
genericClient, workflowClientHelper, workflowExecution, pageToken, timeout, unit);
genericClient,
workflowClientHelper,
workflowExecution,
nextPageToken,
longPollTimeoutDeadline);
}
HistoryEvent event = history.getEvents(0);
HistoryEvent event = history.getEvents(0); // should be only one event
if (!WorkflowExecutionUtils.isWorkflowExecutionClosedEvent(event)) {
throw new RuntimeException("Last history event is not completion event: " + event);
throw new RuntimeException("Unexpected workflow execution closing event: " + event);
}
// Workflow called continueAsNew. Start polling the new generation with new runId.
if (event.getEventType() == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW) {
Expand All @@ -104,9 +136,8 @@ private static CompletableFuture<HistoryEvent> getInstanceCloseEventAsync(
genericClient,
workflowClientHelper,
nextWorkflowExecution,
r.getNextPageToken(),
timeout,
unit);
ByteString.EMPTY,
longPollTimeoutDeadline);
}
return CompletableFuture.completedFuture(event);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.google.protobuf.ByteString;
import io.grpc.Deadline;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.*;
Expand All @@ -35,6 +37,7 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;

/** This class encapsulates sync long poll logic of {@link RootWorkflowClientInvoker} */
final class WorkflowClientLongPollHelper {
Expand All @@ -51,7 +54,7 @@ final class WorkflowClientLongPollHelper {
static Optional<Payloads> getWorkflowExecutionResult(
GenericWorkflowClient genericClient,
WorkflowClientRequestFactory workflowClientHelper,
WorkflowExecution workflowExecution,
@Nonnull WorkflowExecution workflowExecution,
Optional<String> workflowType,
DataConverter converter,
long timeout,
Expand All @@ -75,7 +78,7 @@ static Optional<Payloads> getWorkflowExecutionResult(
static WorkflowExecutionStatus waitForWorkflowInstanceCompletion(
GenericWorkflowClient genericClient,
WorkflowClientRequestFactory workflowClientHelper,
WorkflowExecution workflowExecution,
@Nonnull WorkflowExecution workflowExecution,
long timeout,
TimeUnit unit)
throws TimeoutException {
Expand All @@ -95,36 +98,36 @@ static WorkflowExecutionStatus waitForWorkflowInstanceCompletion(
private static HistoryEvent getInstanceCloseEvent(
GenericWorkflowClient genericClient,
WorkflowClientRequestFactory workflowClientHelper,
WorkflowExecution workflowExecution,
@Nonnull WorkflowExecution workflowExecution,
long timeout,
TimeUnit unit)
throws TimeoutException {
ByteString pageToken = ByteString.EMPTY;
GetWorkflowExecutionHistoryResponse response;
Deadline deadline = Deadline.after(timeout, unit);

do {
if (deadline.isExpired()) {
throw newTimeoutException(workflowExecution, timeout, unit);
}
Deadline longPollTimeoutDeadline = Deadline.after(timeout, unit);

while (true) {
GetWorkflowExecutionHistoryRequest request =
workflowClientHelper.newHistoryLongPollRequest(workflowExecution, pageToken);
// TODO to fix https://github.com/temporalio/sdk-java/issues/1177 we need to process
// DEADLINE_EXCEEDED correctly. It may be thrown if a deadline of one request is exceeded,
// but the total timeout is not.
response = genericClient.longPollHistory(request, deadline);

if (response == null || !response.hasHistory()) {
continue;
try {
response = genericClient.longPollHistory(request, longPollTimeoutDeadline);
} catch (StatusRuntimeException e) {
if (longPollTimeoutDeadline.isExpired()
&& Status.Code.DEADLINE_EXCEEDED.equals(e.getStatus().getCode())) {
// we want to form timeout exception only if the original deadline is indeed expired.
// Otherwise, we should rethrow a raw DEADLINE_EXCEEDED. throwing TimeoutException
// in this case will be highly misleading.
throw newTimeoutException(workflowExecution, timeout, unit);
}
throw e;
}

pageToken = response.getNextPageToken();
History history = response.getHistory();
if (history.getEventsCount() > 0) {
HistoryEvent event = history.getEvents(0);
HistoryEvent event = history.getEvents(0); // should be only one event
if (!WorkflowExecutionUtils.isWorkflowExecutionClosedEvent(event)) {
throw new RuntimeException("Last history event is not completion event: " + event);
throw new RuntimeException("Unexpected workflow execution closing event: " + event);
}
// Workflow called continueAsNew. Start polling the new execution with new runId.
if (event.getEventType() == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW) {
Expand All @@ -141,11 +144,14 @@ private static HistoryEvent getInstanceCloseEvent(
}
return event;
}
} while (true);
if (!response.getNextPageToken().isEmpty()) {
pageToken = response.getNextPageToken();
}
}
}

static TimeoutException newTimeoutException(
WorkflowExecution workflowExecution, long timeout, TimeUnit unit) {
@Nonnull WorkflowExecution workflowExecution, long timeout, TimeUnit unit) {
return new TimeoutException(
"WorkflowId="
+ workflowExecution.getWorkflowId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.workflowservice.v1.*;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;

public interface GenericWorkflowClient {

Expand All @@ -39,8 +40,8 @@ public interface GenericWorkflowClient {
void terminate(TerminateWorkflowExecutionRequest request);

GetWorkflowExecutionHistoryResponse longPollHistory(
GetWorkflowExecutionHistoryRequest request, Deadline deadline);
@Nonnull GetWorkflowExecutionHistoryRequest request, @Nonnull Deadline deadline);

CompletableFuture<GetWorkflowExecutionHistoryResponse> longPollHistoryAsync(
GetWorkflowExecutionHistoryRequest request, Deadline deadline);
@Nonnull GetWorkflowExecutionHistoryRequest request, @Nonnull Deadline deadline);
}
Loading