Skip to content

Commit 3f83e52

Browse files
committed
[Fix #1211] Support filters againts workflow data in listen task
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 1c63857 commit 3f83e52

11 files changed

Lines changed: 100 additions & 45 deletions

File tree

impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
import io.cloudevents.CloudEvent;
1919
import io.serverlessworkflow.api.types.EventFilter;
2020
import io.serverlessworkflow.api.types.EventProperties;
21+
import io.serverlessworkflow.impl.TaskContext;
2122
import io.serverlessworkflow.impl.WorkflowApplication;
23+
import io.serverlessworkflow.impl.WorkflowContext;
2224
import java.util.AbstractCollection;
2325
import java.util.Collection;
2426
import java.util.Iterator;
@@ -67,7 +69,7 @@ private static class CloudEventConsumer extends AbstractCollection<TypeEventRegi
6769
public void accept(CloudEvent ce) {
6870
logger.debug("Received cloud event {}", ce);
6971
for (TypeEventRegistration registration : registrations) {
70-
if (registration.predicate().test(ce)) {
72+
if (registration.predicate().test(ce, registration.workflow(), registration.task())) {
7173
registration.consumer().accept(ce);
7274
}
7375
}
@@ -94,14 +96,18 @@ public int size() {
9496
}
9597
}
9698

99+
@Override
97100
public TypeEventRegistration register(
98-
TypeEventRegistrationBuilder builder, Consumer<CloudEvent> ce) {
101+
TypeEventRegistrationBuilder builder,
102+
Consumer<CloudEvent> ce,
103+
WorkflowContext workflow,
104+
TaskContext task) {
99105
if (builder.type() == null) {
100106
registerToAll(ce);
101-
return new TypeEventRegistration(null, ce, null);
107+
return new TypeEventRegistration(null, ce, null, workflow, task);
102108
} else {
103109
TypeEventRegistration registration =
104-
new TypeEventRegistration(builder.type(), ce, builder.cePredicate());
110+
new TypeEventRegistration(builder.type(), ce, builder.cePredicate(), workflow, task);
105111
registrations
106112
.computeIfAbsent(
107113
registration.type(),

impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventAttrPredicate.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
*/
1616
package io.serverlessworkflow.impl.events;
1717

18+
import io.serverlessworkflow.impl.TaskContext;
19+
import io.serverlessworkflow.impl.WorkflowContext;
20+
1821
@FunctionalInterface
1922
public interface CloudEventAttrPredicate<T> {
20-
boolean test(T value);
23+
boolean test(T value, WorkflowContext workflow, TaskContext task);
2124
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventPredicate.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
package io.serverlessworkflow.impl.events;
1717

1818
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.TaskContext;
20+
import io.serverlessworkflow.impl.WorkflowContext;
1921

2022
public interface CloudEventPredicate {
21-
boolean test(CloudEvent event);
23+
boolean test(CloudEvent event, WorkflowContext workflow, TaskContext task);
2224
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import io.serverlessworkflow.api.types.EventSource;
2424
import io.serverlessworkflow.api.types.EventTime;
2525
import io.serverlessworkflow.api.types.UriTemplate;
26+
import io.serverlessworkflow.impl.TaskContext;
2627
import io.serverlessworkflow.impl.WorkflowApplication;
28+
import io.serverlessworkflow.impl.WorkflowContext;
2729
import io.serverlessworkflow.impl.WorkflowModelFactory;
2830
import io.serverlessworkflow.impl.WorkflowPredicate;
2931
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
@@ -45,7 +47,7 @@ public class DefaultCloudEventPredicate implements CloudEventPredicate {
4547
private final CloudEventAttrPredicate<Map<String, Object>> additionalFilter;
4648

4749
private static final <T> CloudEventAttrPredicate<T> isTrue() {
48-
return x -> true;
50+
return (x, w, t) -> true;
4951
}
5052

5153
public DefaultCloudEventPredicate(EventProperties properties, WorkflowApplication app) {
@@ -72,12 +74,12 @@ private CloudEventAttrPredicate<Map<String, Object>> additionalFilter(
7274

7375
private CloudEventAttrPredicate<CloudEventData> fromCloudEvent(
7476
WorkflowModelFactory workflowModelFactory, WorkflowPredicate filter) {
75-
return d -> filter.test(null, null, workflowModelFactory.from(d));
77+
return (d, w, t) -> filter.test(w, t, workflowModelFactory.from(d));
7678
}
7779

7880
private CloudEventAttrPredicate<Map<String, Object>> fromMap(
7981
WorkflowModelFactory workflowModelFactory, WorkflowPredicate filter) {
80-
return d -> filter.test(null, null, workflowModelFactory.from(d));
82+
return (d, w, t) -> filter.test(w, t, workflowModelFactory.from(d));
8183
}
8284

8385
private CloudEventAttrPredicate<CloudEventData> dataFilter(
@@ -98,9 +100,9 @@ private CloudEventAttrPredicate<OffsetDateTime> offsetTimeFilter(
98100
final WorkflowPredicate expr =
99101
app.expressionFactory()
100102
.buildPredicate(ExpressionDescriptor.from(time.getRuntimeExpression()));
101-
return s -> evalExpr(app.modelFactory(), expr, s);
103+
return (s, w, t) -> evalExpr(app.modelFactory(), expr, s, w, t);
102104
} else if (time.getLiteralTime() != null) {
103-
return s -> Objects.equals(s, CloudEventUtils.toOffset(time.getLiteralTime()));
105+
return (s, w, t) -> Objects.equals(s, CloudEventUtils.toOffset(time.getLiteralTime()));
104106
}
105107
}
106108
return isTrue();
@@ -113,7 +115,7 @@ private CloudEventAttrPredicate<URI> dataSchemaFilter(
113115
final WorkflowPredicate expr =
114116
app.expressionFactory()
115117
.buildPredicate(ExpressionDescriptor.from(dataSchema.getExpressionDataSchema()));
116-
return s -> evalExpr(app.modelFactory(), expr, toString(s));
118+
return (s, w, t) -> evalExpr(app.modelFactory(), expr, toString(s), w, t);
117119
} else if (dataSchema.getLiteralDataSchema() != null) {
118120
return templateFilter(dataSchema.getLiteralDataSchema());
119121
}
@@ -122,7 +124,7 @@ private CloudEventAttrPredicate<URI> dataSchemaFilter(
122124
}
123125

124126
private CloudEventAttrPredicate<String> stringFilter(String str) {
125-
return str == null ? isTrue() : x -> x.equals(str);
127+
return str == null ? isTrue() : (x, w, t) -> str.equals(x);
126128
}
127129

128130
private CloudEventAttrPredicate<URI> sourceFilter(EventSource source, WorkflowApplication app) {
@@ -131,7 +133,7 @@ private CloudEventAttrPredicate<URI> sourceFilter(EventSource source, WorkflowAp
131133
final WorkflowPredicate expr =
132134
app.expressionFactory()
133135
.buildPredicate(ExpressionDescriptor.from(source.getRuntimeExpression()));
134-
return s -> evalExpr(app.modelFactory(), expr, toString(s));
136+
return (s, w, t) -> evalExpr(app.modelFactory(), expr, toString(s), w, t);
135137
} else if (source.getUriTemplate() != null) {
136138
return templateFilter(source.getUriTemplate());
137139
}
@@ -141,7 +143,7 @@ private CloudEventAttrPredicate<URI> sourceFilter(EventSource source, WorkflowAp
141143

142144
private CloudEventAttrPredicate<URI> templateFilter(UriTemplate template) {
143145
if (template.getLiteralUri() != null) {
144-
return u -> Objects.equals(u, template.getLiteralUri());
146+
return (u, w, t) -> Objects.equals(u, template.getLiteralUri());
145147
}
146148
throw new UnsupportedOperationException("Template not supported here yet");
147149
}
@@ -151,25 +153,33 @@ private <T> String toString(T uri) {
151153
}
152154

153155
private boolean evalExpr(
154-
WorkflowModelFactory modelFactory, WorkflowPredicate expr, String value) {
155-
return expr.test(null, null, modelFactory.from(value));
156+
WorkflowModelFactory modelFactory,
157+
WorkflowPredicate expr,
158+
String value,
159+
WorkflowContext workflow,
160+
TaskContext task) {
161+
return expr.test(workflow, task, modelFactory.from(value));
156162
}
157163

158164
private boolean evalExpr(
159-
WorkflowModelFactory modelFactory, WorkflowPredicate expr, OffsetDateTime value) {
160-
return expr.test(null, null, modelFactory.from(value));
165+
WorkflowModelFactory modelFactory,
166+
WorkflowPredicate expr,
167+
OffsetDateTime value,
168+
WorkflowContext workflow,
169+
TaskContext task) {
170+
return expr.test(workflow, task, modelFactory.from(value));
161171
}
162172

163173
@Override
164-
public boolean test(CloudEvent event) {
165-
return idFilter.test(event.getId())
166-
&& sourceFilter.test(event.getSource())
167-
&& subjectFilter.test(event.getSubject())
168-
&& contentTypeFilter.test(event.getDataContentType())
169-
&& typeFilter.test(event.getType())
170-
&& dataSchemaFilter.test(event.getDataSchema())
171-
&& timeFilter.test(event.getTime())
172-
&& dataFilter.test(event.getData())
173-
&& additionalFilter.test(CloudEventUtils.extensions(event));
174+
public boolean test(CloudEvent event, WorkflowContext workflow, TaskContext task) {
175+
return idFilter.test(event.getId(), workflow, task)
176+
&& sourceFilter.test(event.getSource(), workflow, task)
177+
&& subjectFilter.test(event.getSubject(), workflow, task)
178+
&& contentTypeFilter.test(event.getDataContentType(), workflow, task)
179+
&& typeFilter.test(event.getType(), workflow, task)
180+
&& dataSchemaFilter.test(event.getDataSchema(), workflow, task)
181+
&& timeFilter.test(event.getTime(), workflow, task)
182+
&& dataFilter.test(event.getData(), workflow, task)
183+
&& additionalFilter.test(CloudEventUtils.extensions(event), workflow, task);
174184
}
175185
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
import io.cloudevents.CloudEvent;
1919
import io.serverlessworkflow.api.types.EventFilter;
2020
import io.serverlessworkflow.impl.ServicePriority;
21+
import io.serverlessworkflow.impl.TaskContext;
2122
import io.serverlessworkflow.impl.WorkflowApplication;
23+
import io.serverlessworkflow.impl.WorkflowContext;
2224
import java.util.Collection;
2325
import java.util.function.Consumer;
2426

@@ -29,7 +31,12 @@ public interface EventConsumer<T extends EventRegistration, V extends EventRegis
2931

3032
Collection<V> listenToAll(WorkflowApplication workflowApplication);
3133

32-
T register(V builder, Consumer<CloudEvent> consumer);
34+
default T register(V builder, Consumer<CloudEvent> consumer) {
35+
return register(builder, consumer, null, null);
36+
}
37+
38+
T register(
39+
V builder, Consumer<CloudEvent> consumer, WorkflowContext workflow, TaskContext context);
3340

3441
void unregister(T register);
3542
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistration.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,12 @@
1515
*/
1616
package io.serverlessworkflow.impl.events;
1717

18-
public interface EventRegistration {}
18+
import io.serverlessworkflow.impl.TaskContext;
19+
import io.serverlessworkflow.impl.WorkflowContext;
20+
21+
public interface EventRegistration {
22+
23+
WorkflowContext workflow();
24+
25+
TaskContext task();
26+
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationInfo.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package io.serverlessworkflow.impl.events;
1717

1818
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.TaskContext;
20+
import io.serverlessworkflow.impl.WorkflowContext;
1921
import java.util.ArrayList;
2022
import java.util.Collection;
2123
import java.util.concurrent.CompletableFuture;
@@ -27,11 +29,13 @@ public record EventRegistrationInfo(
2729
public static final <T> EventRegistrationInfo build(
2830
EventRegistrationBuilderCollection builderInfo,
2931
BiConsumer<CloudEvent, CompletableFuture<T>> consumer,
30-
EventConsumer eventConsumer) {
31-
Collection<EventRegistration> registrations = new ArrayList();
32+
EventConsumer eventConsumer,
33+
WorkflowContext workflow,
34+
TaskContext task) {
35+
Collection<EventRegistration> registrations = new ArrayList<>();
3236
CompletableFuture<T>[] futures =
3337
builderInfo.registrations().stream()
34-
.map(reg -> toCompletable(reg, registrations, consumer, eventConsumer))
38+
.map(reg -> toCompletable(reg, registrations, consumer, eventConsumer, workflow, task))
3539
.toArray(size -> new CompletableFuture[size]);
3640
return new EventRegistrationInfo(
3741
builderInfo.isAnd() ? CompletableFuture.allOf(futures) : CompletableFuture.anyOf(futures),
@@ -42,10 +46,13 @@ private static final <T> CompletableFuture<T> toCompletable(
4246
EventRegistrationBuilder regBuilder,
4347
Collection<EventRegistration> registrations,
4448
BiConsumer<CloudEvent, CompletableFuture<T>> ceConsumer,
45-
EventConsumer eventConsumer) {
49+
EventConsumer eventConsumer,
50+
WorkflowContext workflow,
51+
TaskContext task) {
4652
final CompletableFuture<T> future = new CompletableFuture<>();
4753
registrations.add(
48-
eventConsumer.register(regBuilder, ce -> ceConsumer.accept((CloudEvent) ce, future)));
54+
eventConsumer.register(
55+
regBuilder, ce -> ceConsumer.accept((CloudEvent) ce, future), workflow, task));
4956
return future;
5057
}
5158
}

impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistration.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,14 @@
1616
package io.serverlessworkflow.impl.events;
1717

1818
import io.cloudevents.CloudEvent;
19+
import io.serverlessworkflow.impl.TaskContext;
20+
import io.serverlessworkflow.impl.WorkflowContext;
1921
import java.util.function.Consumer;
2022

2123
public record TypeEventRegistration(
22-
String type, Consumer<CloudEvent> consumer, CloudEventPredicate predicate)
24+
String type,
25+
Consumer<CloudEvent> consumer,
26+
CloudEventPredicate predicate,
27+
WorkflowContext workflow,
28+
TaskContext task)
2329
implements EventRegistration {}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,14 @@ public OrListenExecutor(ListenExecutorBuilder builder) {
121121

122122
@Override
123123
protected <T> EventRegistrationInfo buildInfo(
124-
BiConsumer<CloudEvent, CompletableFuture<T>> consumer) {
125-
EventRegistrationInfo info = super.buildInfo(consumer);
124+
BiConsumer<CloudEvent, CompletableFuture<T>> consumer,
125+
WorkflowContext workflow,
126+
TaskContext task) {
127+
EventRegistrationInfo info = super.buildInfo(consumer, workflow, task);
126128
if (untilRegBuilders != null) {
127129
EventRegistrationInfo untilInfo =
128130
EventRegistrationInfo.build(
129-
untilRegBuilders, (ce, f) -> f.complete(null), eventConsumer);
131+
untilRegBuilders, (ce, f) -> f.complete(null), eventConsumer, workflow, task);
130132
untilInfo
131133
.completableFuture()
132134
.thenAccept(
@@ -171,7 +173,9 @@ protected CompletableFuture<WorkflowModel> internalExecute(
171173
buildInfo(
172174
(BiConsumer<CloudEvent, CompletableFuture<WorkflowModel>>)
173175
((ce, future) ->
174-
processCe(converter.apply(ce), output, workflow, taskContext, future)));
176+
processCe(converter.apply(ce), output, workflow, taskContext, future)),
177+
workflow,
178+
taskContext);
175179
return info.completableFuture()
176180
.thenApply(
177181
v -> {
@@ -181,9 +185,11 @@ protected CompletableFuture<WorkflowModel> internalExecute(
181185
}
182186

183187
protected <T> EventRegistrationInfo buildInfo(
184-
BiConsumer<CloudEvent, CompletableFuture<T>> consumer) {
188+
BiConsumer<CloudEvent, CompletableFuture<T>> consumer,
189+
WorkflowContext workflow,
190+
TaskContext task) {
185191
return EventRegistrationInfo.build(
186-
builderRegistrationInfo.registrations(), consumer, eventConsumer);
192+
builderRegistrationInfo.registrations(), consumer, eventConsumer, workflow, task);
187193
}
188194

189195
private void processCe(

impl/test/src/test/java/io/serverlessworkflow/impl/test/EventDefinitionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ void testEventListened(String listen, String emit, JsonNode expectedResult, Obje
5757
appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath(listen));
5858
WorkflowDefinition emitDefinition =
5959
appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath(emit));
60-
WorkflowInstance waitingInstance = listenDefinition.instance(Map.of());
60+
WorkflowInstance waitingInstance = listenDefinition.instance(Map.of("threshold", 38));
6161
CompletableFuture<WorkflowModel> future = waitingInstance.start();
6262
assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.WAITING);
6363
emitDefinition.instance(emitInput).start().join();

0 commit comments

Comments
 (0)