diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java index b02d07155..cdea2d56a 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java @@ -26,6 +26,8 @@ import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent; import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.Optional; import java.util.concurrent.CancellationException; @@ -54,6 +56,8 @@ public class WorkflowMutableInstance implements WorkflowInstance { private Lock statusLock = new ReentrantLock(); private Map, TaskContext> suspended; + private Collection> cancelables = new ArrayList<>(); + protected WorkflowMutableInstance(WorkflowDefinition definition, String id, WorkflowModel input) { this.id = id; this.input = input; @@ -226,22 +230,28 @@ protected final void internalSuspend() { @Override public boolean resume() { + boolean result; try { statusLock.lock(); if (TaskExecutorHelper.isActive(status.get()) && suspended != null) { - publishEvent( - workflowContext, l -> l.onWorkflowResumed(new WorkflowResumedEvent(workflowContext))); + suspended.forEach( (k, v) -> { k.complete(v); }); suspended = null; - return true; + result = true; + } else { + result = false; } } finally { statusLock.unlock(); } - return false; + if (result) { + publishEvent( + workflowContext, l -> l.onWorkflowResumed(new WorkflowResumedEvent(workflowContext))); + } + return result; } public CompletableFuture cancelCheck(TaskContext t) { @@ -277,20 +287,49 @@ public CompletableFuture suspendedCheck(TaskContext t) { @Override public boolean cancel() { + boolean result; + Collection> toCancel = null; try { statusLock.lock(); if (TaskExecutorHelper.isActive(status.get())) { + toCancel = new ArrayList<>(cancelables); + cancelables.clear(); status(WorkflowStatus.CANCELLED); - publishEvent( - workflowContext, - l -> l.onWorkflowCancelled(new WorkflowCancelledEvent(workflowContext))); - return true; + result = true; } else { - return false; + result = false; } } finally { statusLock.unlock(); } + if (result) { + publishEvent( + workflowContext, l -> l.onWorkflowCancelled(new WorkflowCancelledEvent(workflowContext))); + if (toCancel != null) { + toCancel.forEach(t -> t.cancel(true)); + } + } + return result; + } + + public void addCancelable(CompletableFuture cancelable) { + statusLock.lock(); + if (status.get() == WorkflowStatus.CANCELLED) { + statusLock.unlock(); + cancelable.cancel(true); + } else { + cancelables.add(cancelable); + statusLock.unlock(); + cancelable.thenAccept( + __ -> { + try { + statusLock.lock(); + cancelables.remove(cancelable); + } finally { + statusLock.unlock(); + } + }); + } } public T additionalObject(String key, Supplier supplier) { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java index d4a613d29..f589055df 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java @@ -18,7 +18,9 @@ import io.cloudevents.CloudEvent; import io.serverlessworkflow.api.types.EventFilter; import io.serverlessworkflow.api.types.EventProperties; +import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContext; import java.util.AbstractCollection; import java.util.Collection; import java.util.Iterator; @@ -67,7 +69,7 @@ private static class CloudEventConsumer extends AbstractCollection ce) { + TypeEventRegistrationBuilder builder, + Consumer ce, + WorkflowContext workflow, + TaskContext task) { if (builder.type() == null) { registerToAll(ce); - return new TypeEventRegistration(null, ce, null); + return new TypeEventRegistration(null, ce, null, workflow, task); } else { TypeEventRegistration registration = - new TypeEventRegistration(builder.type(), ce, builder.cePredicate()); + new TypeEventRegistration(builder.type(), ce, builder.cePredicate(), workflow, task); registrations .computeIfAbsent( registration.type(), diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventAttrPredicate.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventAttrPredicate.java index 6029d484b..f5c03397a 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventAttrPredicate.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventAttrPredicate.java @@ -15,7 +15,10 @@ */ package io.serverlessworkflow.impl.events; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; + @FunctionalInterface public interface CloudEventAttrPredicate { - boolean test(T value); + boolean test(T value, WorkflowContext workflow, TaskContext task); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventPredicate.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventPredicate.java index a790e3718..4f7b15f09 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventPredicate.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventPredicate.java @@ -16,7 +16,9 @@ package io.serverlessworkflow.impl.events; import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; public interface CloudEventPredicate { - boolean test(CloudEvent event); + boolean test(CloudEvent event, WorkflowContext workflow, TaskContext task); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java index a67cf36f5..c58f7de05 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/DefaultCloudEventPredicate.java @@ -23,7 +23,9 @@ import io.serverlessworkflow.api.types.EventSource; import io.serverlessworkflow.api.types.EventTime; import io.serverlessworkflow.api.types.UriTemplate; +import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowModelFactory; import io.serverlessworkflow.impl.WorkflowPredicate; import io.serverlessworkflow.impl.expressions.ExpressionDescriptor; @@ -45,7 +47,7 @@ public class DefaultCloudEventPredicate implements CloudEventPredicate { private final CloudEventAttrPredicate> additionalFilter; private static final CloudEventAttrPredicate isTrue() { - return x -> true; + return (x, w, t) -> true; } public DefaultCloudEventPredicate(EventProperties properties, WorkflowApplication app) { @@ -72,12 +74,12 @@ private CloudEventAttrPredicate> additionalFilter( private CloudEventAttrPredicate fromCloudEvent( WorkflowModelFactory workflowModelFactory, WorkflowPredicate filter) { - return d -> filter.test(null, null, workflowModelFactory.from(d)); + return (d, w, t) -> filter.test(w, t, workflowModelFactory.from(d)); } private CloudEventAttrPredicate> fromMap( WorkflowModelFactory workflowModelFactory, WorkflowPredicate filter) { - return d -> filter.test(null, null, workflowModelFactory.from(d)); + return (d, w, t) -> filter.test(w, t, workflowModelFactory.from(d)); } private CloudEventAttrPredicate dataFilter( @@ -98,9 +100,9 @@ private CloudEventAttrPredicate offsetTimeFilter( final WorkflowPredicate expr = app.expressionFactory() .buildPredicate(ExpressionDescriptor.from(time.getRuntimeExpression())); - return s -> evalExpr(app.modelFactory(), expr, s); + return (s, w, t) -> evalExpr(app.modelFactory(), expr, s, w, t); } else if (time.getLiteralTime() != null) { - return s -> Objects.equals(s, CloudEventUtils.toOffset(time.getLiteralTime())); + return (s, w, t) -> Objects.equals(s, CloudEventUtils.toOffset(time.getLiteralTime())); } } return isTrue(); @@ -113,7 +115,7 @@ private CloudEventAttrPredicate dataSchemaFilter( final WorkflowPredicate expr = app.expressionFactory() .buildPredicate(ExpressionDescriptor.from(dataSchema.getExpressionDataSchema())); - return s -> evalExpr(app.modelFactory(), expr, toString(s)); + return (s, w, t) -> evalExpr(app.modelFactory(), expr, toString(s), w, t); } else if (dataSchema.getLiteralDataSchema() != null) { return templateFilter(dataSchema.getLiteralDataSchema()); } @@ -122,7 +124,7 @@ private CloudEventAttrPredicate dataSchemaFilter( } private CloudEventAttrPredicate stringFilter(String str) { - return str == null ? isTrue() : x -> x.equals(str); + return str == null ? isTrue() : (x, w, t) -> str.equals(x); } private CloudEventAttrPredicate sourceFilter(EventSource source, WorkflowApplication app) { @@ -131,7 +133,7 @@ private CloudEventAttrPredicate sourceFilter(EventSource source, WorkflowAp final WorkflowPredicate expr = app.expressionFactory() .buildPredicate(ExpressionDescriptor.from(source.getRuntimeExpression())); - return s -> evalExpr(app.modelFactory(), expr, toString(s)); + return (s, w, t) -> evalExpr(app.modelFactory(), expr, toString(s), w, t); } else if (source.getUriTemplate() != null) { return templateFilter(source.getUriTemplate()); } @@ -141,7 +143,7 @@ private CloudEventAttrPredicate sourceFilter(EventSource source, WorkflowAp private CloudEventAttrPredicate templateFilter(UriTemplate template) { if (template.getLiteralUri() != null) { - return u -> Objects.equals(u, template.getLiteralUri()); + return (u, w, t) -> Objects.equals(u, template.getLiteralUri()); } throw new UnsupportedOperationException("Template not supported here yet"); } @@ -151,25 +153,33 @@ private String toString(T uri) { } private boolean evalExpr( - WorkflowModelFactory modelFactory, WorkflowPredicate expr, String value) { - return expr.test(null, null, modelFactory.from(value)); + WorkflowModelFactory modelFactory, + WorkflowPredicate expr, + String value, + WorkflowContext workflow, + TaskContext task) { + return expr.test(workflow, task, modelFactory.from(value)); } private boolean evalExpr( - WorkflowModelFactory modelFactory, WorkflowPredicate expr, OffsetDateTime value) { - return expr.test(null, null, modelFactory.from(value)); + WorkflowModelFactory modelFactory, + WorkflowPredicate expr, + OffsetDateTime value, + WorkflowContext workflow, + TaskContext task) { + return expr.test(workflow, task, modelFactory.from(value)); } @Override - public boolean test(CloudEvent event) { - return idFilter.test(event.getId()) - && sourceFilter.test(event.getSource()) - && subjectFilter.test(event.getSubject()) - && contentTypeFilter.test(event.getDataContentType()) - && typeFilter.test(event.getType()) - && dataSchemaFilter.test(event.getDataSchema()) - && timeFilter.test(event.getTime()) - && dataFilter.test(event.getData()) - && additionalFilter.test(CloudEventUtils.extensions(event)); + public boolean test(CloudEvent event, WorkflowContext workflow, TaskContext task) { + return idFilter.test(event.getId(), workflow, task) + && sourceFilter.test(event.getSource(), workflow, task) + && subjectFilter.test(event.getSubject(), workflow, task) + && contentTypeFilter.test(event.getDataContentType(), workflow, task) + && typeFilter.test(event.getType(), workflow, task) + && dataSchemaFilter.test(event.getDataSchema(), workflow, task) + && timeFilter.test(event.getTime(), workflow, task) + && dataFilter.test(event.getData(), workflow, task) + && additionalFilter.test(CloudEventUtils.extensions(event), workflow, task); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java index 68ac9a009..3461bae85 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventConsumer.java @@ -18,7 +18,9 @@ import io.cloudevents.CloudEvent; import io.serverlessworkflow.api.types.EventFilter; import io.serverlessworkflow.impl.ServicePriority; +import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContext; import java.util.Collection; import java.util.function.Consumer; @@ -29,7 +31,11 @@ public interface EventConsumer listenToAll(WorkflowApplication workflowApplication); - T register(V builder, Consumer consumer); + default T register(V builder, Consumer consumer) { + return register(builder, consumer, null, null); + } + + T register(V builder, Consumer consumer, WorkflowContext workflow, TaskContext task); void unregister(T register); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationInfo.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationInfo.java index 665fb924d..3d4e75ca8 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationInfo.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationInfo.java @@ -16,6 +16,8 @@ package io.serverlessworkflow.impl.events; import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -27,11 +29,13 @@ public record EventRegistrationInfo( public static final EventRegistrationInfo build( EventRegistrationBuilderCollection builderInfo, BiConsumer> consumer, - EventConsumer eventConsumer) { - Collection registrations = new ArrayList(); + EventConsumer eventConsumer, + WorkflowContext workflow, + TaskContext task) { + Collection registrations = new ArrayList<>(); CompletableFuture[] futures = builderInfo.registrations().stream() - .map(reg -> toCompletable(reg, registrations, consumer, eventConsumer)) + .map(reg -> toCompletable(reg, registrations, consumer, eventConsumer, workflow, task)) .toArray(size -> new CompletableFuture[size]); return new EventRegistrationInfo( builderInfo.isAnd() ? CompletableFuture.allOf(futures) : CompletableFuture.anyOf(futures), @@ -42,10 +46,13 @@ private static final CompletableFuture toCompletable( EventRegistrationBuilder regBuilder, Collection registrations, BiConsumer> ceConsumer, - EventConsumer eventConsumer) { + EventConsumer eventConsumer, + WorkflowContext workflow, + TaskContext task) { final CompletableFuture future = new CompletableFuture<>(); registrations.add( - eventConsumer.register(regBuilder, ce -> ceConsumer.accept((CloudEvent) ce, future))); + eventConsumer.register( + regBuilder, ce -> ceConsumer.accept((CloudEvent) ce, future), workflow, task)); return future; } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistration.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistration.java index c5828e721..288fbbe7c 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistration.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistration.java @@ -16,8 +16,14 @@ package io.serverlessworkflow.impl.events; import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; import java.util.function.Consumer; public record TypeEventRegistration( - String type, Consumer consumer, CloudEventPredicate predicate) + String type, + Consumer consumer, + CloudEventPredicate predicate, + WorkflowContext workflow, + TaskContext task) implements EventRegistration {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java index 8d242f317..41a36b906 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java @@ -121,18 +121,24 @@ public OrListenExecutor(ListenExecutorBuilder builder) { @Override protected EventRegistrationInfo buildInfo( - BiConsumer> consumer) { - EventRegistrationInfo info = super.buildInfo(consumer); + BiConsumer> consumer, + WorkflowContext workflow, + TaskContext task) { + EventRegistrationInfo info = super.buildInfo(consumer, workflow, task); if (untilRegBuilders != null) { EventRegistrationInfo untilInfo = EventRegistrationInfo.build( - untilRegBuilders, (ce, f) -> f.complete(null), eventConsumer); + untilRegBuilders, (ce, f) -> f.complete(null), eventConsumer, workflow, task); untilInfo .completableFuture() - .thenAccept( - v -> { - info.completableFuture().complete(null); - untilInfo.registrations().forEach(reg -> eventConsumer.unregister(reg)); + .whenComplete( + (__, e) -> { + untilInfo.registrations().forEach(eventConsumer::unregister); + if (e == null) { + info.completableFuture().complete(null); + } else { + info.completableFuture().completeExceptionally(e); + } }); } return info; @@ -171,19 +177,21 @@ protected CompletableFuture internalExecute( buildInfo( (BiConsumer>) ((ce, future) -> - processCe(converter.apply(ce), output, workflow, taskContext, future))); + processCe(converter.apply(ce), output, workflow, taskContext, future)), + workflow, + taskContext); + workflow.instance().addCancelable(info.completableFuture()); return info.completableFuture() - .thenApply( - v -> { - info.registrations().forEach(eventConsumer::unregister); - return output; - }); + .whenComplete((__, e) -> info.registrations().forEach(eventConsumer::unregister)) + .thenApply(__ -> output); } protected EventRegistrationInfo buildInfo( - BiConsumer> consumer) { + BiConsumer> consumer, + WorkflowContext workflow, + TaskContext task) { return EventRegistrationInfo.build( - builderRegistrationInfo.registrations(), consumer, eventConsumer); + builderRegistrationInfo.registrations(), consumer, eventConsumer, workflow, task); } private void processCe( diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/EventDefinitionTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/EventDefinitionTest.java index 9be637373..70f5103b2 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/EventDefinitionTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/EventDefinitionTest.java @@ -51,13 +51,14 @@ static void init() { @ParameterizedTest @MethodSource("eventListenerParameters") - void testEventListened(String listen, String emit, JsonNode expectedResult, Object emitInput) + void testEventListened( + String listen, String emit, JsonNode expectedResult, Object emitInput, Object listenInput) throws IOException { WorkflowDefinition listenDefinition = appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath(listen)); WorkflowDefinition emitDefinition = appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath(emit)); - WorkflowInstance waitingInstance = listenDefinition.instance(Map.of()); + WorkflowInstance waitingInstance = listenDefinition.instance(listenInput); CompletableFuture future = waitingInstance.start(); assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.WAITING); emitDefinition.instance(emitInput).start().join(); @@ -66,6 +67,24 @@ void testEventListened(String listen, String emit, JsonNode expectedResult, Obje assertThat(waitingInstance.outputAs(JsonNode.class)).isEqualTo(expectedResult); } + @ParameterizedTest + @MethodSource("wrongEventListenerParameters") + void testWrongEvent(String listen, String emit, Object emitInput, Object listenInput) + throws IOException { + WorkflowDefinition listenDefinition = + appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath(listen)); + WorkflowDefinition emitDefinition = + appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath(emit)); + WorkflowInstance waitingInstance = listenDefinition.instance(listenInput); + CompletableFuture future = waitingInstance.start(); + assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.WAITING); + emitDefinition.instance(emitInput).start().join(); + assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.WAITING); + waitingInstance.cancel(); + assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.CANCELLED); + assertThat(future).isDone(); + } + @ParameterizedTest @MethodSource("eventsListenerParameters") void testEventsListened(String listen, String emit1, String emit2, JsonNode expectedResult) @@ -115,18 +134,29 @@ private static Instant getInstant(ArrayNode result, int index) { return Instant.ofEpochSecond(result.get(index).get("time").asLong()); } + private static Stream wrongEventListenerParameters() { + return Stream.of( + Arguments.of( + "workflows-samples/listen-to-any-filter.yaml", + "workflows-samples/emit-doctor.yaml", + Map.of("temperature", 38), + Map.of("threshold", 39))); + } + private static Stream eventListenerParameters() { return Stream.of( Arguments.of( "workflows-samples/listen-to-any.yaml", "workflows-samples/emit.yaml", array(cruellaDeVil()), + Map.of(), Map.of()), Arguments.of( "workflows-samples/listen-to-any-filter.yaml", "workflows-samples/emit-doctor.yaml", doctor(), - Map.of("temperature", 39))); + Map.of("temperature", 39), + Map.of("threshold", 38))); } private static Stream eventsListenerParameters() { diff --git a/impl/test/src/test/resources/workflows-samples/listen-to-any-filter.yaml b/impl/test/src/test/resources/workflows-samples/listen-to-any-filter.yaml index 491858706..e7266062e 100644 --- a/impl/test/src/test/resources/workflows-samples/listen-to-any-filter.yaml +++ b/impl/test/src/test/resources/workflows-samples/listen-to-any-filter.yaml @@ -10,7 +10,7 @@ do: any: - with: type: com.fake-hospital.vitals.measurements.temperature - data: ${ .temperature > 38 } + data: ${ .temperature > $input.threshold } - with: type: com.fake-hospital.vitals.measurements.bpm data: ${ .bpm < 60 or .bpm > 100 }