43
43
import io .temporal .client .WorkflowOptions ;
44
44
import io .temporal .client .WorkflowStub ;
45
45
import io .temporal .common .RetryOptions ;
46
- import io .temporal .common .interceptors .ActivityInboundCallsInterceptor ;
47
- import io .temporal .common .interceptors .WorkerInterceptor ;
48
- import io .temporal .common .interceptors .WorkflowInboundCallsInterceptor ;
49
- import io .temporal .common .interceptors .WorkflowInboundCallsInterceptorBase ;
50
- import io .temporal .common .interceptors .WorkflowOutboundCallsInterceptor ;
46
+ import io .temporal .common .interceptors .*;
51
47
import io .temporal .common .reporter .TestStatsReporter ;
52
48
import io .temporal .serviceclient .MetricsTag ;
53
49
import io .temporal .serviceclient .WorkflowServiceStubs ;
57
53
import io .temporal .worker .Worker ;
58
54
import io .temporal .worker .WorkerFactoryOptions ;
59
55
import io .temporal .worker .WorkerMetricsTag ;
60
- import io .temporal .workflow .interceptors .SignalWorkflowOutboundCallsInterceptor ;
61
56
import io .temporal .workflow .shared .TestActivities .TestActivitiesImpl ;
62
57
import io .temporal .workflow .shared .TestActivities .TestActivity3 ;
63
58
import io .temporal .workflow .shared .TestActivities .VariousTestActivities ;
69
64
import java .util .Map ;
70
65
import java .util .concurrent .CompletableFuture ;
71
66
import java .util .concurrent .ExecutionException ;
67
+ import java .util .function .Function ;
72
68
import org .junit .After ;
73
69
import org .junit .Rule ;
74
70
import org .junit .Test ;
@@ -261,19 +257,7 @@ public void testCorruptedSignalMetrics() throws InterruptedException {
261
257
.setWorkerInterceptors (
262
258
new CorruptedSignalWorkerInterceptor (),
263
259
// Add noop just to test that list of interceptors is working.
264
- new WorkerInterceptor () {
265
- @ Override
266
- public WorkflowInboundCallsInterceptor interceptWorkflow (
267
- WorkflowInboundCallsInterceptor next ) {
268
- return next ;
269
- }
270
-
271
- @ Override
272
- public ActivityInboundCallsInterceptor interceptActivity (
273
- ActivityInboundCallsInterceptor next ) {
274
- return next ;
275
- }
276
- })
260
+ new WorkerInterceptorBase ())
277
261
.build ());
278
262
279
263
Worker worker = testEnvironment .newWorker (TASK_QUEUE );
@@ -341,24 +325,7 @@ public void testTemporalFailureMetric() throws InterruptedException {
341
325
342
326
@ Test
343
327
public void testTemporalActivityFailureMetric () throws InterruptedException {
344
- setUp (
345
- WorkerFactoryOptions .newBuilder ()
346
- .setWorkerInterceptors (
347
- // Add noop just to test that list of interceptors is working.
348
- new WorkerInterceptor () {
349
- @ Override
350
- public WorkflowInboundCallsInterceptor interceptWorkflow (
351
- WorkflowInboundCallsInterceptor next ) {
352
- return next ;
353
- }
354
-
355
- @ Override
356
- public ActivityInboundCallsInterceptor interceptActivity (
357
- ActivityInboundCallsInterceptor next ) {
358
- return next ;
359
- }
360
- })
361
- .build ());
328
+ setUp (WorkerFactoryOptions .getDefaultInstance ());
362
329
363
330
Worker worker = testEnvironment .newWorker (TASK_QUEUE );
364
331
worker .registerWorkflowImplementationTypes (TestActivityFailureCountersWorkflow .class );
@@ -603,12 +570,10 @@ public void execute() {
603
570
}
604
571
605
572
public static class Signal {
606
-
607
573
public String value ;
608
574
}
609
575
610
- private static class CorruptedSignalWorkerInterceptor implements WorkerInterceptor {
611
-
576
+ private static class CorruptedSignalWorkerInterceptor extends WorkerInterceptorBase {
612
577
@ Override
613
578
public WorkflowInboundCallsInterceptor interceptWorkflow (WorkflowInboundCallsInterceptor next ) {
614
579
return new WorkflowInboundCallsInterceptorBase (next ) {
@@ -627,10 +592,33 @@ public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
627
592
}
628
593
};
629
594
}
595
+ }
596
+
597
+ private static class SignalWorkflowOutboundCallsInterceptor
598
+ extends WorkflowOutboundCallsInterceptorBase {
599
+ private final Function <Object [], Object []> overrideArgs ;
600
+ private final Function <String , String > overrideSignalName ;
601
+
602
+ public SignalWorkflowOutboundCallsInterceptor (
603
+ Function <Object [], Object []> overrideArgs ,
604
+ Function <String , String > overrideSignalName ,
605
+ WorkflowOutboundCallsInterceptor next ) {
606
+ super (next );
607
+ this .overrideArgs = overrideArgs ;
608
+ this .overrideSignalName = overrideSignalName ;
609
+ }
630
610
631
611
@ Override
632
- public ActivityInboundCallsInterceptor interceptActivity (ActivityInboundCallsInterceptor next ) {
633
- return next ;
612
+ public SignalExternalOutput signalExternalWorkflow (SignalExternalInput input ) {
613
+ Object [] args = input .getArgs ();
614
+ if (args != null && args .length > 0 ) {
615
+ args = new Object [] {"corrupted signal" };
616
+ }
617
+ return super .signalExternalWorkflow (
618
+ new SignalExternalInput (
619
+ input .getExecution (),
620
+ overrideSignalName .apply (input .getSignalName ()),
621
+ overrideArgs .apply (args )));
634
622
}
635
623
}
636
624
}
0 commit comments