2424import io .serverlessworkflow .impl .WorkflowApplication ;
2525import io .serverlessworkflow .impl .WorkflowContext ;
2626import io .serverlessworkflow .impl .WorkflowModel ;
27- import io .serverlessworkflow .impl .WorkflowModelFactory ;
2827import java .util .AbstractCollection ;
2928import java .util .ArrayList ;
3029import java .util .Collection ;
@@ -61,8 +60,7 @@ public TypeEventRegistrationBuilder listen(
6160 application .cloudEventPredicateFactory ().build (application , properties );
6261 Collection <CloudEventPredicate > correlationPredicates =
6362 buildCorrelationPredicates (register .getCorrelate (), application );
64- return new TypeEventRegistrationBuilder (
65- type , cePredicate , correlationPredicates , application .modelFactory ());
63+ return new TypeEventRegistrationBuilder (type , cePredicate , correlationPredicates );
6664 }
6765
6866 private Collection <CloudEventPredicate > buildCorrelationPredicates (
@@ -83,27 +81,21 @@ private Collection<CloudEventPredicate> buildCorrelationPredicates(
8381
8482 @ Override
8583 public Collection <TypeEventRegistrationBuilder > listenToAll (WorkflowApplication application ) {
86- return List .of (
87- new TypeEventRegistrationBuilder (null , null , List .of (), application .modelFactory ()));
84+ return List .of (new TypeEventRegistrationBuilder (null , null , List .of ()));
8885 }
8986
9087 private static class CloudEventConsumer extends AbstractCollection <TypeEventRegistration >
9188 implements Consumer <CloudEvent > {
92- private final WorkflowModelFactory modelFactory ;
9389 private Collection <TypeEventRegistration > registrations = new CopyOnWriteArrayList <>();
9490
95- CloudEventConsumer (WorkflowModelFactory modelFactory ) {
96- this .modelFactory = modelFactory ;
97- }
98-
9991 @ Override
10092 public void accept (CloudEvent ce ) {
10193 logger .debug ("Received cloud event {}" , ce );
10294 for (TypeEventRegistration registration : registrations ) {
103- if ( registration . predicate (). test ( ce , registration . workflow (), registration . task ())) {
104- if (! testCorrelation ( ce , registration )) {
105- continue ;
106- }
95+ boolean predicateMatch =
96+ registration . predicate () == null
97+ || registration . predicate (). test ( ce , registration . workflow (), registration . task ()) ;
98+ if ( predicateMatch && testCorrelation ( ce , registration )) {
10799 registration .consumer ().accept (ce );
108100 }
109101 }
@@ -118,7 +110,7 @@ private boolean testCorrelation(CloudEvent ce, TypeEventRegistration registratio
118110 for (CloudEventPredicate pred : predicates ) {
119111 if (pred instanceof ModelAwareCloudEventPredicate ma ) {
120112 if (eventModel == null ) {
121- eventModel = modelFactory .from (ce );
113+ eventModel = registration . workflow (). definition (). application (). modelFactory () .from (ce );
122114 }
123115 if (!ma .test (eventModel , registration .workflow (), registration .task ())) {
124116 return false ;
@@ -175,7 +167,7 @@ public TypeEventRegistration register(
175167 .computeIfAbsent (
176168 registration .type (),
177169 k -> {
178- CloudEventConsumer consumer = new CloudEventConsumer (builder . modelFactory () );
170+ CloudEventConsumer consumer = new CloudEventConsumer ();
179171 register (k , consumer );
180172 return consumer ;
181173 })
0 commit comments