Skip to content

Commit f6f14be

Browse files
committed
[Fix #1211] Fixing cancel behaviour
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 19b48e9 commit f6f14be

4 files changed

Lines changed: 34 additions & 7 deletions

File tree

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent;
2727
import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent;
2828
import java.time.Instant;
29+
import java.util.ArrayList;
30+
import java.util.Collection;
2931
import java.util.Map;
3032
import java.util.Optional;
3133
import java.util.concurrent.CancellationException;
@@ -54,6 +56,8 @@ public class WorkflowMutableInstance implements WorkflowInstance {
5456
private Lock statusLock = new ReentrantLock();
5557
private Map<CompletableFuture<TaskContext>, TaskContext> suspended;
5658

59+
private Collection<CompletableFuture<?>> cancelables = new ArrayList<>();
60+
5761
protected WorkflowMutableInstance(WorkflowDefinition definition, String id, WorkflowModel input) {
5862
this.id = id;
5963
this.input = input;
@@ -280,7 +284,8 @@ public boolean cancel() {
280284
try {
281285
statusLock.lock();
282286
if (TaskExecutorHelper.isActive(status.get())) {
283-
futureRef.get().cancel(true);
287+
cancelables.forEach(t -> t.cancel(true));
288+
cancelables.clear();
284289
status(WorkflowStatus.CANCELLED);
285290
publishEvent(
286291
workflowContext,
@@ -294,6 +299,30 @@ public boolean cancel() {
294299
}
295300
}
296301

302+
public void addCancelable(CompletableFuture<?> cancelable) {
303+
try {
304+
statusLock.lock();
305+
if (status.get() == WorkflowStatus.CANCELLED) {
306+
statusLock.unlock();
307+
cancelable.cancel(true);
308+
} else {
309+
cancelables.add(cancelable);
310+
statusLock.unlock();
311+
cancelable.thenAccept(
312+
__ -> {
313+
try {
314+
statusLock.lock();
315+
cancelables.remove(cancelable);
316+
} finally {
317+
statusLock.unlock();
318+
}
319+
});
320+
}
321+
} finally {
322+
statusLock.unlock();
323+
}
324+
}
325+
297326
public <T> T additionalObject(String key, Supplier<T> supplier) {
298327
return (T) additionalObjects.computeIfAbsent(key, k -> supplier.get());
299328
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ protected CompletableFuture<WorkflowModel> internalExecute(
173173
processCe(converter.apply(ce), output, workflow, taskContext, future)),
174174
workflow,
175175
taskContext);
176+
workflow.instance().addCancelable(info.completableFuture());
176177
return info.completableFuture()
177178
.whenComplete((__, e) -> info.registrations().forEach(eventConsumer::unregister))
178179
.thenApply(__ -> output);

impl/test/src/test/java/io/serverlessworkflow/impl/test/EventDefinitionTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,7 @@ void testEventListened(
6969

7070
@ParameterizedTest
7171
@MethodSource("wrongEventListenerParameters")
72-
void testWrongEvent(
73-
String listen, String emit, JsonNode expectedResult, Object emitInput, Object listenInput)
72+
void testWrongEvent(String listen, String emit, Object emitInput, Object listenInput)
7473
throws IOException {
7574
WorkflowDefinition listenDefinition =
7675
appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath(listen));
@@ -83,7 +82,7 @@ void testWrongEvent(
8382
assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.WAITING);
8483
waitingInstance.cancel();
8584
assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.CANCELLED);
86-
assertThat(future).isCancelled();
85+
assertThat(future).isDone();
8786
}
8887

8988
@ParameterizedTest
@@ -140,7 +139,6 @@ private static Stream<Arguments> wrongEventListenerParameters() {
140139
Arguments.of(
141140
"workflows-samples/listen-to-any-filter.yaml",
142141
"workflows-samples/emit-doctor.yaml",
143-
doctor(),
144142
Map.of("temperature", 38),
145143
Map.of("threshold", 39)));
146144
}

impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import java.util.Map;
4646
import java.util.Objects;
4747
import java.util.Optional;
48-
import java.util.concurrent.CancellationException;
4948
import java.util.concurrent.CompletableFuture;
5049
import java.util.concurrent.CompletionException;
5150
import java.util.concurrent.CopyOnWriteArrayList;
@@ -169,7 +168,7 @@ void testCancel() throws IOException, InterruptedException {
169168
.instance(Map.of());
170169
CompletableFuture<WorkflowModel> future = instance.start();
171170
instance.cancel();
172-
assertThat(catchThrowableOfType(CancellationException.class, () -> future.get().asMap()))
171+
assertThat(catchThrowableOfType(ExecutionException.class, () -> future.get().asMap()))
173172
.isNotNull();
174173
assertThat(instance.status()).isEqualTo(WorkflowStatus.CANCELLED);
175174
assertPojoInCE("io.serverlessworkflow.task.cancelled.v1", TaskCancelledCEData.class);

0 commit comments

Comments
 (0)