[Fix #1211] Support filters againts workflow data in listen task#1221
[Fix #1211] Support filters againts workflow data in listen task#1221fjtirado merged 2 commits intoserverlessworkflow:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR fixes #1211 by enabling listen task event filters to evaluate expressions against workflow/task data (e.g., $input.threshold) in addition to the incoming event payload.
Changes:
- Thread
WorkflowContextandTaskContextthrough event registration and predicate evaluation so filter expressions can access workflow data. - Update
ListenExecutorto pass the workflow/task context into event registrations (includinguntilregistrations). - Update the sample workflow and tests to exercise
$inputaccess in an event filter.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| impl/test/src/test/resources/workflows-samples/listen-to-any-filter.yaml | Updates sample event filter to use $input.threshold. |
| impl/test/src/test/java/io/serverlessworkflow/impl/test/EventDefinitionTest.java | Provides workflow input (threshold) to support the updated filter. |
| impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java | Passes workflow/task context into event registration building. |
| impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistration.java | Stores workflow/task context on registrations for later predicate evaluation. |
| impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationInfo.java | Propagates workflow/task context into EventConsumer.register(...). |
| impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistration.java | Adds workflow() / task() accessors to registrations. |
| impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java | Extends register API to accept workflow/task context (keeps a default overload). |
| impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java | Evaluates event-property predicates using the provided workflow/task context. |
| impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventPredicate.java | Updates predicate signature to accept workflow/task context. |
| impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventAttrPredicate.java | Updates attribute predicate signature to accept workflow/task context. |
| impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java | Passes registration workflow/task context into predicate evaluation. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (2)
impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java:184
internalExecuteunregisters event registrations only on successful completion viathenApply. Ifinfo.completableFuture()completes exceptionally (e.g., consumer throws, predicate evaluation fails), registrations may remain active and retainWorkflowContext/TaskContextreferences. PreferwhenComplete/handleto always unregister in both success and failure paths.
EventRegistrationInfo info =
buildInfo(
(BiConsumer<CloudEvent, CompletableFuture<WorkflowModel>>)
((ce, future) ->
processCe(converter.apply(ce), output, workflow, taskContext, future)),
workflow,
taskContext);
return info.completableFuture()
.thenApply(
v -> {
info.registrations().forEach(eventConsumer::unregister);
return output;
});
impl/test/src/test/java/io/serverlessworkflow/impl/test/EventDefinitionTest.java:66
- The updated filter sample now depends on
$input.threshold, but the parameterized test always starts all listen workflows withthreshold=38(includinglisten-to-any.yaml, which doesn’t exercise this feature). Consider passing the listen input as a separate parameter per test case and adding a negative assertion (e.g., threshold higher than emitted temperature keeps the instanceWAITING) to verify the$input-based filter actually affects matching.
void testEventListened(String listen, String emit, JsonNode expectedResult, Object emitInput)
throws IOException {
WorkflowDefinition listenDefinition =
appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath(listen));
WorkflowDefinition emitDefinition =
appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath(emit));
WorkflowInstance waitingInstance = listenDefinition.instance(Map.of("threshold", 38));
CompletableFuture<WorkflowModel> future = waitingInstance.start();
assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.WAITING);
emitDefinition.instance(emitInput).start().join();
assertThat(future).isCompleted();
assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.COMPLETED);
assertThat(waitingInstance.outputAs(JsonNode.class)).isEqualTo(expectedResult);
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistration.java
Outdated
Show resolved
Hide resolved
impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventPredicate.java
Show resolved
Hide resolved
impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventAttrPredicate.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java
Show resolved
Hide resolved
impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java
Outdated
Show resolved
Hide resolved
impl/test/src/test/java/io/serverlessworkflow/impl/test/EventDefinitionTest.java
Outdated
Show resolved
Hide resolved
687b263 to
97dc8fe
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java
Show resolved
Hide resolved
impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java
Outdated
Show resolved
Hide resolved
…n listen task Signed-off-by: fjtirado <ftirados@redhat.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java
Outdated
Show resolved
Hide resolved
impl/test/src/test/java/io/serverlessworkflow/impl/test/EventDefinitionTest.java
Outdated
Show resolved
Hide resolved
impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java
Show resolved
Hide resolved
717fe1d to
d31d90e
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java
Show resolved
Hide resolved
impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java
Outdated
Show resolved
Hide resolved
impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java
Show resolved
Hide resolved
impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java
Show resolved
Hide resolved
Signed-off-by: fjtirado <ftirados@redhat.com>
Fix #1211