Skip to content

Commit d81fbea

Browse files
committed
[Fix #490] Listen & Wait Events unit test
1 parent 10e34fe commit d81fbea

17 files changed

+498
-217
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java

Lines changed: 51 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,42 +18,56 @@
1818
import io.cloudevents.CloudEvent;
1919
import io.serverlessworkflow.api.types.EventFilter;
2020
import io.serverlessworkflow.api.types.EventProperties;
21-
import io.serverlessworkflow.impl.TaskContext;
2221
import io.serverlessworkflow.impl.WorkflowApplication;
23-
import io.serverlessworkflow.impl.WorkflowContext;
2422
import java.util.AbstractCollection;
2523
import java.util.Collection;
2624
import java.util.Iterator;
25+
import java.util.List;
2726
import java.util.Map;
2827
import java.util.concurrent.ConcurrentHashMap;
2928
import java.util.concurrent.CopyOnWriteArrayList;
3029
import java.util.function.Consumer;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
3132

3233
public abstract class AbstractTypeConsumer
3334
implements EventConsumer<TypeEventRegistration, TypeEventRegistrationBuilder> {
3435

36+
private static final Logger logger = LoggerFactory.getLogger(AbstractTypeConsumer.class);
37+
38+
protected abstract void registerToAll(Consumer<CloudEvent> consumer);
39+
40+
protected abstract void unregisterFromAll();
41+
3542
protected abstract void register(String topicName, Consumer<CloudEvent> consumer);
3643

3744
protected abstract void unregister(String topicName);
3845

3946
private Map<String, CloudEventConsumer> registrations = new ConcurrentHashMap<>();
4047

4148
@Override
42-
public TypeEventRegistrationBuilder build(EventFilter register, WorkflowApplication application) {
49+
public TypeEventRegistrationBuilder listen(
50+
EventFilter register, WorkflowApplication application) {
4351
EventProperties properties = register.getWith();
4452
String type = properties.getType();
4553
return new TypeEventRegistrationBuilder(
4654
type, new DefaultCloudEventPredicate(properties, application.expressionFactory()));
4755
}
4856

57+
@Override
58+
public Collection<TypeEventRegistrationBuilder> listenToAll(WorkflowApplication application) {
59+
return List.of(new TypeEventRegistrationBuilder(null, null));
60+
}
61+
4962
private static class CloudEventConsumer extends AbstractCollection<TypeEventRegistration>
5063
implements Consumer<CloudEvent> {
5164
private Collection<TypeEventRegistration> registrations = new CopyOnWriteArrayList<>();
5265

5366
@Override
5467
public void accept(CloudEvent ce) {
68+
logger.debug("Received cloud event {}", ce);
5569
for (TypeEventRegistration registration : registrations) {
56-
if (registration.predicate().test(ce, registration.workflow(), registration.task())) {
70+
if (registration.predicate().test(ce)) {
5771
registration.consumer().accept(ce);
5872
}
5973
}
@@ -76,36 +90,42 @@ public int size() {
7690
}
7791

7892
public TypeEventRegistration register(
79-
TypeEventRegistrationBuilder builder,
80-
Consumer<CloudEvent> ce,
81-
WorkflowContext workflow,
82-
TaskContext taskContext) {
83-
TypeEventRegistration registration =
84-
new TypeEventRegistration(builder.type(), ce, builder.cePredicate(), workflow, taskContext);
85-
registrations
86-
.computeIfAbsent(
87-
registration.type(),
88-
k -> {
89-
CloudEventConsumer consumer = new CloudEventConsumer();
90-
register(k, consumer);
91-
return consumer;
92-
})
93-
.add(registration);
94-
return registration;
93+
TypeEventRegistrationBuilder builder, Consumer<CloudEvent> ce) {
94+
if (builder.type() == null) {
95+
registerToAll(ce);
96+
return new TypeEventRegistration(null, ce, null);
97+
} else {
98+
TypeEventRegistration registration =
99+
new TypeEventRegistration(builder.type(), ce, builder.cePredicate());
100+
registrations
101+
.computeIfAbsent(
102+
registration.type(),
103+
k -> {
104+
CloudEventConsumer consumer = new CloudEventConsumer();
105+
register(k, consumer);
106+
return consumer;
107+
})
108+
.add(registration);
109+
return registration;
110+
}
95111
}
96112

97113
@Override
98114
public void unregister(TypeEventRegistration registration) {
99-
registrations.computeIfPresent(
100-
registration.type(),
101-
(k, v) -> {
102-
v.remove(registration);
103-
if (v.isEmpty()) {
104-
unregister(registration.type());
105-
return null;
106-
} else {
107-
return v;
108-
}
109-
});
115+
if (registration.type() == null) {
116+
unregisterFromAll();
117+
} else {
118+
registrations.computeIfPresent(
119+
registration.type(),
120+
(k, v) -> {
121+
v.remove(registration);
122+
if (v.isEmpty()) {
123+
unregister(registration.type());
124+
return null;
125+
} else {
126+
return v;
127+
}
128+
});
129+
}
110130
}
111131
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.events;
17+
18+
@FunctionalInterface
19+
public interface CloudEventAttrPredicate<T> {
20+
boolean test(T value);
21+
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventPredicate.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
package io.serverlessworkflow.impl.events;
1717

1818
import io.cloudevents.CloudEvent;
19-
import io.serverlessworkflow.impl.TaskContext;
20-
import io.serverlessworkflow.impl.WorkflowContext;
2119

2220
public interface CloudEventPredicate {
23-
boolean test(CloudEvent event, WorkflowContext workflow, TaskContext task);
21+
boolean test(CloudEvent event);
2422
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventUtils.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import io.serverlessworkflow.impl.json.JsonUtils;
2525
import java.io.IOException;
2626
import java.io.UncheckedIOException;
27+
import java.time.OffsetDateTime;
28+
import java.time.ZoneOffset;
29+
import java.util.Date;
2730
import java.util.LinkedHashMap;
2831
import java.util.Map;
2932

@@ -56,6 +59,10 @@ public static JsonNode toJsonNode(CloudEvent event) {
5659
return result;
5760
}
5861

62+
public static OffsetDateTime toOffset(Date date) {
63+
return date.toInstant().atOffset(ZoneOffset.UTC);
64+
}
65+
5966
public static CloudEventBuilder addExtension(
6067
CloudEventBuilder builder, String name, JsonNode value) {
6168
if (value.isTextual()) {

impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java

Lines changed: 118 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,52 +17,138 @@
1717

1818
import com.fasterxml.jackson.databind.JsonNode;
1919
import io.cloudevents.CloudEvent;
20+
import io.serverlessworkflow.api.types.EventData;
21+
import io.serverlessworkflow.api.types.EventDataschema;
2022
import io.serverlessworkflow.api.types.EventProperties;
21-
import io.serverlessworkflow.impl.ExpressionHolder;
22-
import io.serverlessworkflow.impl.TaskContext;
23-
import io.serverlessworkflow.impl.WorkflowContext;
23+
import io.serverlessworkflow.api.types.EventSource;
24+
import io.serverlessworkflow.api.types.EventTime;
25+
import io.serverlessworkflow.api.types.UriTemplate;
2426
import io.serverlessworkflow.impl.WorkflowFilter;
27+
import io.serverlessworkflow.impl.WorkflowUtils;
28+
import io.serverlessworkflow.impl.expressions.Expression;
2529
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
2630
import io.serverlessworkflow.impl.json.JsonUtils;
27-
import java.util.Optional;
31+
import java.net.URI;
32+
import java.time.OffsetDateTime;
33+
import java.util.Map;
34+
import java.util.Objects;
2835

2936
public class DefaultCloudEventPredicate implements CloudEventPredicate {
3037

31-
private final EventPropertiesFilter props;
38+
private final CloudEventAttrPredicate<String> idFilter;
39+
private final CloudEventAttrPredicate<URI> sourceFilter;
40+
private final CloudEventAttrPredicate<String> subjectFilter;
41+
private final CloudEventAttrPredicate<String> contentTypeFilter;
42+
private final CloudEventAttrPredicate<String> typeFilter;
43+
private final CloudEventAttrPredicate<URI> dataSchemaFilter;
44+
private final CloudEventAttrPredicate<OffsetDateTime> timeFilter;
45+
private final CloudEventAttrPredicate<JsonNode> dataFilter;
46+
private final CloudEventAttrPredicate<JsonNode> additionalFilter;
47+
48+
private static final <T> CloudEventAttrPredicate<T> isTrue() {
49+
return x -> true;
50+
}
3251

3352
public DefaultCloudEventPredicate(EventProperties properties, ExpressionFactory exprFactory) {
34-
this.props = EventPropertiesFilter.build(properties, exprFactory);
53+
idFilter = stringFilter(properties.getId());
54+
subjectFilter = stringFilter(properties.getSubject());
55+
typeFilter = stringFilter(properties.getType());
56+
contentTypeFilter = stringFilter(properties.getDatacontenttype());
57+
sourceFilter = sourceFilter(properties.getSource(), exprFactory);
58+
dataSchemaFilter = dataSchemaFilter(properties.getDataschema(), exprFactory);
59+
timeFilter = offsetTimeFilter(properties.getTime(), exprFactory);
60+
dataFilter = dataFilter(properties.getData(), exprFactory);
61+
additionalFilter = additionalFilter(properties.getAdditionalProperties(), exprFactory);
3562
}
3663

37-
@Override
38-
public boolean test(CloudEvent event, WorkflowContext workflow, TaskContext task) {
39-
return test(props.idFilter(), event.getId(), workflow, task)
40-
&& test(props.sourceFilter(), event.getSource().toString(), workflow, task)
41-
&& test(props.subjectFilter(), event.getSubject(), workflow, task)
42-
&& test(props.contentTypeFilter(), event.getDataContentType(), workflow, task)
43-
&& test(props.typeFilter(), event.getType(), workflow, task)
44-
&& test(props.dataSchemaFilter(), event.getDataSchema().toString(), workflow, task)
45-
&& test(props.timeFilter(), event.getTime(), workflow, task)
46-
&& test(props.dataFilter(), CloudEventUtils.toJsonNode(event.getData()), workflow, task)
47-
&& test(
48-
props.additionalFilter(),
49-
JsonUtils.fromValue(CloudEventUtils.extensions(event)),
50-
workflow,
51-
task);
64+
private CloudEventAttrPredicate<JsonNode> additionalFilter(
65+
Map<String, Object> additionalProperties, ExpressionFactory exprFactory) {
66+
return additionalProperties != null && !additionalProperties.isEmpty()
67+
? from(WorkflowUtils.buildWorkflowFilter(exprFactory, null, additionalProperties))
68+
: isTrue();
69+
}
70+
71+
private CloudEventAttrPredicate<JsonNode> from(WorkflowFilter filter) {
72+
return d -> filter.apply(null, null, d).asBoolean();
73+
}
74+
75+
private CloudEventAttrPredicate<JsonNode> dataFilter(
76+
EventData data, ExpressionFactory exprFactory) {
77+
return data != null
78+
? from(
79+
WorkflowUtils.buildWorkflowFilter(
80+
exprFactory, data.getRuntimeExpression(), data.getObject()))
81+
: isTrue();
82+
}
83+
84+
private CloudEventAttrPredicate<OffsetDateTime> offsetTimeFilter(
85+
EventTime time, ExpressionFactory exprFactory) {
86+
if (time != null) {
87+
if (time.getRuntimeExpression() != null) {
88+
final Expression expr = exprFactory.getExpression(time.getRuntimeExpression());
89+
return s -> evalExpr(expr, toString(s));
90+
} else if (time.getLiteralTime() != null) {
91+
return s -> Objects.equals(s, CloudEventUtils.toOffset(time.getLiteralTime()));
92+
}
93+
}
94+
return isTrue();
95+
}
96+
97+
private CloudEventAttrPredicate<URI> dataSchemaFilter(
98+
EventDataschema dataSchema, ExpressionFactory exprFactory) {
99+
if (dataSchema != null) {
100+
if (dataSchema.getExpressionDataSchema() != null) {
101+
final Expression expr = exprFactory.getExpression(dataSchema.getExpressionDataSchema());
102+
return s -> evalExpr(expr, toString(s));
103+
} else if (dataSchema.getLiteralDataSchema() != null) {
104+
return templateFilter(dataSchema.getLiteralDataSchema());
105+
}
106+
}
107+
return isTrue();
108+
}
109+
110+
private CloudEventAttrPredicate<String> stringFilter(String str) {
111+
return str == null ? isTrue() : x -> x.equals(str);
112+
}
113+
114+
private CloudEventAttrPredicate<URI> sourceFilter(
115+
EventSource source, ExpressionFactory exprFactory) {
116+
if (source != null) {
117+
if (source.getRuntimeExpression() != null) {
118+
final Expression expr = exprFactory.getExpression(source.getRuntimeExpression());
119+
return s -> evalExpr(expr, toString(s));
120+
} else if (source.getUriTemplate() != null) {
121+
return templateFilter(source.getUriTemplate());
122+
}
123+
}
124+
return isTrue();
52125
}
53126

54-
private <T, V extends ExpressionHolder<T>> boolean test(
55-
Optional<V> optFilter, T value, WorkflowContext workflow, TaskContext task) {
56-
return optFilter.map(filter -> filter.apply(workflow, task).equals(value)).orElse(true);
127+
private CloudEventAttrPredicate<URI> templateFilter(UriTemplate template) {
128+
if (template.getLiteralUri() != null) {
129+
return u -> Objects.equals(u, template.getLiteralUri());
130+
}
131+
throw new UnsupportedOperationException("Template not supporte here yet");
57132
}
58133

59-
private boolean test(
60-
Optional<WorkflowFilter> optFilter,
61-
JsonNode value,
62-
WorkflowContext workflow,
63-
TaskContext task) {
64-
return optFilter
65-
.map(filter -> filter.apply(workflow, task, task.input()).equals(value))
66-
.orElse(true);
134+
private <T> String toString(T uri) {
135+
return uri != null ? uri.toString() : null;
136+
}
137+
138+
private <T> boolean evalExpr(Expression expr, T value) {
139+
return expr.eval(null, null, JsonUtils.fromValue(value)).asBoolean();
140+
}
141+
142+
@Override
143+
public boolean test(CloudEvent event) {
144+
return idFilter.test(event.getId())
145+
&& sourceFilter.test(event.getSource())
146+
&& subjectFilter.test(event.getSubject())
147+
&& contentTypeFilter.test(event.getDataContentType())
148+
&& typeFilter.test(event.getType())
149+
&& dataSchemaFilter.test(event.getDataSchema())
150+
&& timeFilter.test(event.getTime())
151+
&& dataFilter.test(CloudEventUtils.toJsonNode(event.getData()))
152+
&& additionalFilter.test(JsonUtils.fromValue(CloudEventUtils.extensions(event)));
67153
}
68154
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,17 @@
1717

1818
import io.cloudevents.CloudEvent;
1919
import io.serverlessworkflow.api.types.EventFilter;
20-
import io.serverlessworkflow.impl.TaskContext;
2120
import io.serverlessworkflow.impl.WorkflowApplication;
22-
import io.serverlessworkflow.impl.WorkflowContext;
21+
import java.util.Collection;
2322
import java.util.function.Consumer;
2423

2524
public interface EventConsumer<T extends EventRegistration, V extends EventRegistrationBuilder> {
2625

27-
V build(EventFilter filter, WorkflowApplication workflowApplication);
26+
V listen(EventFilter filter, WorkflowApplication workflowApplication);
2827

29-
T register(V builder, Consumer<CloudEvent> consumer, WorkflowContext context, TaskContext task);
28+
Collection<V> listenToAll(WorkflowApplication workflowApplication);
29+
30+
T register(V builder, Consumer<CloudEvent> consumer);
3031

3132
void unregister(T register);
3233
}

0 commit comments

Comments
 (0)