Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent;
import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;
Expand Down Expand Up @@ -54,6 +56,8 @@ public class WorkflowMutableInstance implements WorkflowInstance {
private Lock statusLock = new ReentrantLock();
private Map<CompletableFuture<TaskContext>, TaskContext> suspended;

private Collection<CompletableFuture<?>> cancelables = new ArrayList<>();

protected WorkflowMutableInstance(WorkflowDefinition definition, String id, WorkflowModel input) {
this.id = id;
this.input = input;
Expand Down Expand Up @@ -226,22 +230,28 @@ protected final void internalSuspend() {

@Override
public boolean resume() {
boolean result;
try {
statusLock.lock();
if (TaskExecutorHelper.isActive(status.get()) && suspended != null) {
publishEvent(
workflowContext, l -> l.onWorkflowResumed(new WorkflowResumedEvent(workflowContext)));

suspended.forEach(
(k, v) -> {
k.complete(v);
});
suspended = null;
return true;
result = true;
} else {
result = false;
}
} finally {
statusLock.unlock();
}
return false;
if (result) {
publishEvent(
workflowContext, l -> l.onWorkflowResumed(new WorkflowResumedEvent(workflowContext)));
}
return result;
}

public CompletableFuture<TaskContext> cancelCheck(TaskContext t) {
Expand Down Expand Up @@ -277,20 +287,49 @@ public CompletableFuture<TaskContext> suspendedCheck(TaskContext t) {

@Override
public boolean cancel() {
boolean result;
Collection<CompletableFuture<?>> toCancel = null;
try {
statusLock.lock();
if (TaskExecutorHelper.isActive(status.get())) {
toCancel = new ArrayList<>(cancelables);
cancelables.clear();
status(WorkflowStatus.CANCELLED);
publishEvent(
workflowContext,
l -> l.onWorkflowCancelled(new WorkflowCancelledEvent(workflowContext)));
return true;
result = true;
} else {
return false;
result = false;
}
} finally {
statusLock.unlock();
}
if (result) {
publishEvent(
workflowContext, l -> l.onWorkflowCancelled(new WorkflowCancelledEvent(workflowContext)));
if (toCancel != null) {
toCancel.forEach(t -> t.cancel(true));
}
}
return result;
}

public void addCancelable(CompletableFuture<?> cancelable) {
statusLock.lock();
if (status.get() == WorkflowStatus.CANCELLED) {
statusLock.unlock();
cancelable.cancel(true);
} else {
cancelables.add(cancelable);
statusLock.unlock();
cancelable.thenAccept(
__ -> {
try {
statusLock.lock();
cancelables.remove(cancelable);
} finally {
statusLock.unlock();
}
});
}
}

public <T> T additionalObject(String key, Supplier<T> supplier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import io.cloudevents.CloudEvent;
import io.serverlessworkflow.api.types.EventFilter;
import io.serverlessworkflow.api.types.EventProperties;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowContext;
import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Iterator;
Expand Down Expand Up @@ -67,7 +69,7 @@ private static class CloudEventConsumer extends AbstractCollection<TypeEventRegi
public void accept(CloudEvent ce) {
logger.debug("Received cloud event {}", ce);
for (TypeEventRegistration registration : registrations) {
if (registration.predicate().test(ce)) {
if (registration.predicate().test(ce, registration.workflow(), registration.task())) {
registration.consumer().accept(ce);
}
}
Expand All @@ -94,14 +96,18 @@ public int size() {
}
}

@Override
public TypeEventRegistration register(
TypeEventRegistrationBuilder builder, Consumer<CloudEvent> ce) {
TypeEventRegistrationBuilder builder,
Consumer<CloudEvent> ce,
WorkflowContext workflow,
TaskContext task) {
if (builder.type() == null) {
registerToAll(ce);
return new TypeEventRegistration(null, ce, null);
return new TypeEventRegistration(null, ce, null, workflow, task);
} else {
TypeEventRegistration registration =
new TypeEventRegistration(builder.type(), ce, builder.cePredicate());
new TypeEventRegistration(builder.type(), ce, builder.cePredicate(), workflow, task);
registrations
.computeIfAbsent(
registration.type(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*/
package io.serverlessworkflow.impl.events;

import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;

@FunctionalInterface
public interface CloudEventAttrPredicate<T> {
boolean test(T value);
boolean test(T value, WorkflowContext workflow, TaskContext task);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
package io.serverlessworkflow.impl.events;

import io.cloudevents.CloudEvent;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;

public interface CloudEventPredicate {
boolean test(CloudEvent event);
boolean test(CloudEvent event, WorkflowContext workflow, TaskContext task);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import io.serverlessworkflow.api.types.EventSource;
import io.serverlessworkflow.api.types.EventTime;
import io.serverlessworkflow.api.types.UriTemplate;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowModelFactory;
import io.serverlessworkflow.impl.WorkflowPredicate;
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
Expand All @@ -45,7 +47,7 @@ public class DefaultCloudEventPredicate implements CloudEventPredicate {
private final CloudEventAttrPredicate<Map<String, Object>> additionalFilter;

private static final <T> CloudEventAttrPredicate<T> isTrue() {
return x -> true;
return (x, w, t) -> true;
}

public DefaultCloudEventPredicate(EventProperties properties, WorkflowApplication app) {
Expand All @@ -72,12 +74,12 @@ private CloudEventAttrPredicate<Map<String, Object>> additionalFilter(

private CloudEventAttrPredicate<CloudEventData> fromCloudEvent(
WorkflowModelFactory workflowModelFactory, WorkflowPredicate filter) {
return d -> filter.test(null, null, workflowModelFactory.from(d));
return (d, w, t) -> filter.test(w, t, workflowModelFactory.from(d));
}

private CloudEventAttrPredicate<Map<String, Object>> fromMap(
WorkflowModelFactory workflowModelFactory, WorkflowPredicate filter) {
return d -> filter.test(null, null, workflowModelFactory.from(d));
return (d, w, t) -> filter.test(w, t, workflowModelFactory.from(d));
}

private CloudEventAttrPredicate<CloudEventData> dataFilter(
Expand All @@ -98,9 +100,9 @@ private CloudEventAttrPredicate<OffsetDateTime> offsetTimeFilter(
final WorkflowPredicate expr =
app.expressionFactory()
.buildPredicate(ExpressionDescriptor.from(time.getRuntimeExpression()));
return s -> evalExpr(app.modelFactory(), expr, s);
return (s, w, t) -> evalExpr(app.modelFactory(), expr, s, w, t);
} else if (time.getLiteralTime() != null) {
return s -> Objects.equals(s, CloudEventUtils.toOffset(time.getLiteralTime()));
return (s, w, t) -> Objects.equals(s, CloudEventUtils.toOffset(time.getLiteralTime()));
}
}
return isTrue();
Expand All @@ -113,7 +115,7 @@ private CloudEventAttrPredicate<URI> dataSchemaFilter(
final WorkflowPredicate expr =
app.expressionFactory()
.buildPredicate(ExpressionDescriptor.from(dataSchema.getExpressionDataSchema()));
return s -> evalExpr(app.modelFactory(), expr, toString(s));
return (s, w, t) -> evalExpr(app.modelFactory(), expr, toString(s), w, t);
} else if (dataSchema.getLiteralDataSchema() != null) {
return templateFilter(dataSchema.getLiteralDataSchema());
}
Expand All @@ -122,7 +124,7 @@ private CloudEventAttrPredicate<URI> dataSchemaFilter(
}

private CloudEventAttrPredicate<String> stringFilter(String str) {
return str == null ? isTrue() : x -> x.equals(str);
return str == null ? isTrue() : (x, w, t) -> str.equals(x);
}

private CloudEventAttrPredicate<URI> sourceFilter(EventSource source, WorkflowApplication app) {
Expand All @@ -131,7 +133,7 @@ private CloudEventAttrPredicate<URI> sourceFilter(EventSource source, WorkflowAp
final WorkflowPredicate expr =
app.expressionFactory()
.buildPredicate(ExpressionDescriptor.from(source.getRuntimeExpression()));
return s -> evalExpr(app.modelFactory(), expr, toString(s));
return (s, w, t) -> evalExpr(app.modelFactory(), expr, toString(s), w, t);
} else if (source.getUriTemplate() != null) {
return templateFilter(source.getUriTemplate());
}
Expand All @@ -141,7 +143,7 @@ private CloudEventAttrPredicate<URI> sourceFilter(EventSource source, WorkflowAp

private CloudEventAttrPredicate<URI> templateFilter(UriTemplate template) {
if (template.getLiteralUri() != null) {
return u -> Objects.equals(u, template.getLiteralUri());
return (u, w, t) -> Objects.equals(u, template.getLiteralUri());
}
throw new UnsupportedOperationException("Template not supported here yet");
}
Expand All @@ -151,25 +153,33 @@ private <T> String toString(T uri) {
}

private boolean evalExpr(
WorkflowModelFactory modelFactory, WorkflowPredicate expr, String value) {
return expr.test(null, null, modelFactory.from(value));
WorkflowModelFactory modelFactory,
WorkflowPredicate expr,
String value,
WorkflowContext workflow,
TaskContext task) {
return expr.test(workflow, task, modelFactory.from(value));
}

private boolean evalExpr(
WorkflowModelFactory modelFactory, WorkflowPredicate expr, OffsetDateTime value) {
return expr.test(null, null, modelFactory.from(value));
WorkflowModelFactory modelFactory,
WorkflowPredicate expr,
OffsetDateTime value,
WorkflowContext workflow,
TaskContext task) {
return expr.test(workflow, task, modelFactory.from(value));
}

@Override
public boolean test(CloudEvent event) {
return idFilter.test(event.getId())
&& sourceFilter.test(event.getSource())
&& subjectFilter.test(event.getSubject())
&& contentTypeFilter.test(event.getDataContentType())
&& typeFilter.test(event.getType())
&& dataSchemaFilter.test(event.getDataSchema())
&& timeFilter.test(event.getTime())
&& dataFilter.test(event.getData())
&& additionalFilter.test(CloudEventUtils.extensions(event));
public boolean test(CloudEvent event, WorkflowContext workflow, TaskContext task) {
return idFilter.test(event.getId(), workflow, task)
&& sourceFilter.test(event.getSource(), workflow, task)
&& subjectFilter.test(event.getSubject(), workflow, task)
&& contentTypeFilter.test(event.getDataContentType(), workflow, task)
&& typeFilter.test(event.getType(), workflow, task)
&& dataSchemaFilter.test(event.getDataSchema(), workflow, task)
&& timeFilter.test(event.getTime(), workflow, task)
&& dataFilter.test(event.getData(), workflow, task)
&& additionalFilter.test(CloudEventUtils.extensions(event), workflow, task);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import io.cloudevents.CloudEvent;
import io.serverlessworkflow.api.types.EventFilter;
import io.serverlessworkflow.impl.ServicePriority;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowContext;
import java.util.Collection;
import java.util.function.Consumer;

Expand All @@ -29,7 +31,11 @@ public interface EventConsumer<T extends EventRegistration, V extends EventRegis

Collection<V> listenToAll(WorkflowApplication workflowApplication);

T register(V builder, Consumer<CloudEvent> consumer);
default T register(V builder, Consumer<CloudEvent> consumer) {
return register(builder, consumer, null, null);
}

T register(V builder, Consumer<CloudEvent> consumer, WorkflowContext workflow, TaskContext task);

void unregister(T register);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.serverlessworkflow.impl.events;

import io.cloudevents.CloudEvent;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
Expand All @@ -27,11 +29,13 @@ public record EventRegistrationInfo(
public static final <T> EventRegistrationInfo build(
EventRegistrationBuilderCollection builderInfo,
BiConsumer<CloudEvent, CompletableFuture<T>> consumer,
EventConsumer eventConsumer) {
Collection<EventRegistration> registrations = new ArrayList();
EventConsumer eventConsumer,
WorkflowContext workflow,
TaskContext task) {
Collection<EventRegistration> registrations = new ArrayList<>();
CompletableFuture<T>[] futures =
builderInfo.registrations().stream()
.map(reg -> toCompletable(reg, registrations, consumer, eventConsumer))
.map(reg -> toCompletable(reg, registrations, consumer, eventConsumer, workflow, task))
.toArray(size -> new CompletableFuture[size]);
return new EventRegistrationInfo(
builderInfo.isAnd() ? CompletableFuture.allOf(futures) : CompletableFuture.anyOf(futures),
Expand All @@ -42,10 +46,13 @@ private static final <T> CompletableFuture<T> toCompletable(
EventRegistrationBuilder regBuilder,
Collection<EventRegistration> registrations,
BiConsumer<CloudEvent, CompletableFuture<T>> ceConsumer,
EventConsumer eventConsumer) {
EventConsumer eventConsumer,
WorkflowContext workflow,
TaskContext task) {
final CompletableFuture<T> future = new CompletableFuture<>();
registrations.add(
eventConsumer.register(regBuilder, ce -> ceConsumer.accept((CloudEvent) ce, future)));
eventConsumer.register(
regBuilder, ce -> ceConsumer.accept((CloudEvent) ce, future), workflow, task));
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@
package io.serverlessworkflow.impl.events;

import io.cloudevents.CloudEvent;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import java.util.function.Consumer;

public record TypeEventRegistration(
String type, Consumer<CloudEvent> consumer, CloudEventPredicate predicate)
String type,
Consumer<CloudEvent> consumer,
CloudEventPredicate predicate,
WorkflowContext workflow,
TaskContext task)
implements EventRegistration {}
Loading