Skip to content

Commit a2455af

Browse files
Make sure workflow_task_execution_failed always has a failure_reason (#2419)
1 parent c6375d6 commit a2455af

File tree

2 files changed

+38
-2
lines changed

2 files changed

+38
-2
lines changed

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import io.temporal.serviceclient.RpcRetryOptions;
4040
import io.temporal.serviceclient.WorkflowServiceStubs;
4141
import io.temporal.worker.MetricsType;
42+
import io.temporal.worker.NonDeterministicException;
4243
import io.temporal.worker.WorkerMetricsTag;
4344
import io.temporal.worker.WorkflowTaskDispatchHandle;
4445
import io.temporal.worker.tuning.SlotReleaseReason;
@@ -474,11 +475,21 @@ private WorkflowTaskHandler.Result handleTask(
474475
try {
475476
return handler.handleWorkflowTask(task);
476477
} catch (Throwable e) {
478+
workflowTypeMetricsScope.counter(MetricsType.WORKFLOW_TASK_NO_COMPLETION_COUNTER).inc(1);
479+
// Make sure that the task failure metric has the correct type
480+
Scope workflowTaskFailureScope = workflowTypeMetricsScope;
481+
if (e instanceof NonDeterministicException) {
482+
workflowTaskFailureScope =
483+
workflowTaskFailureScope.tagged(
484+
ImmutableMap.of(TASK_FAILURE_TYPE, "NonDeterminismError"));
485+
} else {
486+
workflowTaskFailureScope =
487+
workflowTaskFailureScope.tagged(ImmutableMap.of(TASK_FAILURE_TYPE, "WorkflowError"));
488+
}
477489
// Any more detailed logging we could do here is already done inside `handler`
478-
workflowTypeMetricsScope
490+
workflowTaskFailureScope
479491
.counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER)
480492
.inc(1);
481-
workflowTypeMetricsScope.counter(MetricsType.WORKFLOW_TASK_NO_COMPLETION_COUNTER).inc(1);
482493
throw e;
483494
} finally {
484495
sw.stop();

temporal-sdk/src/test/java/io/temporal/workflow/determinism/NonDeterministicWorkflowPolicyBlockWorkflowTest.java

+25
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,21 @@
2222

2323
import static org.junit.Assert.*;
2424

25+
import com.google.common.collect.ImmutableMap;
26+
import com.uber.m3.tally.RootScopeBuilder;
27+
import com.uber.m3.tally.Scope;
2528
import io.temporal.api.enums.v1.EventType;
2629
import io.temporal.api.enums.v1.WorkflowTaskFailedCause;
2730
import io.temporal.api.history.v1.HistoryEvent;
2831
import io.temporal.api.history.v1.WorkflowTaskFailedEventAttributes;
2932
import io.temporal.client.WorkflowFailedException;
3033
import io.temporal.client.WorkflowOptions;
3134
import io.temporal.client.WorkflowStub;
35+
import io.temporal.common.reporter.TestStatsReporter;
3236
import io.temporal.failure.TimeoutFailure;
3337
import io.temporal.internal.sync.WorkflowMethodThreadNameStrategy;
3438
import io.temporal.testing.internal.SDKTestWorkflowRule;
39+
import io.temporal.worker.MetricsType;
3540
import io.temporal.worker.NonDeterministicException;
3641
import io.temporal.worker.WorkerOptions;
3742
import io.temporal.workflow.shared.TestActivities.TestActivitiesImpl;
@@ -43,6 +48,9 @@
4348
import org.junit.Test;
4449

4550
public class NonDeterministicWorkflowPolicyBlockWorkflowTest {
51+
private final TestStatsReporter reporter = new TestStatsReporter();
52+
Scope metricsScope =
53+
new RootScopeBuilder().reporter(reporter).reportEvery(com.uber.m3.util.Duration.ofMillis(1));
4654

4755
@Rule
4856
public SDKTestWorkflowRule testWorkflowRule =
@@ -54,6 +62,7 @@ public class NonDeterministicWorkflowPolicyBlockWorkflowTest {
5462
WorkerOptions.newBuilder()
5563
.setStickyQueueScheduleToStartTimeout(Duration.ZERO)
5664
.build())
65+
.setMetricsScope(metricsScope)
5766
.build();
5867

5968
@Test
@@ -89,6 +98,22 @@ public void testNonDeterministicWorkflowPolicyBlockWorkflow() {
8998
assertEquals(
9099
NonDeterministicException.class.getName(),
91100
failedWFTEventAttributes.getFailure().getApplicationFailureInfo().getType());
101+
// Verify that the non-deterministic workflow task failure is reported for all the workflow
102+
// tasks
103+
reporter.assertCounter(
104+
MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER,
105+
ImmutableMap.of(
106+
"task_queue",
107+
testWorkflowRule.getTaskQueue(),
108+
"namespace",
109+
"UnitTest",
110+
"workflow_type",
111+
"TestWorkflowStringArg",
112+
"worker_type",
113+
"WorkflowWorker",
114+
"failure_reason",
115+
"NonDeterminismError"),
116+
(i) -> i >= 2);
92117
}
93118

94119
@Test

0 commit comments

Comments
 (0)