15
15
*/
16
16
package io .serverlessworkflow .impl ;
17
17
18
+ import static io .serverlessworkflow .impl .WorkflowUtils .*;
18
19
import static io .serverlessworkflow .impl .json .JsonUtils .*;
19
20
20
21
import com .fasterxml .jackson .databind .JsonNode ;
22
+ import io .serverlessworkflow .api .types .Input ;
23
+ import io .serverlessworkflow .api .types .Output ;
21
24
import io .serverlessworkflow .api .types .TaskBase ;
22
25
import io .serverlessworkflow .api .types .TaskItem ;
23
26
import io .serverlessworkflow .api .types .Workflow ;
24
27
import io .serverlessworkflow .impl .executors .DefaultTaskExecutorFactory ;
25
28
import io .serverlessworkflow .impl .executors .TaskExecutor ;
26
29
import io .serverlessworkflow .impl .executors .TaskExecutorFactory ;
30
+ import io .serverlessworkflow .impl .expressions .ExpressionFactory ;
31
+ import io .serverlessworkflow .impl .expressions .JQExpressionFactory ;
27
32
import io .serverlessworkflow .impl .json .JsonUtils ;
33
+ import io .serverlessworkflow .impl .jsonschema .DefaultSchemaValidatorFactory ;
34
+ import io .serverlessworkflow .impl .jsonschema .SchemaValidator ;
35
+ import io .serverlessworkflow .impl .jsonschema .SchemaValidatorFactory ;
36
+ import io .serverlessworkflow .resources .DefaultResourceLoaderFactory ;
37
+ import io .serverlessworkflow .resources .ResourceLoaderFactory ;
38
+ import java .nio .file .Path ;
28
39
import java .util .Collection ;
29
40
import java .util .Collections ;
30
41
import java .util .HashSet ;
31
42
import java .util .List ;
32
43
import java .util .Map ;
44
+ import java .util .Optional ;
33
45
import java .util .concurrent .ConcurrentHashMap ;
34
46
35
47
public class WorkflowDefinition {
36
48
37
49
private WorkflowDefinition (
38
50
Workflow workflow ,
39
- TaskExecutorFactory taskFactory ,
40
- Collection < WorkflowExecutionListener > listeners ) {
51
+ Collection < WorkflowExecutionListener > listeners ,
52
+ WorkflowFactories factories ) {
41
53
this .workflow = workflow ;
42
- this .taskFactory = taskFactory ;
43
54
this .listeners = listeners ;
55
+ this .factories = factories ;
56
+ if (workflow .getInput () != null ) {
57
+ Input input = workflow .getInput ();
58
+ this .inputSchemaValidator =
59
+ getSchemaValidator (
60
+ factories .getValidatorFactory (), schemaToNode (factories , input .getSchema ()));
61
+ this .inputFilter = buildWorkflowFilter (factories .getExpressionFactory (), input .getFrom ());
62
+ }
63
+ if (workflow .getOutput () != null ) {
64
+ Output output = workflow .getOutput ();
65
+ this .outputSchemaValidator =
66
+ getSchemaValidator (
67
+ factories .getValidatorFactory (), schemaToNode (factories , output .getSchema ()));
68
+ this .outputFilter = buildWorkflowFilter (factories .getExpressionFactory (), output .getAs ());
69
+ }
44
70
}
45
71
46
72
private final Workflow workflow ;
47
73
private final Collection <WorkflowExecutionListener > listeners ;
48
- private final TaskExecutorFactory taskFactory ;
74
+ private final WorkflowFactories factories ;
75
+ private Optional <SchemaValidator > inputSchemaValidator = Optional .empty ();
76
+ private Optional <SchemaValidator > outputSchemaValidator = Optional .empty ();
77
+ private Optional <WorkflowFilter > inputFilter = Optional .empty ();
78
+ private Optional <WorkflowFilter > outputFilter = Optional .empty ();
79
+
49
80
private final Map <String , TaskExecutor <? extends TaskBase >> taskExecutors =
50
81
new ConcurrentHashMap <>();
51
82
52
83
public static class Builder {
53
84
private final Workflow workflow ;
54
85
private TaskExecutorFactory taskFactory = DefaultTaskExecutorFactory .get ();
86
+ private ExpressionFactory exprFactory = JQExpressionFactory .get ();
55
87
private Collection <WorkflowExecutionListener > listeners ;
88
+ private ResourceLoaderFactory resourceLoaderFactory = DefaultResourceLoaderFactory .get ();
89
+ private SchemaValidatorFactory schemaValidatorFactory = DefaultSchemaValidatorFactory .get ();
90
+ private Path path ;
56
91
57
92
private Builder (Workflow workflow ) {
58
93
this .workflow = workflow ;
@@ -71,13 +106,39 @@ public Builder withTaskExecutorFactory(TaskExecutorFactory factory) {
71
106
return this ;
72
107
}
73
108
109
+ public Builder withExpressionFactory (ExpressionFactory factory ) {
110
+ this .exprFactory = factory ;
111
+ return this ;
112
+ }
113
+
114
+ public Builder withPath (Path path ) {
115
+ this .path = path ;
116
+ return this ;
117
+ }
118
+
119
+ public Builder withResourceLoaderFactory (ResourceLoaderFactory resourceLoader ) {
120
+ this .resourceLoaderFactory = resourceLoader ;
121
+ return this ;
122
+ }
123
+
124
+ public Builder withSchemaValidatorFactory (SchemaValidatorFactory factory ) {
125
+ this .schemaValidatorFactory = factory ;
126
+ return this ;
127
+ }
128
+
74
129
public WorkflowDefinition build () {
75
- return new WorkflowDefinition (
76
- workflow ,
77
- taskFactory ,
78
- listeners == null
79
- ? Collections .emptySet ()
80
- : Collections .unmodifiableCollection (listeners ));
130
+ WorkflowDefinition def =
131
+ new WorkflowDefinition (
132
+ workflow ,
133
+ listeners == null
134
+ ? Collections .emptySet ()
135
+ : Collections .unmodifiableCollection (listeners ),
136
+ new WorkflowFactories (
137
+ taskFactory ,
138
+ resourceLoaderFactory .getResourceLoader (path ),
139
+ exprFactory ,
140
+ schemaValidatorFactory ));
141
+ return def ;
81
142
}
82
143
}
83
144
@@ -86,7 +147,7 @@ public static Builder builder(Workflow workflow) {
86
147
}
87
148
88
149
public WorkflowInstance execute (Object input ) {
89
- return new WorkflowInstance (taskFactory , JsonUtils .fromValue (input ));
150
+ return new WorkflowInstance (JsonUtils .fromValue (input ));
90
151
}
91
152
92
153
enum State {
@@ -101,11 +162,15 @@ public class WorkflowInstance {
101
162
private State state ;
102
163
private WorkflowContext context ;
103
164
104
- private WorkflowInstance (TaskExecutorFactory factory , JsonNode input ) {
165
+ private WorkflowInstance (JsonNode input ) {
105
166
this .output = input ;
106
- this . state = State . STARTED ;
167
+ inputSchemaValidator . ifPresent ( v -> v . validate ( input )) ;
107
168
this .context = WorkflowContext .builder (input ).build ();
169
+ inputFilter .ifPresent (f -> output = f .apply (context , Optional .empty (), output ));
170
+ this .state = State .STARTED ;
108
171
processDo (workflow .getDo ());
172
+ outputFilter .ifPresent (f -> output = f .apply (context , Optional .empty (), output ));
173
+ outputSchemaValidator .ifPresent (v -> v .validate (output ));
109
174
}
110
175
111
176
private void processDo (List <TaskItem > tasks ) {
@@ -118,7 +183,7 @@ private void processDo(List<TaskItem> tasks) {
118
183
taskExecutors
119
184
.computeIfAbsent (
120
185
context .position ().jsonPointer (),
121
- k -> taskFactory . getTaskExecutor (task .getTask ()))
186
+ k -> factories . getTaskFactory (). getTaskExecutor (task .getTask (), factories ))
122
187
.apply (context , output );
123
188
listeners .forEach (l -> l .onTaskEnded (context .position (), task .getTask ()));
124
189
context .position ().back ().back ();
0 commit comments