diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventPropertiesBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitEventPropertiesBuilder.java similarity index 52% rename from experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventPropertiesBuilder.java rename to experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitEventPropertiesBuilder.java index 27c691ce5..5c46bfc75 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventPropertiesBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitEventPropertiesBuilder.java @@ -16,24 +16,40 @@ package io.serverlessworkflow.fluent.func; import io.cloudevents.CloudEventData; +import io.serverlessworkflow.api.types.func.ContextFunction; import io.serverlessworkflow.api.types.func.EventDataFunction; +import io.serverlessworkflow.api.types.func.FilterFunction; +import io.serverlessworkflow.fluent.func.dsl.SerializableFunction; import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; import java.util.function.Function; -public class FuncEventPropertiesBuilder - extends AbstractEventPropertiesBuilder { +public class FuncEmitEventPropertiesBuilder + extends AbstractEventPropertiesBuilder { @Override - protected FuncEventPropertiesBuilder self() { + protected FuncEmitEventPropertiesBuilder self() { return this; } - public FuncEventPropertiesBuilder data(Function function) { + public FuncEmitEventPropertiesBuilder data(SerializableFunction function) { this.eventProperties.setData(new EventDataFunction().withFunction(function)); return this; } - public FuncEventPropertiesBuilder data(Function function, Class clazz) { + public FuncEmitEventPropertiesBuilder data( + Function function, Class clazz) { + this.eventProperties.setData(new EventDataFunction().withFunction(function, clazz)); + return this; + } + + public FuncEmitEventPropertiesBuilder data( + ContextFunction function, Class clazz) { + this.eventProperties.setData(new EventDataFunction().withFunction(function, clazz)); + return this; + } + + public FuncEmitEventPropertiesBuilder data( + FilterFunction function, Class clazz) { this.eventProperties.setData(new EventDataFunction().withFunction(function, clazz)); return this; } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java index 5f63d8d46..13af9e883 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java @@ -20,7 +20,7 @@ import io.serverlessworkflow.fluent.spec.AbstractEmitTaskBuilder; public class FuncEmitTaskBuilder - extends AbstractEmitTaskBuilder + extends AbstractEmitTaskBuilder implements ConditionalTaskBuilder, FuncTaskTransformations { FuncEmitTaskBuilder() { @@ -33,7 +33,7 @@ protected FuncEmitTaskBuilder self() { } @Override - protected FuncEventPropertiesBuilder newEventPropertiesBuilder() { - return new FuncEventPropertiesBuilder(); + protected FuncEmitEventPropertiesBuilder newEventPropertiesBuilder() { + return new FuncEmitEventPropertiesBuilder(); } } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterBuilder.java index cac572acd..1d033d86d 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterBuilder.java @@ -18,8 +18,7 @@ import io.serverlessworkflow.fluent.spec.AbstractEventFilterBuilder; public class FuncEventFilterBuilder - extends AbstractEventFilterBuilder< - FuncEventFilterBuilder, FuncPredicateEventPropertiesBuilder> { + extends AbstractEventFilterBuilder { @Override protected FuncEventFilterBuilder self() { @@ -27,7 +26,7 @@ protected FuncEventFilterBuilder self() { } @Override - protected FuncPredicateEventPropertiesBuilder newEventPropertiesBuilder() { - return new FuncPredicateEventPropertiesBuilder(); + protected FuncEventFilterPropertiesBuilder newEventPropertiesBuilder() { + return new FuncEventFilterPropertiesBuilder(); } } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java new file mode 100644 index 000000000..1c322bc30 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterPropertiesBuilder.java @@ -0,0 +1,69 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func; + +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.serverlessworkflow.api.types.func.ContextPredicate; +import io.serverlessworkflow.api.types.func.EventDataPredicate; +import io.serverlessworkflow.api.types.func.FilterPredicate; +import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; +import java.util.function.Predicate; + +public class FuncEventFilterPropertiesBuilder + extends AbstractEventPropertiesBuilder { + + @Override + protected FuncEventFilterPropertiesBuilder self() { + return this; + } + + public FuncEventFilterPropertiesBuilder data(Predicate predicate) { + this.eventProperties.setData( + new EventDataPredicate().withPredicate(predicate, CloudEventData.class)); + return this; + } + + public FuncEventFilterPropertiesBuilder data(ContextPredicate predicate) { + this.eventProperties.setData( + new EventDataPredicate().withPredicate(predicate, CloudEventData.class)); + return this; + } + + public FuncEventFilterPropertiesBuilder data(FilterPredicate predicate) { + this.eventProperties.setData( + new EventDataPredicate().withPredicate(predicate, CloudEventData.class)); + return this; + } + + public FuncEventFilterPropertiesBuilder envelope(Predicate predicate) { + this.eventProperties.setData( + new EventDataPredicate().withPredicate(predicate, CloudEvent.class)); + return this; + } + + public FuncEventFilterPropertiesBuilder envelope(ContextPredicate predicate) { + this.eventProperties.setData( + new EventDataPredicate().withPredicate(predicate, CloudEvent.class)); + return this; + } + + public FuncEventFilterPropertiesBuilder envelope(FilterPredicate predicate) { + this.eventProperties.setData( + new EventDataPredicate().withPredicate(predicate, CloudEvent.class)); + return this; + } +} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncEventConfigurer.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncEventConfigurer.java deleted file mode 100644 index aac390748..000000000 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncEventConfigurer.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.fluent.func.configurers; - -import io.serverlessworkflow.fluent.func.FuncEventPropertiesBuilder; -import java.util.function.Consumer; - -@FunctionalInterface -public interface FuncEventConfigurer extends Consumer {} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncPredicateEventConfigurer.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncPredicateEventConfigurer.java deleted file mode 100644 index 5507193f6..000000000 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncPredicateEventConfigurer.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.fluent.func.configurers; - -import io.serverlessworkflow.fluent.func.FuncPredicateEventPropertiesBuilder; -import java.util.function.Consumer; - -@FunctionalInterface -public interface FuncPredicateEventConfigurer - extends Consumer {} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/BaseFuncListenSpec.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/BaseFuncListenSpec.java index 6c7f2d110..5715ddfe0 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/BaseFuncListenSpec.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/BaseFuncListenSpec.java @@ -17,20 +17,17 @@ import io.serverlessworkflow.fluent.func.FuncEventFilterBuilder; import io.serverlessworkflow.fluent.func.FuncListenToBuilder; -import io.serverlessworkflow.fluent.func.configurers.FuncPredicateEventConfigurer; import io.serverlessworkflow.fluent.spec.dsl.BaseListenSpec; import java.util.Objects; import java.util.function.Consumer; import java.util.function.Predicate; public abstract class BaseFuncListenSpec - extends BaseListenSpec< - SELF, LB, FuncListenToBuilder, FuncEventFilterBuilder, FuncPredicateEventConfigurer> { + extends BaseListenSpec { protected BaseFuncListenSpec(ToInvoker toInvoker) { super( toInvoker, - FuncEventFilterBuilder::with, // allApplier (tb, filters) -> tb.all(castFilters(filters)), // anyApplier diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/CommonFuncOps.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/CommonFuncOps.java index eb35b6b99..6da5d4bed 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/CommonFuncOps.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/CommonFuncOps.java @@ -15,12 +15,9 @@ */ package io.serverlessworkflow.fluent.func.dsl; -import io.cloudevents.CloudEventData; import io.serverlessworkflow.api.types.FlowDirectiveEnum; import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder; -import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder; import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder; -import io.serverlessworkflow.fluent.func.configurers.FuncPredicateEventConfigurer; import io.serverlessworkflow.fluent.func.configurers.SwitchCaseConfigurer; import java.util.function.Consumer; import java.util.function.Function; @@ -60,18 +57,4 @@ default SwitchCaseConfigurer caseDefault(String task) { default SwitchCaseConfigurer caseDefault(FlowDirectiveEnum directive) { return s -> s.then(directive); } - - default Consumer event( - String type, Function function) { - return event -> event.event(e -> e.type(type).data(function)); - } - - default Consumer event( - String type, Function function, Class clazz) { - return event -> event.event(e -> e.type(type).data(function, clazz)); - } - - default FuncPredicateEventConfigurer event(String type) { - return e -> e.type(type); - } } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/ConsumeStep.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/ConsumeStep.java index b9a99ac96..ae968ed91 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/ConsumeStep.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/ConsumeStep.java @@ -35,8 +35,7 @@ public final class ConsumeStep extends Step, FuncCallTaskBuild } @Override - protected void configure( - FuncTaskItemListBuilder list, java.util.function.Consumer post) { + protected void configure(FuncTaskItemListBuilder list, Consumer post) { if (name == null) { list.function( cb -> { diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java index ea0edd1a3..6ca79b0da 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java @@ -25,7 +25,6 @@ import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder; import io.serverlessworkflow.fluent.func.configurers.FuncCallHttpConfigurer; import io.serverlessworkflow.fluent.func.configurers.FuncCallOpenAPIConfigurer; -import io.serverlessworkflow.fluent.func.configurers.FuncPredicateEventConfigurer; import io.serverlessworkflow.fluent.func.configurers.FuncTaskConfigurer; import io.serverlessworkflow.fluent.func.configurers.SwitchCaseConfigurer; import io.serverlessworkflow.fluent.spec.configurers.AuthenticationConfigurer; @@ -58,13 +57,13 @@ * *
{@code
  * Workflow wf = FuncWorkflowBuilder.workflow("example")
- *   .tasks(
- *     FuncDSL.function(String::trim, String.class),
- *     FuncDSL.emitJson("org.acme.started", MyPayload.class),
- *     FuncDSL.listen(FuncDSL.toAny("type.one", "type.two"))
- *       .outputAs(map -> map.get("value")),
- *     FuncDSL.switchWhenOrElse((Integer v) -> v > 0, "positive", FlowDirectiveEnum.END, Integer.class)
- *   ).build();
+ * .tasks(
+ * FuncDSL.function(String::trim, String.class),
+ * FuncDSL.emit(FuncDSL.produced("org.acme.started").jsonData(MyPayload.class)),
+ * FuncDSL.listen(FuncDSL.toAny("type.one", "type.two"))
+ * .outputAs(map -> map.get("value")),
+ * FuncDSL.switchWhenOrElse((Integer v) -> v > 0, "positive", FlowDirectiveEnum.END, Integer.class)
+ * ).build();
  * }
*/ public final class FuncDSL { @@ -171,7 +170,11 @@ public static FuncListenSpec to() { * @return a {@link FuncListenSpec} set to {@code one(type)} */ public static FuncListenSpec toOne(String type) { - return new FuncListenSpec().one(e -> e.type(type)); + return new FuncListenSpec().one(consumed(type)); + } + + public static FuncListenSpec toOne(FuncEventFilterSpec filter) { + return new FuncListenSpec().one(filter); } /** @@ -181,9 +184,9 @@ public static FuncListenSpec toOne(String type) { * @return a {@link FuncListenSpec} set to {@code all(types...)} */ public static FuncListenSpec toAll(String... types) { - FuncPredicateEventConfigurer[] events = new FuncPredicateEventConfigurer[types.length]; + FuncEventFilterSpec[] events = new FuncEventFilterSpec[types.length]; for (int i = 0; i < types.length; i++) { - events[i] = event(types[i]); + events[i] = consumed(types[i]); } return new FuncListenSpec().all(events); } @@ -195,9 +198,9 @@ public static FuncListenSpec toAll(String... types) { * @return a {@link FuncListenSpec} set to {@code any(types...)} */ public static FuncListenSpec toAny(String... types) { - FuncPredicateEventConfigurer[] events = new FuncPredicateEventConfigurer[types.length]; + FuncEventFilterSpec[] events = new FuncEventFilterSpec[types.length]; for (int i = 0; i < types.length; i++) { - events[i] = event(types[i]); + events[i] = consumed(types[i]); } return new FuncListenSpec().any(events); } @@ -211,14 +214,15 @@ public static FuncListenSpec toAny(String... types) { * @param input type to the function * @return a consumer to configure {@link FuncEmitTaskBuilder} */ - public static Consumer event( + public static Consumer produced( String type, SerializableFunction function) { - return OPS.event(type, function, ReflectionUtils.inferInputType(function)); + return event -> + event.event(e -> e.type(type).data(function, ReflectionUtils.inferInputType(function))); } /** - * Same as {@link #event(String, SerializableFunction)} but with an explicit input class to guide - * conversion. + * Same as {@link #produced(String, SerializableFunction)} but with an explicit input class to + * guide conversion. * * @param type CloudEvent type * @param function function that maps workflow input to {@link CloudEventData} @@ -226,9 +230,21 @@ public static Consumer event( * @param input type * @return a consumer to configure {@link FuncEmitTaskBuilder} */ - public static Consumer event( + public static Consumer produced( String type, Function function, Class inputClass) { - return OPS.event(type, function, inputClass); + return event -> event.event(e -> e.type(type).data(function, inputClass)); + } + + public static Consumer produced( + String type, ContextFunction function) { + return event -> + event.event(e -> e.type(type).data(function, ReflectionUtils.inferInputType(function))); + } + + public static Consumer produced( + String type, FilterFunction function) { + return event -> + event.event(e -> e.type(type).data(function, ReflectionUtils.inferInputType(function))); } /** @@ -240,7 +256,7 @@ public static Consumer event( * @param input type * @return a consumer to configure {@link FuncEmitTaskBuilder} */ - public static Consumer eventJson(String type, Class inputClass) { + public static Consumer producedJson(String type, Class inputClass) { return b -> new FuncEmitSpec().type(type).jsonData(inputClass).accept(b); } @@ -253,7 +269,7 @@ public static Consumer eventJson(String type, Class * @param input type * @return a consumer to configure {@link FuncEmitTaskBuilder} */ - public static Consumer eventBytes( + public static Consumer producedBytes( String type, Function serializer, Class inputClass) { return b -> new FuncEmitSpec().type(type).bytesData(serializer, inputClass).accept(b); } @@ -265,18 +281,32 @@ public static Consumer eventBytes( * @param type CloudEvent type * @return a consumer to configure {@link FuncEmitTaskBuilder} */ - public static Consumer eventBytesUtf8(String type) { + public static Consumer producedBytesUtf8(String type) { return b -> new FuncEmitSpec().type(type).bytesDataUtf8().accept(b); } /** - * Create a predicate event configurer for {@code listen} specs. + * Starts building an event emission specification with a predefined type. * - * @param type CloudEvent type - * @return predicate event configurer for use in {@link FuncListenSpec} + * @param type CloudEvent type to be emitted + * @return a new {@link FuncEmitSpec} instance pre-configured with the event type + */ + public static FuncEmitSpec produced(String type) { + return new FuncEmitSpec().type(type); + } + + /** + * Starts building a function-centric event filter specification for a specific CloudEvent type. * + * + *

This creates an empty {@link FuncEventFilterSpec} which acts as a fluent builder for + * matching incoming CloudEvents. It is typically passed to a {@code listen} strategy like {@link + * #toOne(String)} or {@code to().any(...)}. + * + * @param type the {@code type} attribute of the CloudEvent to listen for + * @return a new {@link FuncEventFilterSpec} instance pre-configured with the event type */ - public static FuncPredicateEventConfigurer event(String type) { - return OPS.event(type); + public static FuncEventFilterSpec consumed(String type) { + return new FuncEventFilterSpec().type(type); } /** @@ -601,6 +631,19 @@ public static Consumer tasks(FuncTaskConfigurer... step return list -> snapshot.forEach(s -> s.accept(list)); } + /** + * Starts building a function-centric event emission specification. + * + *

This creates an empty {@link FuncEmitSpec} which acts as a fluent builder for the properties + * (e.g., type, source, data) of the CloudEvent to be emitted. It is typically passed to the + * {@link #emit(Consumer)} step. + * + * @return a new {@link FuncEmitSpec} instance for fluent configuration + */ + public static FuncEmitSpec produced() { + return new FuncEmitSpec(); + } + /** * Create an {@code emit} step from a low-level {@link FuncEmitTaskBuilder} configurer. Prefer * higher-level helpers like {@link #emitJson(String, Class)} where possible. @@ -632,7 +675,7 @@ public static EmitStep emit(String name, Consumer cfg) { * @return an {@link EmitStep} */ public static EmitStep emit(String type, SerializableFunction fn) { - return new EmitStep(null, event(type, fn)); + return new EmitStep(null, produced(type, fn)); } /** @@ -646,7 +689,7 @@ public static EmitStep emit(String type, SerializableFunction EmitStep emit( String name, String type, SerializableFunction fn) { - return new EmitStep(name, event(type, fn)); + return new EmitStep(name, produced(type, fn)); } /** @@ -661,7 +704,7 @@ public static EmitStep emit( */ public static EmitStep emit( String name, String type, Function serializer, Class inputClass) { - return new EmitStep(name, eventBytes(type, serializer, inputClass)); + return new EmitStep(name, producedBytes(type, serializer, inputClass)); } /** @@ -675,7 +718,7 @@ public static EmitStep emit( */ public static EmitStep emit( String type, Function serializer, Class inputClass) { - return new EmitStep(null, eventBytes(type, serializer, inputClass)); + return new EmitStep(null, producedBytes(type, serializer, inputClass)); } /** @@ -687,7 +730,7 @@ public static EmitStep emit( * @return an {@link EmitStep} */ public static EmitStep emitJson(String type, Class inputClass) { - return new EmitStep(null, eventJson(type, inputClass)); + return new EmitStep(null, producedJson(type, inputClass)); } /** @@ -700,7 +743,7 @@ public static EmitStep emitJson(String type, Class inputClass) { * @return a named {@link EmitStep} */ public static EmitStep emitJson(String name, String type, Class inputClass) { - return new EmitStep(name, eventJson(type, inputClass)); + return new EmitStep(name, producedJson(type, inputClass)); } /** @@ -789,7 +832,7 @@ public static FuncTaskConfigurer switchWhen( * JQ-based condition: if the JQ expression evaluates truthy → jump to {@code thenTask}. * *

-   *   switchWhen(".approved == true", "approveOrder")
+   * switchWhen(".approved == true", "approveOrder")
    * 
* *

The JQ expression is evaluated against the task input at runtime. When the predicate is @@ -852,7 +895,7 @@ public static FuncTaskConfigurer switchWhenOrElse( * follow the {@link FlowDirectiveEnum} given in {@code otherwise}. * *

-   *   switchWhenOrElse(".approved == true", "sendEmail", FlowDirectiveEnum.END)
+   * switchWhenOrElse(".approved == true", "sendEmail", FlowDirectiveEnum.END)
    * 
* *

The JQ expression is evaluated against the task input at runtime. @@ -878,7 +921,7 @@ public static FuncTaskConfigurer switchWhenOrElse( * to {@code otherwiseTask}. * *

-   *   switchWhenOrElse(".score >= 80", "pass", "fail")
+   * switchWhenOrElse(".score >= 80", "pass", "fail")
    * 
* *

The JQ expression is evaluated against the task input at runtime. @@ -997,11 +1040,11 @@ public static FuncTaskConfigurer call(String name, FuncCallHttpConfigurer config * *

{@code
    * tasks(
-   *   FuncDSL.call(
-   *     FuncDSL.http()
-   *       .GET()
-   *       .endpoint("http://service/api")
-   *   )
+   * FuncDSL.call(
+   * FuncDSL.http()
+   * .GET()
+   * .endpoint("http://service/api")
+   * )
    * );
    * }
* @@ -1017,11 +1060,11 @@ public static FuncTaskConfigurer call(FuncCallHttpStep spec) { * *
{@code
    * tasks(
-   *   FuncDSL.call("fetchUsers",
-   *     FuncDSL.http()
-   *       .GET()
-   *       .endpoint("http://service/users")
-   *   )
+   * FuncDSL.call("fetchUsers",
+   * FuncDSL.http()
+   * .GET()
+   * .endpoint("http://service/users")
+   * )
    * );
    * }
* @@ -1041,14 +1084,14 @@ public static FuncTaskConfigurer call(String name, FuncCallHttpStep spec) { * *
{@code
    * FuncWorkflowBuilder.workflow("openapi-call")
-   *   .tasks(
-   *     FuncDSL.call(
-   *       FuncDSL.openapi()
-   *         .document("https://petstore.swagger.io/v2/swagger.json", DSL.auth("openapi-auth"))
-   *         .operation("getPetById")
-   *     )
-   *   )
-   *   .build();
+   * .tasks(
+   * FuncDSL.call(
+   * FuncDSL.openapi()
+   * .document("https://petstore.swagger.io/v2/swagger.json", DSL.auth("openapi-auth"))
+   * .operation("getPetById")
+   * )
+   * )
+   * .build();
    * }
* * @param spec fluent OpenAPI spec built via {@link #openapi()} @@ -1065,16 +1108,16 @@ public static FuncTaskConfigurer call(FuncCallOpenAPIStep spec) { * *
{@code
    * FuncWorkflowBuilder.workflow("openapi-call-named")
-   *   .tasks(
-   *     FuncDSL.call(
-   *       "fetchPet",
-   *       FuncDSL.openapi()
-   *         .document("https://petstore.swagger.io/v2/swagger.json", DSL.auth("openapi-auth"))
-   *         .operation("getPetById")
-   *         .parameter("id", 123)
-   *     )
-   *   )
-   *   .build();
+   * .tasks(
+   * FuncDSL.call(
+   * "fetchPet",
+   * FuncDSL.openapi()
+   * .document("https://petstore.swagger.io/v2/swagger.json", DSL.auth("openapi-auth"))
+   * .operation("getPetById")
+   * .parameter("id", 123)
+   * )
+   * )
+   * .build();
    * }
* * @param name task name, or {@code null} for an anonymous task @@ -1120,10 +1163,10 @@ public static FuncTaskConfigurer call(String name, FuncCallOpenAPIConfigurer con * *
{@code
    * FuncDSL.call(
-   *   FuncDSL.openapi()
-   *     .document("https://petstore.swagger.io/v2/swagger.json", DSL.auth("openapi-auth"))
-   *     .operation("getPetById")
-   *     .parameter("id", 123)
+   * FuncDSL.openapi()
+   * .document("https://petstore.swagger.io/v2/swagger.json", DSL.auth("openapi-auth"))
+   * .operation("getPetById")
+   * .parameter("id", 123)
    * );
    * }
* @@ -1155,10 +1198,10 @@ public static FuncCallOpenAPIStep openapi(String name) { * *
{@code
    * FuncDSL.call(
-   *   FuncDSL.http()
-   *     .GET()
-   *     .endpoint("http://service/api")
-   *     .acceptJSON()
+   * FuncDSL.http()
+   * .GET()
+   * .endpoint("http://service/api")
+   * .acceptJSON()
    * );
    * }
* @@ -1183,8 +1226,8 @@ public static FuncCallHttpStep http(String name) { * *
{@code
    * FuncDSL.call(
-   *   FuncDSL.http("http://service/api", auth -> auth.use("my-auth"))
-   *     .GET()
+   * FuncDSL.http("http://service/api", auth -> auth.use("my-auth"))
+   * .GET()
    * );
    * }
* @@ -1212,9 +1255,9 @@ public static FuncCallHttpStep http(URI url, AuthenticationConfigurer auth) { * *
{@code
    * tasks(
-   *   FuncDSL.call(
-   *     FuncDSL.get("http://service/health")
-   *   )
+   * FuncDSL.call(
+   * FuncDSL.get("http://service/health")
+   * )
    * );
    * }
* @@ -1231,9 +1274,9 @@ public static FuncCallHttpStep get(String endpoint) { * *
{@code
    * tasks(
-   *   FuncDSL.call(
-   *     FuncDSL.get("checkHealth", "http://service/health")
-   *   )
+   * FuncDSL.call(
+   * FuncDSL.get("checkHealth", "http://service/health")
+   * )
    * );
    * }
* @@ -1251,9 +1294,9 @@ public static FuncCallHttpStep get(String name, String endpoint) { * *
{@code
    * tasks(
-   *   FuncDSL.call(
-   *     FuncDSL.get("http://service/api/users", auth -> auth.use("user-service-auth"))
-   *   )
+   * FuncDSL.call(
+   * FuncDSL.get("http://service/api/users", auth -> auth.use("user-service-auth"))
+   * )
    * );
    * }
* @@ -1326,12 +1369,12 @@ public static FuncCallHttpStep get(String name, URI endpoint, AuthenticationConf * *
{@code
    * tasks(
-   *   FuncDSL.call(
-   *     FuncDSL.post(
-   *       Map.of("name", "Ricardo"),
-   *       "http://service/api/users"
-   *     )
-   *   )
+   * FuncDSL.call(
+   * FuncDSL.post(
+   * Map.of("name", "Ricardo"),
+   * "http://service/api/users"
+   * )
+   * )
    * );
    * }
* diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEmitSpec.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEmitSpec.java index 115419ada..03c119331 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEmitSpec.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEmitSpec.java @@ -15,18 +15,77 @@ */ package io.serverlessworkflow.fluent.func.dsl; +import io.cloudevents.CloudEventData; +import io.cloudevents.core.data.BytesCloudEventData; +import io.cloudevents.core.data.PojoCloudEventData; +import io.serverlessworkflow.api.types.func.ContextFunction; +import io.serverlessworkflow.api.types.func.EventDataFunction; +import io.serverlessworkflow.fluent.func.FuncEmitEventPropertiesBuilder; import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder; import io.serverlessworkflow.fluent.func.configurers.FuncEmitConfigurer; +import io.serverlessworkflow.fluent.spec.dsl.EventEmitPropertiesSpec; +import io.serverlessworkflow.impl.jackson.JsonUtils; +import java.nio.charset.StandardCharsets; +import java.util.function.Function; -public class FuncEmitSpec extends FuncEventFilterSpec implements FuncEmitConfigurer { +public final class FuncEmitSpec + extends EventEmitPropertiesSpec + implements FuncEmitConfigurer { @Override - public void accept(FuncEmitTaskBuilder funcEmitTaskBuilder) { - funcEmitTaskBuilder.event(e -> getSteps().forEach(step -> step.accept(e))); + protected FuncEmitSpec self() { + return this; + } + + /** Sets the event data and the contentType to `application/json` */ + public FuncEmitSpec jsonData(SerializableFunction function) { + Class clazz = ReflectionUtils.inferInputType(function); + addPropertyStep(e -> e.data(new EventDataFunction().withFunction(function, clazz))); + return JSON(); + } + + /** Sets the event data and the contentType to `application/octet-stream` */ + public FuncEmitSpec bytesData(Function serializer, Class clazz) { + addPropertyStep( + e -> e.data(payload -> BytesCloudEventData.wrap(serializer.apply(payload)), clazz)); + return OCTET_STREAM(); + } + + public FuncEmitSpec bytesDataUtf8() { + return bytesData((String s) -> s.getBytes(StandardCharsets.UTF_8), String.class); + } + + /** Sets the event data and the contentType to `application/json` */ + public FuncEmitSpec jsonData(Function function, Class clazz) { + addPropertyStep(e -> e.data(new EventDataFunction().withFunction(function, clazz))); + return JSON(); + } + + /** JSON with default mapper (PojoCloudEventData + application/json). */ + public FuncEmitSpec jsonData(Class clazz) { + addPropertyStep( + e -> + e.data( + payload -> + PojoCloudEventData.wrap( + payload, p -> JsonUtils.mapper().writeValueAsString(p).getBytes()), + clazz)); + return JSON(); + } + + public FuncEmitSpec jsonData(ContextFunction function, Class clazz) { + addPropertyStep( + e -> + e.data( + payload -> + PojoCloudEventData.wrap( + payload, p -> JsonUtils.mapper().writeValueAsString(p).getBytes()), + clazz)); + return JSON(); } @Override - protected FuncEmitSpec self() { - return this; + public void accept(FuncEmitTaskBuilder funcEmitTaskBuilder) { + funcEmitTaskBuilder.event(e -> getPropertySteps().forEach(step -> step.accept(e))); } } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java index a7921273f..b20fc72b6 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java @@ -15,56 +15,351 @@ */ package io.serverlessworkflow.fluent.func.dsl; +import com.fasterxml.jackson.core.type.TypeReference; +import io.cloudevents.CloudEvent; import io.cloudevents.CloudEventData; -import io.cloudevents.core.data.BytesCloudEventData; +import io.cloudevents.core.CloudEventUtils; import io.cloudevents.core.data.PojoCloudEventData; -import io.serverlessworkflow.api.types.func.EventDataFunction; -import io.serverlessworkflow.fluent.func.FuncEventPropertiesBuilder; -import io.serverlessworkflow.fluent.spec.dsl.EventFilterSpec; +import io.cloudevents.jackson.PojoCloudEventDataMapper; +import io.serverlessworkflow.api.types.func.ContextPredicate; +import io.serverlessworkflow.api.types.func.FilterPredicate; +import io.serverlessworkflow.fluent.func.FuncEventFilterBuilder; +import io.serverlessworkflow.fluent.func.FuncEventFilterPropertiesBuilder; +import io.serverlessworkflow.fluent.spec.dsl.AbstractEventFilterSpec; +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowContextData; import io.serverlessworkflow.impl.jackson.JsonUtils; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.function.Function; +import java.util.Map; +import java.util.Objects; -public abstract class FuncEventFilterSpec - extends EventFilterSpec { +/** + * Fluent DSL specification builder for configuring CloudEvent filters within a Serverless Workflow + * execution. + */ +public final class FuncEventFilterSpec + extends AbstractEventFilterSpec< + FuncEventFilterSpec, FuncEventFilterPropertiesBuilder, FuncEventFilterBuilder> { + + /** + * Returns the current instance of the builder. + * + * @return the current {@link FuncEventFilterSpec} instance. + */ + @Override + protected FuncEventFilterSpec self() { + return this; + } + + /** + * Configures the filter to match incoming event based on a Predicate. This is the Listen + * counterpart to Emit's jsonData(Function). + * + * @param predicate the predicate to evaluate against the entire {@link CloudEvent}. + * @return the current {@link FuncEventFilterSpec} instance. + */ + public FuncEventFilterSpec envelope(SerializablePredicate predicate) { + addPropertyStep(e -> e.envelope(predicate)); + return this; + } + + /** + * Configures the filter to match incoming event data based on a ContextPredicate, allowing + * evaluation against the {@link CloudEvent} and the current {@link WorkflowContextData}. + * + * @param predicate the context-aware predicate to evaluate. + * @return the current {@link FuncEventFilterSpec} instance. + */ + public FuncEventFilterSpec envelope(ContextPredicate predicate) { + addPropertyStep(e -> e.envelope(predicate)); + return this; + } + + /** + * Configures the filter to match incoming event data based on a FilterPredicate, allowing + * evaluation against the {@link CloudEvent}, {@link WorkflowContextData}, and {@link + * TaskContextData}. + * + * @param predicate the filter predicate to evaluate. + * @return the current {@link FuncEventFilterSpec} instance. + */ + public FuncEventFilterSpec envelope(FilterPredicate predicate) { + addPropertyStep(e -> e.envelope(predicate)); + return this; + } + + /** + * Configures the filter to match incoming event data based on a Predicate evaluated directly + * against the raw {@link CloudEventData} payload. + * + * @param predicate the predicate to evaluate against the event data. + * @return the current {@link FuncEventFilterSpec} instance. + */ + public FuncEventFilterSpec data(SerializablePredicate predicate) { + addPropertyStep(e -> e.data(predicate)); + return this; + } + + /** + * Configures the filter to match incoming event data based on a ContextPredicate evaluated + * directly against the raw {@link CloudEventData} payload and the current {@link + * WorkflowContextData}. + * + * @param predicate the context-aware predicate to evaluate. + * @return the current {@link FuncEventFilterSpec} instance. + */ + public FuncEventFilterSpec data(ContextPredicate predicate) { + addPropertyStep(e -> e.data(predicate)); + return this; + } + + /** + * Configures the filter to match incoming event data based on a FilterPredicate evaluated + * directly against the raw {@link CloudEventData} payload, {@link WorkflowContextData}, and + * {@link TaskContextData}. + * + * @param predicate the filter predicate to evaluate. + * @return the current {@link FuncEventFilterSpec} instance. + */ + public FuncEventFilterSpec data(FilterPredicate predicate) { + addPropertyStep(e -> e.data(predicate)); + return this; + } + + /** + * Evaluates the given predicate against the CloudEvent data payload, automatically parsed as a + * Map. + * + *

For example, you can evaluate this filter as: + * + *

+   * .dataAsMap(map -> "123".equals(map.get("orderId")))
+   * 
+ * + * @param predicate the predicate to evaluate against the parsed Map. + * @return the current {@link FuncEventFilterSpec} instance. + */ + public FuncEventFilterSpec dataAsMap(SerializablePredicate> predicate) { + addPropertyStep( + e -> + e.envelope( + (CloudEvent ce) -> { + Map ceDataMap = asCEDataMap(ce); + return !ceDataMap.isEmpty() && predicate.test(ceDataMap); + })); + return this; + } - FuncEventFilterSpec() { - super(new ArrayList<>()); + /** + * Evaluates the given ContextPredicate against the CloudEvent data payload (parsed as a Map) and + * the current WorkflowContextData. + * + * @param predicate the context-aware predicate to evaluate against the parsed Map. + * @return the current {@link FuncEventFilterSpec} instance. + */ + public FuncEventFilterSpec dataAsMap(ContextPredicate> predicate) { + addPropertyStep( + e -> + e.envelope( + (CloudEvent ce, WorkflowContextData context) -> { + Map ceDataMap = asCEDataMap(ce); + return !ceDataMap.isEmpty() && predicate.test(ceDataMap, context); + })); + return this; } - /** Sets the event data and the contentType to `application/json` */ - public SELF jsonData(SerializableFunction function) { - Class clazz = ReflectionUtils.inferInputType(function); - addStep(e -> e.data(new EventDataFunction().withFunction(function, clazz))); - return JSON(); + /** + * Evaluates the given FilterPredicate against the CloudEvent data payload (parsed as a Map), the + * current WorkflowContextData, and the TaskContextData. + * + * @param predicate the filter predicate to evaluate against the parsed Map and task/workflow + * contexts. + * @return the current {@link FuncEventFilterSpec} instance. + */ + public FuncEventFilterSpec dataAsMap(FilterPredicate> predicate) { + addPropertyStep( + e -> + e.envelope( + (CloudEvent ce, WorkflowContextData context, TaskContextData taskContext) -> { + Map ceDataMap = asCEDataMap(ce); + return !ceDataMap.isEmpty() && predicate.test(ceDataMap, context, taskContext); + })); + return this; + } + + /** + * Evaluates the given predicate against the CloudEvent data payload, automatically parsed into + * the specified target type. + * + *

For example, you can evaluate this filter as: + * + *

+   * .dataAs(Order.class, order -> order.getId() == 123)
+   * 
+ * + * @param targetType The class of the type to deserialize the payload into. + * @param predicate The predicate to evaluate against the parsed object. + * @param The target type. + * @return the current {@link FuncEventFilterSpec} instance. + */ + public FuncEventFilterSpec dataAs(Class targetType, SerializablePredicate predicate) { + addPropertyStep( + e -> + e.envelope( + (CloudEvent ce) -> { + T parsedData = parseCEData(ce, targetType); + return parsedData != null && predicate.test(parsedData); + })); + return this; } - /** Sets the event data and the contentType to `application/octet-stream` */ - public SELF bytesData(Function serializer, Class clazz) { - addStep(e -> e.data(payload -> BytesCloudEventData.wrap(serializer.apply(payload)), clazz)); - return OCTET_STREAM(); + /** + * Evaluates the given ContextPredicate against the CloudEvent data payload (parsed into the + * specified target type) and the current WorkflowContextData. + * + * @param targetType The class of the type to deserialize the payload into. + * @param predicate The context-aware predicate to evaluate against the parsed object. + * @param The target type. + * @return the current {@link FuncEventFilterSpec} instance. + */ + public FuncEventFilterSpec dataAs(Class targetType, ContextPredicate predicate) { + addPropertyStep( + e -> + e.envelope( + (CloudEvent ce, WorkflowContextData context) -> { + T parsedData = parseCEData(ce, targetType); + return parsedData != null && predicate.test(parsedData, context); + })); + return this; + } + + /** + * Evaluates the given FilterPredicate against the CloudEvent data payload (parsed into the + * specified target type), the current WorkflowContextData, and the TaskContextData. + * + * @param targetType The class of the type to deserialize the payload into. + * @param predicate The filter predicate to evaluate against the parsed object and contexts. + * @param The target type. + * @return the current {@link FuncEventFilterSpec} instance. + */ + public FuncEventFilterSpec dataAs(Class targetType, FilterPredicate predicate) { + addPropertyStep( + e -> + e.envelope( + (CloudEvent ce, WorkflowContextData context, TaskContextData taskContext) -> { + T parsedData = parseCEData(ce, targetType); + return parsedData != null && predicate.test(parsedData, context, taskContext); + })); + return this; } - public SELF bytesDataUtf8() { - return bytesData((String s) -> s.getBytes(StandardCharsets.UTF_8), String.class); + /** + * Filter events which field carries the current workflow instance ID. For example, given the data + * payload: + * + *
+   * {
+   * "order": { "number": 123 },
+   * "workflowInstanceId": "123456789"
+   * }
+   * 
+ * + *

You would call dataByInstanceId("workflowInstanceId"). Events matching the + * current instance ID in the field workflowInstanceId would match this filter. + * + * @param fieldName name of the field in the CE data that carries the instance ID. + * @return the current {@link FuncEventFilterSpec} instance. + */ + public FuncEventFilterSpec dataByInstanceId(String fieldName) { + addPropertyStep( + e -> + e.envelope( + (CloudEvent ce, WorkflowContextData context) -> { + Map ceDataMap = asCEDataMap(ce); + return Objects.equals(ceDataMap.get(fieldName), context.instanceData().id()); + })); + return this; } - /** Sets the event data and the contentType to `application/json` */ - public SELF jsonData(Function function, Class clazz) { - addStep(e -> e.data(new EventDataFunction().withFunction(function, clazz))); - return JSON(); + /** + * Same as {@link #dataByInstanceId(String)}, but now the filter looks at the CE extension name. + * + * @param extensionName the extension name where to fetch the given workflow instance ID to match + * with the current execution. + * @return the current {@link FuncEventFilterSpec} instance. + */ + public FuncEventFilterSpec extensionByInstanceId(String extensionName) { + addPropertyStep( + e -> + e.envelope( + (CloudEvent ce, WorkflowContextData context) -> + context.instanceData().id().equals(ce.getExtension(extensionName)))); + return this; } - /** JSON with default mapper (PojoCloudEventData + application/json). */ - public SELF jsonData(Class clazz) { - addStep( + /** + * Matches events that carry in the CE data payload fields with the same values as the input of + * the current task. + * + *

For example, you can filter events carrying specific data using + * dataFields("orderId", "customerId"). Events where the CE data payload matches the task + * input for all provided fields will pass this filter. + * + * @param fieldNames the field names to match this filter. + * @return the current {@link FuncEventFilterSpec} instance. + */ + public FuncEventFilterSpec dataFields(String... fieldNames) { + if (fieldNames == null || fieldNames.length == 0) return this; + + addPropertyStep( e -> - e.data( - payload -> - PojoCloudEventData.wrap( - payload, p -> JsonUtils.mapper().writeValueAsString(p).getBytes()), - clazz)); - return JSON(); + e.envelope( + (CloudEvent ce, WorkflowContextData context, TaskContextData taskContext) -> { + Map input = taskContext.rawInput().asMap().orElse(Map.of()); + Map ceDataMap = asCEDataMap(ce); + + return !ceDataMap.isEmpty() + && java.util.Arrays.stream(fieldNames) + .allMatch( + fieldName -> + Objects.equals(ceDataMap.get(fieldName), input.get(fieldName))); + })); + return this; + } + + /** + * Helper method to safely extract and parse the {@link CloudEvent} data payload into a Map. + * + * @param ce the incoming {@link CloudEvent} to parse. + * @return a {@link Map} containing the parsed data, or an empty map if parsing fails or data is + * null. + */ + private Map asCEDataMap(CloudEvent ce) { + if (ce.getData() == null) return Map.of(); + PojoCloudEventData> mappedData = + CloudEventUtils.mapData( + ce, PojoCloudEventDataMapper.from(JsonUtils.mapper(), new TypeReference<>() {})); + if (mappedData == null || mappedData.getValue() == null) { + return Map.of(); + } + + return mappedData.getValue(); + } + + /** + * Helper method to safely parse the {@link CloudEvent} data payload into a specified target class + * type. + * + * @param ce the incoming {@link CloudEvent} to parse. + * @param targetType the class representing the target type. + * @param the target type parameter. + * @return an instance of the parsed type, or null if parsing fails or data is null. + */ + private T parseCEData(CloudEvent ce, Class targetType) { + if (ce.getData() == null) return null; + try { + return JsonUtils.mapper().readValue(ce.getData().toBytes(), targetType); + } catch (Exception e) { + return null; + } } } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/Step.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/Step.java index 5e22a65d9..94670629b 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/Step.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/Step.java @@ -107,7 +107,7 @@ public SELF then(FlowDirectiveEnum directive) { * It allows chaining transformations in a fluent manner. * *

{@code exportAs} controls what the task exports for downstream consumers - * (the next task, events, etc.) without immediately updating global workflow data. + * (the next task, events, etc.), immediately updating the workflow context. * *

Example: * @@ -117,8 +117,8 @@ public SELF then(FlowDirectiveEnum directive) { * .when(condition); * } * - * @param the task result type - * @param the export type (what gets forwarded to the next step) + * @param the context type + * @param the export type (what gets written in the workflow context) * @param function the transformation function * @return this step for method chaining * @see io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations#exportAs(Function) @@ -132,15 +132,15 @@ public SELF exportAs(SerializableFunction function) { } /** - * Shapes what the task exports for downstream consumers using a Java function with explicit input - * type. + * Shapes what the task exports for downstream consumers using a Java function with explicit + * context input type. * *

This variant allows you to explicitly specify the input type class for better type safety. * - * @param the task result type - * @param the export type (what gets forwarded to the next step) + * @param the workflow context type + * @param the export type (what gets written in the context) * @param function the transformation function - * @param taskResultClass the class of the task result type + * @param taskResultClass the class of the workflow context type * @return this step for method chaining * @see io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations#exportAs(Function, Class) */ @@ -155,8 +155,8 @@ public SELF exportAs(Function function, Class taskResultClass) { *

This variant provides access to both workflow and task context, allowing you to inspect * metadata when shaping the export. * - * @param the task result type - * @param the export type (what gets forwarded to the next step) + * @param the workflow context type + * @param the export type (what gets written in the context) * @param function the filter function with workflow and task context * @return this step for method chaining * @see io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations#exportAs(FilterFunction) @@ -173,10 +173,10 @@ public SELF exportAs(FilterFunction function) { * Shapes what the task exports for downstream consumers using a context-aware filter function * with explicit input type. * - * @param the task result type - * @param the export type (what gets forwarded to the next step) + * @param the workflow context type + * @param the export type (what gets written in the context) * @param function the filter function with workflow and task context - * @param taskResultClass the class of the task result type + * @param taskResultClass the class of the workflow context type * @return this step for method chaining * @see io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations#exportAs(FilterFunction, * Class) @@ -192,8 +192,8 @@ public SELF exportAs(FilterFunction function, Class taskResultCl *

This variant provides access to workflow context, allowing you to inspect workflow metadata * when shaping the export. * - * @param the task result type - * @param the export type (what gets forwarded to the next step) + * @param the workflow context type + * @param the export type (what gets written in the context) * @param function the context function with workflow context * @return this step for method chaining * @see io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations#exportAs(ContextFunction) @@ -210,10 +210,10 @@ public SELF exportAs(ContextFunction function) { * Shapes what the task exports for downstream consumers using a context-aware function with * explicit input type. * - * @param the task result type - * @param the export type (what gets forwarded to the next step) + * @param the workflow context type + * @param the export type (what gets written in the context) * @param function the context function with workflow context - * @param taskResultClass the class of the task result type + * @param taskResultClass the class of the workflow context type * @return this step for method chaining * @see io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations#exportAs(ContextFunction, * Class) @@ -223,6 +223,19 @@ public SELF exportAs(ContextFunction function, Class taskResultC return self(); } + /** + * Overrides the workflow context with the current task output. + * + * @return this step for method chaining + */ + public SELF exportAsTaskOutput() { + postConfigurers.add( + b -> + ((FuncTaskTransformations) b) + .exportAs((context, workflowContext, taskContext) -> taskContext.output())); + return self(); + } + /** * Shapes what the task exports for downstream consumers using a JQ expression. * @@ -232,6 +245,7 @@ public SELF exportAs(ContextFunction function, Class taskResultC *

Example: * *

{@code
+   * // Given that your context has this attribute already
    * exportAs("$.username")
    * }
* diff --git a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLTest.java b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLTest.java index db0386709..a11d2c753 100644 --- a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLTest.java +++ b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLTest.java @@ -18,11 +18,12 @@ import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.call; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.consume; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.emit; -import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.event; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.get; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.http; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.listen; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.produced; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.switchWhenOrElse; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.toOne; import static io.serverlessworkflow.fluent.spec.dsl.DSL.use; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -30,7 +31,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; -import io.cloudevents.core.data.BytesCloudEventData; import io.serverlessworkflow.api.types.CallHTTP; import io.serverlessworkflow.api.types.Export; import io.serverlessworkflow.api.types.FlowDirectiveEnum; @@ -40,10 +40,7 @@ import io.serverlessworkflow.api.types.func.CallJava; import io.serverlessworkflow.api.types.func.FilterFunction; import io.serverlessworkflow.fluent.func.dsl.FuncDSL; -import io.serverlessworkflow.fluent.func.dsl.FuncEmitSpec; -import io.serverlessworkflow.fluent.func.dsl.FuncListenSpec; import java.net.URI; -import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import org.junit.jupiter.api.DisplayName; @@ -92,18 +89,14 @@ void function_step_when_compiles_and_builds() { @Test void emit_step_exportAs_javaFilter_sets_export() { - // Build an emit spec using your DSL (type + data function) - FuncEmitSpec spec = - new FuncEmitSpec() - .type("org.acme.signal") - .bytesData((String s) -> s.getBytes(StandardCharsets.UTF_8), String.class); - FilterFunction> jf = (val, wfCtx, taskCtx) -> Map.of("wrapped", val, "wfId", wfCtx.instanceData().id()); Workflow wf = FuncWorkflowBuilder.workflow("step-emit-export") - .tasks(emit("emitWrapped", spec).exportAs(jf)) // chaining on Step + .tasks( + emit("emitWrapped", produced("org.acme.signal").bytesDataUtf8()) + .exportAs(jf)) // chaining on Step .build(); List items = wf.getDo(); @@ -122,11 +115,11 @@ void emit_step_exportAs_javaFilter_sets_export() { @Test @DisplayName("listen(spec).exportAs(Function) sets Export on ListenTask holder") void listen_step_exportAs_function_sets_export() { - FuncListenSpec spec = toOne("org.acme.review.done"); // using your existing DSL helper - Workflow wf = FuncWorkflowBuilder.workflow("step-listen-export") - .tasks(listen("waitHumanReview", spec).exportAs((Object e) -> Map.of("seen", true))) + .tasks( + listen("waitHumanReview", toOne("org.acme.review.done")) + .exportAs((Object e) -> Map.of("seen", true))) .build(); List items = wf.getDo(); @@ -142,13 +135,11 @@ void listen_step_exportAs_function_sets_export() { } @Test - @DisplayName("emit(event(type, fn)).when(...) -> still an EmitTask and builds") + @DisplayName("emit(produced(type, fn)).when(...) -> still an EmitTask and builds") void emit_step_when_compiles_and_builds() { Workflow wf = FuncWorkflowBuilder.workflow("step-emit-when") - .tasks( - emit(event("org.acme.sig", (String s) -> BytesCloudEventData.wrap(s.getBytes()))) - .when((Object ctx) -> true)) + .tasks(emit(produced("org.acme.sig").bytesDataUtf8()).when((Object ctx) -> true)) .build(); List items = wf.getDo(); @@ -163,9 +154,7 @@ void mixed_chaining_order_and_exports() { FuncWorkflowBuilder.workflow("step-mixed") .tasks( function(String::strip, String.class).exportAs((String s) -> Map.of("s", s)), - emit(event( - "org.acme.kickoff", (String s) -> BytesCloudEventData.wrap(s.getBytes()))) - .when((Object ignore) -> true), + emit(produced("org.acme.kickoff").bytesDataUtf8()).when((Object ignore) -> true), listen(toOne("org.acme.done")).exportAs((Object e) -> Map.of("ok", true))) .build(); @@ -189,7 +178,7 @@ void mixed_chaining_order_and_exports() { void switchWhenOrElse_jq_to_taskName() { Workflow wf = FuncWorkflowBuilder.workflow("jqSwitch") - .tasks(FuncDSL.switchWhenOrElse(".approved", "send", "draft")) + .tasks(switchWhenOrElse(".approved", "send", "draft")) .build(); Task switchTask = wf.getDo().get(0).getTask(); assertNotNull(switchTask.getSwitchTask()); @@ -202,7 +191,7 @@ void switchWhenOrElse_jq_to_taskName() { void switchWhenOrElse_jq_to_directive() { Workflow wf = FuncWorkflowBuilder.workflow("jqSwitchDir") - .tasks(FuncDSL.switchWhenOrElse(".score >= 80", "pass", FlowDirectiveEnum.END)) + .tasks(switchWhenOrElse(".score >= 80", "pass", FlowDirectiveEnum.END)) .build(); Task switchTask = wf.getDo().get(0).getTask(); var items = switchTask.getSwitchTask().getSwitch(); diff --git a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java index 213abc941..9b3b5136e 100644 --- a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java +++ b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java @@ -17,8 +17,11 @@ import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.agent; import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.withUniqueId; -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import io.serverlessworkflow.api.types.Task; import io.serverlessworkflow.api.types.TaskItem; diff --git a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncEventFilterSpecTest.java b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncEventFilterSpecTest.java new file mode 100644 index 000000000..26c8be35b --- /dev/null +++ b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncEventFilterSpecTest.java @@ -0,0 +1,141 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func; + +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.consumed; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.listen; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.toOne; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import io.serverlessworkflow.api.types.Task; +import io.serverlessworkflow.api.types.TaskItem; +import io.serverlessworkflow.api.types.Workflow; +import java.util.List; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +/** + * Tests for the Event Filter DSL specification. Verifies that the fluent builder correctly wires + * the payload parsing and contextual lambdas into the final Workflow definitions. + */ +class FuncEventFilterSpecTest { + + // Dummy POJO for dataAs(Class) testing + static class TestOrder { + private int id; + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + } + + @Test + @DisplayName("consumed(...).dataAsMap builds successfully into a ListenTask") + void dataAsMap_compilesAndBuildsListenTask() { + Workflow wf = + FuncWorkflowBuilder.workflow("filter-data-as-map") + .tasks( + listen( + "listenMap", + toOne(consumed("org.test.event").dataAsMap(map -> map.containsKey("orderId"))))) + .build(); + + List items = wf.getDo(); + assertEquals(1, items.size()); + + Task t = items.get(0).getTask(); + assertNotNull(t.getListenTask(), "ListenTask expected for dataAsMap filter"); + } + + @Test + @DisplayName("consumed(...).dataAs(Class) builds successfully into a ListenTask") + void dataAsClass_compilesAndBuildsListenTask() { + Workflow wf = + FuncWorkflowBuilder.workflow("filter-data-as-class") + .tasks( + listen( + "listenClass", + toOne( + consumed("org.test.event") + .dataAs(TestOrder.class, order -> order.getId() > 0)))) + .build(); + + List items = wf.getDo(); + assertEquals(1, items.size()); + + Task t = items.get(0).getTask(); + assertNotNull(t.getListenTask(), "ListenTask expected for dataAs(Class) filter"); + } + + @Test + @DisplayName("consumed(...).dataFields builds successfully into a ListenTask") + void dataFields_compilesAndBuildsListenTask() { + Workflow wf = + FuncWorkflowBuilder.workflow("filter-data-fields") + .tasks( + listen( + "listenFields", + toOne(consumed("org.test.event").dataFields("orderId", "customerId")))) + .build(); + + List items = wf.getDo(); + assertEquals(1, items.size()); + + Task t = items.get(0).getTask(); + assertNotNull(t.getListenTask(), "ListenTask expected for dataFields filter"); + } + + @Test + @DisplayName("consumed(...).dataByInstanceId builds successfully into a ListenTask") + void dataByInstanceId_compilesAndBuildsListenTask() { + Workflow wf = + FuncWorkflowBuilder.workflow("filter-data-instance-id") + .tasks( + listen( + "listenDataId", + toOne(consumed("org.test.event").dataByInstanceId("workflowInstanceId")))) + .build(); + + List items = wf.getDo(); + assertEquals(1, items.size()); + + Task t = items.get(0).getTask(); + assertNotNull(t.getListenTask(), "ListenTask expected for dataByInstanceId filter"); + } + + @Test + @DisplayName("consumed(...).extensionByInstanceId builds successfully into a ListenTask") + void extensionByInstanceId_compilesAndBuildsListenTask() { + Workflow wf = + FuncWorkflowBuilder.workflow("filter-ext-instance-id") + .tasks( + listen( + "listenExtId", + toOne(consumed("org.test.event").extensionByInstanceId("workflowinstanceid")))) + .build(); + + List items = wf.getDo(); + assertEquals(1, items.size()); + + Task t = items.get(0).getTask(); + assertNotNull(t.getListenTask(), "ListenTask expected for extensionByInstanceId filter"); + } +} diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/EventDataFunction.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/EventDataFunction.java index 7c7193895..b7fea24ef 100644 --- a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/EventDataFunction.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/EventDataFunction.java @@ -31,4 +31,24 @@ public EventData withFunction(Function value, Class argClass) { setObject(new TypedFunction<>(value, argClass)); return this; } + + public EventData withFunction(FilterFunction value) { + setObject(value); + return this; + } + + public EventData withFunction(FilterFunction value, Class argClass) { + setObject(new TypedFilterFunction<>(value, argClass)); + return this; + } + + public EventData withFunction(ContextFunction value) { + setObject(value); + return this; + } + + public EventData withFunction(ContextFunction value, Class argClass) { + setObject(new TypedContextFunction<>(value, argClass)); + return this; + } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventFilterBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventFilterBuilder.java index 9d9099ceb..80bfba154 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventFilterBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventFilterBuilder.java @@ -37,11 +37,8 @@ public SELF with(Consumer

c) { } public SELF correlate(String key, Consumer c) { - ListenTaskBuilder.CorrelatePropertyBuilder cpb = - new ListenTaskBuilder.CorrelatePropertyBuilder(); - c.accept(cpb); - correlate.withAdditionalProperty(key, cpb.build()); - return self(); + throw new UnsupportedOperationException( + "correlate is not supported in the engine level: https://github.com/serverlessworkflow/sdk-java/issues/1206"); } public EventFilter build() { diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractListenTaskBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractListenTaskBuilder.java index 28458d313..c94d78453 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractListenTaskBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractListenTaskBuilder.java @@ -70,12 +70,12 @@ public ListenTask build() { public static final class CorrelatePropertyBuilder { private final CorrelateProperty prop = new CorrelateProperty(); - public ListenTaskBuilder.CorrelatePropertyBuilder from(String expr) { + public CorrelatePropertyBuilder from(String expr) { prop.setFrom(expr); return this; } - public ListenTaskBuilder.CorrelatePropertyBuilder expect(String val) { + public CorrelatePropertyBuilder expect(String val) { prop.setExpect(val); return this; } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/configurers/EventConfigurer.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/configurers/EventPropertiesConfigurer.java similarity index 90% rename from fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/configurers/EventConfigurer.java rename to fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/configurers/EventPropertiesConfigurer.java index 3cb365f99..4e24c0ea6 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/configurers/EventConfigurer.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/configurers/EventPropertiesConfigurer.java @@ -19,4 +19,4 @@ import java.util.function.Consumer; @FunctionalInterface -public interface EventConfigurer extends Consumer {} +public interface EventPropertiesConfigurer extends Consumer {} diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/AbstractEventFilterSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/AbstractEventFilterSpec.java new file mode 100644 index 000000000..29799c801 --- /dev/null +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/AbstractEventFilterSpec.java @@ -0,0 +1,61 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.spec.dsl; + +import io.serverlessworkflow.fluent.spec.AbstractEventFilterBuilder; +import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +public abstract class AbstractEventFilterSpec< + SELF, + EVENT_PROPS extends AbstractEventPropertiesBuilder, + EVENT_FILTER extends AbstractEventFilterBuilder> + extends ExprEventFilterSpec implements Consumer { + + private final List> filterSteps = new ArrayList<>(); + + protected AbstractEventFilterSpec() {} + + protected abstract SELF self(); + + protected void addFilterStep(Consumer step) { + filterSteps.add(step); + } + + protected List> getFilterSteps() { + return filterSteps; + } + + // TODO: "correlate is not supported in the engine level: + // https://github.com/serverlessworkflow/sdk-java/issues/1206". Keeping the code for a future + // reference. + // public SELF correlate(String key, Consumer c) { + // filterSteps.add(f -> f.correlate(key, c)); + // return self(); + // } + + @Override + public void accept(EVENT_FILTER filterBuilder) { + filterBuilder.with( + pb -> { + getPropertySteps().forEach(step -> step.accept(pb)); + }); + + filterSteps.forEach(step -> step.accept(filterBuilder)); + } +} diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/BaseListenSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/BaseListenSpec.java index fbf45545a..d32efe396 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/BaseListenSpec.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/BaseListenSpec.java @@ -21,52 +21,43 @@ /** * Generic base for Listen specs. * - *

Type params: SELF - fluent self type (the concrete spec) LB - ListenTaskBuilder type (e.g., - * ListenTaskBuilder, AgentListenTaskBuilder, FuncListenTaskBuilder) TB - ListenToBuilder type - * (e.g., ListenToBuilder, FuncListenToBuilder) FB - EventFilterBuilder type (e.g., - * EventFilterBuilder, FuncEventFilterBuilder) EC - Event configurer type (e.g., EventConfigurer, - * FuncPredicateEventConfigurer) + *

Type params: SELF - fluent self type (the concrete spec) LISTEN_TASK - {@link + * io.serverlessworkflow.fluent.spec.ListenTaskBuilder} type LISTEN_TO - {@link + * io.serverlessworkflow.fluent.spec.ListenToBuilder} type EVENT_FILTER - {@link + * io.serverlessworkflow.fluent.spec.EventFilterBuilder} type */ -public abstract class BaseListenSpec { +public abstract class BaseListenSpec { @FunctionalInterface - public interface ToInvoker { - void to(LB listenTaskBuilder, Consumer toStep); + public interface ToInvoker { + void to(LISTEN_TASK listenTaskBuilder, Consumer toStep); } @FunctionalInterface - public interface WithApplier { - void with(FB filterBuilder, EC eventConfigurer); + public interface FiltersApplier { + void apply(LISTEN_TO toBuilder, @SuppressWarnings("rawtypes") Consumer[] filters); } @FunctionalInterface - public interface FiltersApplier { - void apply(TB toBuilder, @SuppressWarnings("rawtypes") Consumer[] filters); + public interface OneFilterApplier { + void apply(LISTEN_TO toBuilder, Consumer filter); } - @FunctionalInterface - public interface OneFilterApplier { - void apply(TB toBuilder, Consumer filter); - } - - private final ToInvoker toInvoker; - private final WithApplier withApplier; - private final FiltersApplier allApplier; - private final FiltersApplier anyApplier; - private final OneFilterApplier oneApplier; + private final ToInvoker toInvoker; + private final FiltersApplier allApplier; + private final FiltersApplier anyApplier; + private final OneFilterApplier oneApplier; - private Consumer strategyStep; - private Consumer untilStep; + private Consumer strategyStep; + private Consumer untilStep; protected BaseListenSpec( - ToInvoker toInvoker, - WithApplier withApplier, - FiltersApplier allApplier, - FiltersApplier anyApplier, - OneFilterApplier oneApplier) { + ToInvoker toInvoker, + FiltersApplier allApplier, + FiltersApplier anyApplier, + OneFilterApplier oneApplier) { this.toInvoker = Objects.requireNonNull(toInvoker, "toInvoker"); - this.withApplier = Objects.requireNonNull(withApplier, "withApplier"); this.allApplier = Objects.requireNonNull(allApplier, "allApplier"); this.anyApplier = Objects.requireNonNull(anyApplier, "anyApplier"); this.oneApplier = Objects.requireNonNull(oneApplier, "oneApplier"); @@ -74,42 +65,31 @@ protected BaseListenSpec( protected abstract SELF self(); - protected final void setUntilStep(Consumer untilStep) { + protected final void setUntilStep(Consumer untilStep) { this.untilStep = untilStep; } - /** Convert EC[] -> Consumer[] that call `filterBuilder.with(event)` */ - @SuppressWarnings("unchecked") - protected final Consumer[] asFilters(EC... events) { - Objects.requireNonNull(events, "events"); - Consumer[] filters = new Consumer[events.length]; - for (int i = 0; i < events.length; i++) { - EC ev = Objects.requireNonNull(events[i], "events[" + i + "]"); - filters[i] = fb -> withApplier.with(fb, ev); - } - return filters; - } - @SafeVarargs - public final SELF all(EC... events) { - strategyStep = t -> allApplier.apply(t, asFilters(events)); + public final SELF all(Consumer... filters) { + Objects.requireNonNull(filters, "filters"); + strategyStep = t -> allApplier.apply(t, filters); return self(); } @SafeVarargs - public final SELF any(EC... events) { - strategyStep = t -> anyApplier.apply(t, asFilters(events)); + public final SELF any(Consumer... filters) { + Objects.requireNonNull(filters, "filters"); + strategyStep = t -> anyApplier.apply(t, filters); return self(); } - public final SELF one(EC event) { - Objects.requireNonNull(event, "event"); - strategyStep = t -> oneApplier.apply(t, fb -> withApplier.with(fb, event)); + public final SELF one(Consumer filter) { + Objects.requireNonNull(filter, "filter"); + strategyStep = t -> oneApplier.apply(t, filter); return self(); } - /** Concrete 'accept' should delegate here with its concrete LB. */ - protected final void acceptInto(LB listenTaskBuilder) { + protected final void acceptInto(LISTEN_TASK listenTaskBuilder) { Objects.requireNonNull(strategyStep, "listening strategy must be set (all/any/one)"); toInvoker.to( listenTaskBuilder, diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/DSL.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/DSL.java index 52a6f07e6..4e6d890a7 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/DSL.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/DSL.java @@ -60,8 +60,7 @@ private DSL() {} // ---- Convenient shortcuts ----// /** - * Create a new HTTP call specification to be used with {@link #call(CallHttpConfigurer)} or - * {@link #call(io.serverlessworkflow.fluent.func.dsl.FuncCallHttpSpec)}. + * Create a new HTTP call specification to be used with {@link #call(CallHttpConfigurer)} * *

Typical usage: * @@ -201,15 +200,17 @@ public static ListenSpec to() { } /** - * Start building an event emission specification. + * Start building an event specification for use with {@code emit} tasks. * - *

Use methods on {@link EventSpec} to define event type and payload, and pass it to {@link - * #emit(Consumer)}. + *

This method returns an {@link EventFilterSpec}, which is primarily used for event filtering + * (for {@code listen} tasks) but is also reused here as a filter-like description of the event + * that will be emitted. Configure the event type, source, and payload using {@link + * EventFilterSpec} methods, then pass the resulting spec to {@link #emit(Consumer)}. * - * @return a new {@link EventSpec} + * @return a new {@link EventFilterSpec} instance for event emission */ - public static EventSpec event() { - return new EventSpec(); + public static EventFilterSpec event() { + return new EventFilterSpec(); } /** @@ -644,15 +645,65 @@ public static TasksConfigurer set(String expr) { } /** - * Create a {@link TasksConfigurer} that adds an {@code emit} task. + * Adds an {@code emit} task to the workflow's task sequence using a custom configurer. * * - * @param configurer consumer configuring {@link EmitTaskBuilder} - * @return a {@link TasksConfigurer} that adds an EmitTask + *

This method is typically used in conjunction with {@link #produced()} to fluently define the + * properties of the event being emitted. * + * + *

Example usage: + * + *

{@code
+   * emit(produced().type("my.custom.event").source("my/source"))
+   * }
+ * + * @param configurer a consumer, such as an {@link EmitSpec}, that configures the {@link + * EmitTaskBuilder} + * @return a {@link TasksConfigurer} to continue building the task list */ public static TasksConfigurer emit(Consumer configurer) { return list -> list.emit(configurer); } + /** + * @see #emit(Consumer) + */ + public static TasksConfigurer emit(String name, Consumer configurer) { + return list -> list.emit(name, configurer); + } + + /** + * A convenient shortcut to add an {@code emit} task that only requires a CloudEvent type. * + * + *

Use this method when you only need to specify the {@code type} attribute of the emitted + * event and do not need to configure additional properties like data or source. + * + * @param cloudEventType the {@code type} attribute of the CloudEvent to be emitted + * @return a {@link TasksConfigurer} to continue building the task list + */ + public static TasksConfigurer emit(String cloudEventType) { + return list -> list.emit(new EmitSpec().type(cloudEventType)); + } + + /** + * Starts building an event emission specification. * + * + *

This creates a new {@link EmitSpec} which acts as a fluent builder for the properties (e.g., + * type, source, data) of the CloudEvent to be emitted. The resulting spec can then be passed + * directly to {@link #emit(Consumer)}. + * + * @return a new {@link EmitSpec} instance for fluent configuration + */ + public static EmitSpec produced() { + return new EmitSpec(); + } + + /** + * @see #produced() + */ + public static EmitSpec produced(String cloudEventType) { + return new EmitSpec().type(cloudEventType); + } + /** * Create a {@link TasksConfigurer} that adds a {@code listen} task. * diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EmitSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EmitSpec.java index f07dd77c6..a4af29c5a 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EmitSpec.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EmitSpec.java @@ -16,9 +16,11 @@ package io.serverlessworkflow.fluent.spec.dsl; import io.serverlessworkflow.fluent.spec.EmitTaskBuilder; +import io.serverlessworkflow.fluent.spec.EventPropertiesBuilder; import io.serverlessworkflow.fluent.spec.configurers.EmitConfigurer; -public final class EmitSpec extends ExprEventFilterSpec implements EmitConfigurer { +public final class EmitSpec extends ExprEventEmitPropertiesSpec + implements EmitConfigurer { @Override protected EmitSpec self() { @@ -27,6 +29,6 @@ protected EmitSpec self() { @Override public void accept(EmitTaskBuilder emitTaskBuilder) { - emitTaskBuilder.event(e -> getSteps().forEach(step -> step.accept(e))); + emitTaskBuilder.event(e -> getPropertySteps().forEach(step -> step.accept(e))); } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventEmitPropertiesSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventEmitPropertiesSpec.java new file mode 100644 index 000000000..022a985b7 --- /dev/null +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventEmitPropertiesSpec.java @@ -0,0 +1,49 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.spec.dsl; + +import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; +import java.time.Instant; +import java.util.Date; +import java.util.UUID; + +/** Event properties handlers for emit tasks (setting attributes). */ +public abstract class EventEmitPropertiesSpec< + SELF, EVENT_PROPS extends AbstractEventPropertiesBuilder> + extends EventPropertiesSpec { + + /** Sets the CloudEvent id to a random UUID */ + public SELF randomId() { + addPropertyStep(e -> e.id(UUID.randomUUID().toString())); + return self(); + } + + /** Sets the CloudEvent time to the current system time */ + public SELF now() { + addPropertyStep(e -> e.time(Date.from(Instant.now()))); + return self(); + } + + public SELF contentType(String ct) { + addPropertyStep(e -> e.dataContentType(ct)); + return self(); + } + + public SELF OCTET_STREAM() { + addPropertyStep(e -> e.dataContentType("application/octet-stream")); + return self(); + } +} diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventFilterSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventFilterSpec.java index 873883694..2d61f830f 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventFilterSpec.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventFilterSpec.java @@ -15,72 +15,14 @@ */ package io.serverlessworkflow.fluent.spec.dsl; -import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; -import java.net.URI; -import java.time.Instant; -import java.util.Date; -import java.util.List; -import java.util.UUID; -import java.util.function.Consumer; +import io.serverlessworkflow.fluent.spec.EventFilterBuilder; +import io.serverlessworkflow.fluent.spec.EventPropertiesBuilder; -public abstract class EventFilterSpec> { +public final class EventFilterSpec + extends AbstractEventFilterSpec { - private final List> steps; - - protected EventFilterSpec(List> steps) { - this.steps = steps; - } - - protected abstract SELF self(); - - protected void addStep(Consumer step) { - steps.add(step); - } - - protected List> getSteps() { - return steps; - } - - public SELF type(String eventType) { - steps.add(e -> e.type(eventType)); - return self(); - } - - /** Sets the CloudEvent id to a random UUID */ - public SELF randomId() { - steps.add(e -> e.id(UUID.randomUUID().toString())); - return self(); - } - - /** Sets the CloudEvent time to the current system time */ - public SELF now() { - steps.add(e -> e.time(Date.from(Instant.now()))); - return self(); - } - - public SELF contentType(String ct) { - steps.add(e -> e.dataContentType(ct)); - return self(); - } - - /** Sets the CloudEvent dataContentType to `application/json` */ - public SELF JSON() { - steps.add(e -> e.dataContentType("application/json")); - return self(); - } - - public SELF OCTET_STREAM() { - steps.add(e -> e.dataContentType("application/octet-stream")); - return self(); - } - - public SELF source(String source) { - steps.add(e -> e.source(source)); - return self(); - } - - public SELF source(URI source) { - steps.add(e -> e.source(source)); - return self(); + @Override + protected EventFilterSpec self() { + return this; } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventPropertiesSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventPropertiesSpec.java new file mode 100644 index 000000000..ef80dc04c --- /dev/null +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventPropertiesSpec.java @@ -0,0 +1,63 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.spec.dsl; + +import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +/** + * Base class for defining events properties used either on filter (consume) use cases or emit + * (produce). + */ +public abstract class EventPropertiesSpec< + SELF, EVENT_PROPS extends AbstractEventPropertiesBuilder> { + + private final List> propertySteps = new ArrayList<>(); + + protected abstract SELF self(); + + protected void addPropertyStep(Consumer step) { + propertySteps.add(step); + } + + protected List> getPropertySteps() { + return propertySteps; + } + + public SELF type(String eventType) { + propertySteps.add(e -> e.type(eventType)); + return self(); + } + + /** Sets the CloudEvent dataContentType to `application/json` */ + public SELF JSON() { + propertySteps.add(e -> e.dataContentType("application/json")); + return self(); + } + + public SELF source(String source) { + propertySteps.add(e -> e.source(source)); + return self(); + } + + public SELF source(URI source) { + propertySteps.add(e -> e.source(source)); + return self(); + } +} diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventSpec.java deleted file mode 100644 index 3e93c9d2a..000000000 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventSpec.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.fluent.spec.dsl; - -import io.serverlessworkflow.fluent.spec.EventPropertiesBuilder; -import io.serverlessworkflow.fluent.spec.configurers.EventConfigurer; - -public final class EventSpec extends ExprEventFilterSpec implements EventConfigurer { - - @Override - protected EventSpec self() { - return this; - } - - @Override - public void accept(EventPropertiesBuilder eventPropertiesBuilder) { - getSteps().forEach(step -> step.accept(eventPropertiesBuilder)); - } -} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncPredicateEventPropertiesBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ExprEventEmitPropertiesSpec.java similarity index 54% rename from experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncPredicateEventPropertiesBuilder.java rename to fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ExprEventEmitPropertiesSpec.java index a0692d331..2a45c7163 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncPredicateEventPropertiesBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ExprEventEmitPropertiesSpec.java @@ -13,23 +13,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.fluent.func; +package io.serverlessworkflow.fluent.spec.dsl; -import io.cloudevents.CloudEventData; -import io.serverlessworkflow.api.types.func.EventDataPredicate; import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; -import java.util.function.Predicate; +import java.util.Map; -public class FuncPredicateEventPropertiesBuilder - extends AbstractEventPropertiesBuilder { +public abstract class ExprEventEmitPropertiesSpec< + SELF, EVENT_PROPS extends AbstractEventPropertiesBuilder> + extends EventEmitPropertiesSpec { - @Override - protected FuncPredicateEventPropertiesBuilder self() { - return this; + /** Sets the event data and the contentType to `application/json` */ + public SELF jsonData(String expr) { + addPropertyStep(e -> e.data(expr)); + return JSON(); } - public FuncPredicateEventPropertiesBuilder data(Predicate predicate) { - this.eventProperties.setData(new EventDataPredicate().withPredicate(predicate)); - return this; + /** Sets the event data and the contentType to `application/json` */ + public SELF jsonData(Map data) { + addPropertyStep(e -> e.data(data)); + return JSON(); } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ExprEventFilterSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ExprEventFilterSpec.java index 4d4850385..2db621b6d 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ExprEventFilterSpec.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ExprEventFilterSpec.java @@ -15,26 +15,15 @@ */ package io.serverlessworkflow.fluent.spec.dsl; -import io.serverlessworkflow.fluent.spec.EventPropertiesBuilder; -import java.util.ArrayList; -import java.util.Map; +import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; -public abstract class ExprEventFilterSpec - extends EventFilterSpec { +public abstract class ExprEventFilterSpec< + SELF, EVENT_PROPS extends AbstractEventPropertiesBuilder> + extends EventPropertiesSpec { - ExprEventFilterSpec() { - super(new ArrayList<>()); - } - - /** Sets the event data and the contentType to `application/json` */ + /** Filters events with `application/json` content type */ public SELF jsonData(String expr) { - addStep(e -> e.data(expr)); - return JSON(); - } - - /** Sets the event data and the contentType to `application/json` */ - public SELF jsonData(Map data) { - addStep(e -> e.data(data)); + addPropertyStep(e -> e.data(expr)); return JSON(); } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ListenSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ListenSpec.java index 56d7a38fb..b26702b2f 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ListenSpec.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ListenSpec.java @@ -16,26 +16,22 @@ package io.serverlessworkflow.fluent.spec.dsl; import io.serverlessworkflow.fluent.spec.AbstractEventConsumptionStrategyBuilder; -import io.serverlessworkflow.fluent.spec.AbstractEventFilterBuilder; import io.serverlessworkflow.fluent.spec.AbstractListenTaskBuilder; import io.serverlessworkflow.fluent.spec.EventFilterBuilder; import io.serverlessworkflow.fluent.spec.ListenTaskBuilder; import io.serverlessworkflow.fluent.spec.ListenToBuilder; -import io.serverlessworkflow.fluent.spec.configurers.EventConfigurer; +import io.serverlessworkflow.fluent.spec.configurers.ListenConfigurer; import java.util.Objects; import java.util.function.Consumer; public final class ListenSpec - extends BaseListenSpec< - ListenSpec, ListenTaskBuilder, ListenToBuilder, EventFilterBuilder, EventConfigurer> - implements io.serverlessworkflow.fluent.spec.configurers.ListenConfigurer { + extends BaseListenSpec + implements ListenConfigurer { public ListenSpec() { super( // toInvoker AbstractListenTaskBuilder::to, - // withApplier - AbstractEventFilterBuilder::with, // allApplier (tb, filters) -> tb.all(castFilters(filters)), // anyApplier diff --git a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java index d62fd22af..d479ebcce 100644 --- a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java +++ b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java @@ -17,8 +17,9 @@ import static io.serverlessworkflow.fluent.spec.dsl.DSL.basic; import static io.serverlessworkflow.fluent.spec.dsl.DSL.cases; -import static io.serverlessworkflow.fluent.spec.dsl.DSL.event; +import static io.serverlessworkflow.fluent.spec.dsl.DSL.emit; import static io.serverlessworkflow.fluent.spec.dsl.DSL.http; +import static io.serverlessworkflow.fluent.spec.dsl.DSL.produced; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -28,8 +29,11 @@ import io.serverlessworkflow.api.types.CallHTTP; import io.serverlessworkflow.api.types.CatchErrors; import io.serverlessworkflow.api.types.Document; +import io.serverlessworkflow.api.types.EmitEventDefinition; +import io.serverlessworkflow.api.types.EmitTask; import io.serverlessworkflow.api.types.ErrorFilter; import io.serverlessworkflow.api.types.EventFilter; +import io.serverlessworkflow.api.types.EventProperties; import io.serverlessworkflow.api.types.FlowDirectiveEnum; import io.serverlessworkflow.api.types.HTTPArguments; import io.serverlessworkflow.api.types.HTTPHeaders; @@ -209,23 +213,17 @@ void testDoTaskEmitEvent() { Workflow wf = WorkflowBuilder.workflow("flowEmit") .tasks( - d -> - d.emit( - "emitEvent", - e -> - e.event( - event() - .type("com.petstore.order.placed.v1") - .source(URI.create("https://petstore.com")) - .jsonData( - Map.of( - "client", - Map.of( - "firstName", "Cruella", "lastName", "de Vil"), - "items", - List.of( - Map.of( - "breed", "dalmatian", "quantity", 101))))))) + emit( + "emitEvent", + produced() + .type("com.petstore.order.placed.v1") + .source(URI.create("https://petstore.com")) + .jsonData( + Map.of( + "client", + Map.of("firstName", "Cruella", "lastName", "de Vil"), + "items", + List.of(Map.of("breed", "dalmatian", "quantity", 101)))))) .build(); List items = wf.getDo(); @@ -234,12 +232,12 @@ void testDoTaskEmitEvent() { TaskItem item = items.get(0); assertEquals("emitEvent", item.getName(), "TaskItem name should match"); - io.serverlessworkflow.api.types.EmitTask et = item.getTask().getEmitTask(); + EmitTask et = item.getTask().getEmitTask(); assertNotNull(et, "EmitTask should be present"); - io.serverlessworkflow.api.types.EmitEventDefinition ed = et.getEmit().getEvent(); + EmitEventDefinition ed = et.getEmit().getEvent(); assertNotNull(ed, "EmitEventDefinition should be present"); - io.serverlessworkflow.api.types.EventProperties props = ed.getWith(); + EventProperties props = ed.getWith(); assertEquals( "https://petstore.com", props.getSource().getUriTemplate().getLiteralUri().toString(), diff --git a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java index 80b9d1e1b..41304b1ba 100644 --- a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java +++ b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java @@ -22,6 +22,7 @@ import static io.serverlessworkflow.fluent.spec.dsl.DSL.error; import static io.serverlessworkflow.fluent.spec.dsl.DSL.event; import static io.serverlessworkflow.fluent.spec.dsl.DSL.http; +import static io.serverlessworkflow.fluent.spec.dsl.DSL.produced; import static io.serverlessworkflow.fluent.spec.dsl.DSL.secrets; import static io.serverlessworkflow.fluent.spec.dsl.DSL.to; import static org.assertj.core.api.Assertions.assertThat; @@ -68,7 +69,7 @@ public void when_listen_all_then_emit() { to().all( event().type("org.acme.listen"), event().type("org.example.listen"))) - .emit(e -> e.event(event().type("org.example.emit")))) + .emit(produced("org.example.emit"))) .build(); // Sanity diff --git a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/TryCatchDslTest.java b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/TryCatchDslTest.java index bd2cb3c05..0aefb745a 100644 --- a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/TryCatchDslTest.java +++ b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/TryCatchDslTest.java @@ -17,7 +17,6 @@ import static io.serverlessworkflow.fluent.spec.dsl.DSL.call; import static io.serverlessworkflow.fluent.spec.dsl.DSL.emit; -import static io.serverlessworkflow.fluent.spec.dsl.DSL.event; import static io.serverlessworkflow.fluent.spec.dsl.DSL.http; import static io.serverlessworkflow.fluent.spec.dsl.DSL.set; import static io.serverlessworkflow.fluent.spec.dsl.DSL.tryCatch; @@ -47,7 +46,7 @@ void when_try_with_tasks_and_catch_when_with_retry_and_tasks() { .catches() .when("$.error == true") .errors(Errors.RUNTIME, 500) - .tasks(emit(e -> e.event(event().type("org.acme.failed")))) + .tasks(emit("org.acme.failed")) .retry() .when("$.retries < 3") .limit("PT5S") @@ -122,7 +121,7 @@ void when_try_with_multiple_tasks_and_catch_except_when_with_uri_error_filter() .catches() .exceptWhen("$.code == 502") .errors(errType, 502) - .tasks(emit(e -> e.event(event().type("org.acme.recover")))) + .tasks(emit("org.acme.recover")) .done() // back to TrySpec )) .build(); @@ -175,7 +174,7 @@ void when_try_with_catch_and_simple_retry_limit_only() { .retry() .limit("PT2S") .done() - .tasks(emit(e -> e.event(event().type("org.acme.retrying")))) + .tasks(emit("org.acme.retrying")) .done())) .build();