Skip to content

Add WorkerInterceptorBase and improve WorkerInterceptor docs to help new users of interceptors #1231

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,62 @@
/**
* Intercepts workflow and activity executions.
*
* <p>TODO(maxim): JavaDoc with sample
* <p>Prefer extending {@link WorkerInterceptorBase} and overriding only the methods you need
* instead of implementing this interface directly. {@link WorkerInterceptorBase} provides correct
* default implementations to all the methods of this interface.
*
* <p>You may want to start your implementation with this initial structure:
*
* <pre>{@code
* public class CustomWorkerInterceptor extends WorkerInterceptorBase {
* // remove if you don't need to have a custom WorkflowInboundCallsInterceptor or
* // WorkflowOutboundCallsInterceptor
* @Override
* public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next) {
* return new CustomWorkflowInboundCallsInterceptor(next) {
* // remove if you don't need to have a custom WorkflowOutboundCallsInterceptor
* @Override
* public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
* next.init(new CustomWorkflowOutboundCallsInterceptor(outboundCalls));
* }
* };
* }
*
* // remove if you don't need to have a custom ActivityInboundCallsInterceptor
* @Override
* public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) {
* return new CustomActivityInboundCallsInterceptor(next);
* }
*
* private static class CustomWorkflowInboundCallsInterceptor
* extends WorkflowInboundCallsInterceptorBase {
* public CustomWorkflowInboundCallsInterceptor(WorkflowInboundCallsInterceptor next) {
* super(next);
* }
*
* // override only the methods you need
* }
*
* private static class CustomWorkflowOutboundCallsInterceptor
* extends WorkflowOutboundCallsInterceptorBase {
* public CustomWorkflowOutboundCallsInterceptor(WorkflowOutboundCallsInterceptor next) {
* super(next);
* }
*
* // override only the methods you need
* }
*
* private static class CustomActivityInboundCallsInterceptor
* extends ActivityInboundCallsInterceptorBase {
* public CustomActivityInboundCallsInterceptor(ActivityInboundCallsInterceptor next) {
* super(next);
* }
*
* // override only the methods you need
* }
* }
*
* }</pre>
*/
@Experimental
public interface WorkerInterceptor {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.common.interceptors;

public class WorkerInterceptorBase implements WorkerInterceptor {
@Override
public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next) {
return next;
}

@Override
public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) {
return next;
}
}
72 changes: 30 additions & 42 deletions temporal-sdk/src/test/java/io/temporal/workflow/MetricsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,7 @@
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.common.RetryOptions;
import io.temporal.common.interceptors.ActivityInboundCallsInterceptor;
import io.temporal.common.interceptors.WorkerInterceptor;
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptorBase;
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
import io.temporal.common.interceptors.*;
import io.temporal.common.reporter.TestStatsReporter;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.WorkflowServiceStubs;
Expand All @@ -57,7 +53,6 @@
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactoryOptions;
import io.temporal.worker.WorkerMetricsTag;
import io.temporal.workflow.interceptors.SignalWorkflowOutboundCallsInterceptor;
import io.temporal.workflow.shared.TestActivities.TestActivitiesImpl;
import io.temporal.workflow.shared.TestActivities.TestActivity3;
import io.temporal.workflow.shared.TestActivities.VariousTestActivities;
Expand All @@ -69,6 +64,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -261,19 +257,7 @@ public void testCorruptedSignalMetrics() throws InterruptedException {
.setWorkerInterceptors(
new CorruptedSignalWorkerInterceptor(),
// Add noop just to test that list of interceptors is working.
new WorkerInterceptor() {
@Override
public WorkflowInboundCallsInterceptor interceptWorkflow(
WorkflowInboundCallsInterceptor next) {
return next;
}

@Override
public ActivityInboundCallsInterceptor interceptActivity(
ActivityInboundCallsInterceptor next) {
return next;
}
})
new WorkerInterceptorBase())
.build());

Worker worker = testEnvironment.newWorker(TASK_QUEUE);
Expand Down Expand Up @@ -341,24 +325,7 @@ public void testTemporalFailureMetric() throws InterruptedException {

@Test
public void testTemporalActivityFailureMetric() throws InterruptedException {
setUp(
WorkerFactoryOptions.newBuilder()
.setWorkerInterceptors(
// Add noop just to test that list of interceptors is working.
new WorkerInterceptor() {
@Override
public WorkflowInboundCallsInterceptor interceptWorkflow(
WorkflowInboundCallsInterceptor next) {
return next;
}

@Override
public ActivityInboundCallsInterceptor interceptActivity(
ActivityInboundCallsInterceptor next) {
return next;
}
})
.build());
setUp(WorkerFactoryOptions.getDefaultInstance());

Worker worker = testEnvironment.newWorker(TASK_QUEUE);
worker.registerWorkflowImplementationTypes(TestActivityFailureCountersWorkflow.class);
Expand Down Expand Up @@ -603,12 +570,10 @@ public void execute() {
}

public static class Signal {

public String value;
}

private static class CorruptedSignalWorkerInterceptor implements WorkerInterceptor {

private static class CorruptedSignalWorkerInterceptor extends WorkerInterceptorBase {
@Override
public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next) {
return new WorkflowInboundCallsInterceptorBase(next) {
Expand All @@ -627,10 +592,33 @@ public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
}
};
}
}

private static class SignalWorkflowOutboundCallsInterceptor
extends WorkflowOutboundCallsInterceptorBase {
private final Function<Object[], Object[]> overrideArgs;
private final Function<String, String> overrideSignalName;

public SignalWorkflowOutboundCallsInterceptor(
Function<Object[], Object[]> overrideArgs,
Function<String, String> overrideSignalName,
WorkflowOutboundCallsInterceptor next) {
super(next);
this.overrideArgs = overrideArgs;
this.overrideSignalName = overrideSignalName;
}

@Override
public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) {
return next;
public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
Object[] args = input.getArgs();
if (args != null && args.length > 0) {
args = new Object[] {"corrupted signal"};
}
return super.signalExternalWorkflow(
new SignalExternalInput(
input.getExecution(),
overrideSignalName.apply(input.getSignalName()),
overrideArgs.apply(args)));
}
}
}

This file was deleted.