43
43
import io .temporal .client .WorkflowOptions ;
44
44
import io .temporal .client .WorkflowStub ;
45
45
import io .temporal .common .RetryOptions ;
46
- import io .temporal .common .interceptors .ActivityInboundCallsInterceptor ;
47
- import io .temporal .common .interceptors .WorkerInterceptor ;
48
- import io .temporal .common .interceptors .WorkflowInboundCallsInterceptor ;
49
- import io .temporal .common .interceptors .WorkflowInboundCallsInterceptorBase ;
50
- import io .temporal .common .interceptors .WorkflowOutboundCallsInterceptor ;
46
+ import io .temporal .common .interceptors .*;
51
47
import io .temporal .common .reporter .TestStatsReporter ;
52
48
import io .temporal .serviceclient .MetricsTag ;
53
49
import io .temporal .serviceclient .WorkflowServiceStubs ;
57
53
import io .temporal .worker .Worker ;
58
54
import io .temporal .worker .WorkerFactoryOptions ;
59
55
import io .temporal .worker .WorkerMetricsTag ;
60
- import io .temporal .workflow .interceptors .SignalWorkflowOutboundCallsInterceptor ;
61
56
import io .temporal .workflow .shared .TestActivities .TestActivitiesImpl ;
62
57
import io .temporal .workflow .shared .TestActivities .TestActivity3 ;
63
58
import io .temporal .workflow .shared .TestActivities .VariousTestActivities ;
69
64
import java .util .Map ;
70
65
import java .util .concurrent .CompletableFuture ;
71
66
import java .util .concurrent .ExecutionException ;
67
+ import java .util .function .Function ;
68
+
72
69
import org .junit .After ;
73
70
import org .junit .Rule ;
74
71
import org .junit .Test ;
@@ -261,19 +258,7 @@ public void testCorruptedSignalMetrics() throws InterruptedException {
261
258
.setWorkerInterceptors (
262
259
new CorruptedSignalWorkerInterceptor (),
263
260
// Add noop just to test that list of interceptors is working.
264
- new WorkerInterceptor () {
265
- @ Override
266
- public WorkflowInboundCallsInterceptor interceptWorkflow (
267
- WorkflowInboundCallsInterceptor next ) {
268
- return next ;
269
- }
270
-
271
- @ Override
272
- public ActivityInboundCallsInterceptor interceptActivity (
273
- ActivityInboundCallsInterceptor next ) {
274
- return next ;
275
- }
276
- })
261
+ new WorkerInterceptorBase ())
277
262
.build ());
278
263
279
264
Worker worker = testEnvironment .newWorker (TASK_QUEUE );
@@ -341,24 +326,7 @@ public void testTemporalFailureMetric() throws InterruptedException {
341
326
342
327
@ Test
343
328
public void testTemporalActivityFailureMetric () throws InterruptedException {
344
- setUp (
345
- WorkerFactoryOptions .newBuilder ()
346
- .setWorkerInterceptors (
347
- // Add noop just to test that list of interceptors is working.
348
- new WorkerInterceptor () {
349
- @ Override
350
- public WorkflowInboundCallsInterceptor interceptWorkflow (
351
- WorkflowInboundCallsInterceptor next ) {
352
- return next ;
353
- }
354
-
355
- @ Override
356
- public ActivityInboundCallsInterceptor interceptActivity (
357
- ActivityInboundCallsInterceptor next ) {
358
- return next ;
359
- }
360
- })
361
- .build ());
329
+ setUp (WorkerFactoryOptions .getDefaultInstance ());
362
330
363
331
Worker worker = testEnvironment .newWorker (TASK_QUEUE );
364
332
worker .registerWorkflowImplementationTypes (TestActivityFailureCountersWorkflow .class );
@@ -603,12 +571,10 @@ public void execute() {
603
571
}
604
572
605
573
public static class Signal {
606
-
607
574
public String value ;
608
575
}
609
576
610
- private static class CorruptedSignalWorkerInterceptor implements WorkerInterceptor {
611
-
577
+ private static class CorruptedSignalWorkerInterceptor extends WorkerInterceptorBase {
612
578
@ Override
613
579
public WorkflowInboundCallsInterceptor interceptWorkflow (WorkflowInboundCallsInterceptor next ) {
614
580
return new WorkflowInboundCallsInterceptorBase (next ) {
@@ -627,10 +593,87 @@ public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
627
593
}
628
594
};
629
595
}
596
+ }
597
+
598
+ private static class SignalWorkflowOutboundCallsInterceptor extends WorkflowOutboundCallsInterceptorBase {
599
+ private final Function <Object [], Object []> overrideArgs ;
600
+ private final Function <String , String > overrideSignalName ;
601
+
602
+ public SignalWorkflowOutboundCallsInterceptor (
603
+ Function <Object [], Object []> overrideArgs ,
604
+ Function <String , String > overrideSignalName ,
605
+ WorkflowOutboundCallsInterceptor next ) {
606
+ super (next );
607
+ this .overrideArgs = overrideArgs ;
608
+ this .overrideSignalName = overrideSignalName ;
609
+ }
610
+
611
+ @ Override
612
+ public SignalExternalOutput signalExternalWorkflow (SignalExternalInput input ) {
613
+ Object [] args = input .getArgs ();
614
+ if (args != null && args .length > 0 ) {
615
+ args = new Object [] {"corrupted signal" };
616
+ }
617
+ return super .signalExternalWorkflow (
618
+ new SignalExternalInput (
619
+ input .getExecution (),
620
+ overrideSignalName .apply (input .getSignalName ()),
621
+ overrideArgs .apply (args )));
622
+ }
623
+ }
624
+
625
+
626
+
627
+
628
+
629
+
630
+
631
+
632
+
633
+
634
+
635
+
636
+ private static class CustomWorkerInterceptor extends WorkerInterceptorBase {
637
+ // remove if you don't need to have a custom WorkflowInboundCallsInterceptor or WorkflowOutboundCallsInterceptor
638
+ @ Override
639
+ public WorkflowInboundCallsInterceptor interceptWorkflow (WorkflowInboundCallsInterceptor next ) {
640
+ return new CustomWorkflowInboundCallsInterceptor (next ) {
641
+ // remove if you don't need to have a custom WorkflowOutboundCallsInterceptor
642
+ @ Override
643
+ public void init (WorkflowOutboundCallsInterceptor outboundCalls ) {
644
+ next .init (new CustomWorkflowOutboundCallsInterceptor (outboundCalls ));
645
+ }
646
+ };
647
+ }
630
648
649
+ // remove if you don't need to have a custom ActivityInboundCallsInterceptor
631
650
@ Override
632
651
public ActivityInboundCallsInterceptor interceptActivity (ActivityInboundCallsInterceptor next ) {
633
- return next ;
652
+ return new CustomActivityInboundCallsInterceptor ( next ) ;
634
653
}
635
654
}
655
+
656
+ private static class CustomWorkflowInboundCallsInterceptor extends WorkflowInboundCallsInterceptorBase {
657
+ public CustomWorkflowInboundCallsInterceptor (WorkflowInboundCallsInterceptor next ) {
658
+ super (next );
659
+ }
660
+
661
+ // override only the methods you need
662
+ }
663
+
664
+ private static class CustomWorkflowOutboundCallsInterceptor extends WorkflowOutboundCallsInterceptorBase {
665
+ public CustomWorkflowOutboundCallsInterceptor (WorkflowOutboundCallsInterceptor next ) {
666
+ super (next );
667
+ }
668
+
669
+ // override only the methods you need
670
+ }
671
+
672
+ private static class CustomActivityInboundCallsInterceptor extends ActivityInboundCallsInterceptorBase {
673
+ public CustomActivityInboundCallsInterceptor (ActivityInboundCallsInterceptor next ) {
674
+ super (next );
675
+ }
676
+
677
+ // override only the methods you need
678
+ }
636
679
}
0 commit comments