Skip to content

Commit 4b70fe4

Browse files
fix: ensure onCancelled callback is properly invoked (#376)
The onCancelled callback was not being called when a subscription was stopped by the user or when the onEvent handler threw an exception. This fixes both cases by propagating cancellation through the consumer and catching handler exceptions in ReadResponseObserver. Also reorders onSubscriptionConfirmation to complete the future before calling the listener, preventing race conditions. Co-authored-by: William Chong <william-chong@outlook.com>
1 parent 70c1fd6 commit 4b70fe4

File tree

5 files changed

+175
-3
lines changed

5 files changed

+175
-3
lines changed

src/main/java/io/kurrent/dbclient/ReadResponseObserver.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ void cancel(String reason, Throwable cause) {
5252
this.requestStream.cancel(reason, cause);
5353
if (cause instanceof StreamNotFoundException)
5454
this.consumer.onStreamNotFound(((StreamNotFoundException) cause).getStreamName());
55+
else
56+
this.consumer.onCancelled(cause);
5557
}
5658

5759
void manageFlowControl() {
@@ -105,6 +107,18 @@ public void onNext(StreamsOuterClass.ReadResp value) {
105107
return;
106108
}
107109

110+
try {
111+
handleMessage(value);
112+
} catch (Exception ex) {
113+
logger.error("Exception thrown by subscription handler", ex);
114+
cancel("subscription handler threw an exception", ex);
115+
return;
116+
}
117+
118+
manageFlowControl();
119+
}
120+
121+
private void handleMessage(StreamsOuterClass.ReadResp value) {
108122
if (value.hasEvent())
109123
consumer.onEvent(ResolvedEvent.fromWire(value.getEvent()));
110124
else if (value.hasConfirmation())
@@ -136,8 +150,6 @@ else if (value.hasLastAllStreamPosition()) {
136150
} else {
137151
logger.warn("received unknown message variant");
138152
}
139-
140-
manageFlowControl();
141153
}
142154

143155
@Override
@@ -149,6 +161,7 @@ public void onError(Throwable t) {
149161
StatusRuntimeException e = (StatusRuntimeException) t;
150162

151163
if (e.getStatus().getCode() == Status.Code.CANCELLED) {
164+
this.consumer.onCancelled(null);
152165
return;
153166
}
154167

src/main/java/io/kurrent/dbclient/SubscriptionStreamConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ public void onEvent(ResolvedEvent event) {
3232
@Override
3333
public void onSubscriptionConfirmation(String subscriptionId) {
3434
this.subscription = new Subscription(this.internal, subscriptionId, this.checkpointer);
35-
this.listener.onConfirmation(this.subscription);
3635
this.future.complete(this.subscription);
36+
this.listener.onConfirmation(this.subscription);
3737
}
3838

3939
@Override
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package io.kurrent.dbclient;
22

3+
import org.junit.platform.suite.api.SelectClasses;
34
import org.junit.platform.suite.api.SelectPackages;
45
import org.junit.platform.suite.api.Suite;
56

67
@Suite
78
@SelectPackages("io.kurrent.dbclient.misc")
9+
@SelectClasses(SubscriptionStreamConsumerTests.class)
810
public class MiscTests {}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package io.kurrent.dbclient;
2+
3+
import org.junit.jupiter.api.Assertions;
4+
import org.junit.jupiter.api.Test;
5+
6+
import java.util.concurrent.CompletableFuture;
7+
import java.util.concurrent.ExecutionException;
8+
import java.util.concurrent.atomic.AtomicBoolean;
9+
import java.util.concurrent.atomic.AtomicReference;
10+
11+
public class SubscriptionStreamConsumerTests {
12+
private static final SubscriptionTracingCallback NO_OP_TRACING =
13+
(subscriptionId, event, action) -> action.run();
14+
15+
@Test
16+
public void testOnCancelledCompletesFutureExceptionallyBeforeConfirmation() {
17+
CompletableFuture<Subscription> future = new CompletableFuture<>();
18+
AtomicBoolean listenerCalled = new AtomicBoolean(false);
19+
AtomicReference<Throwable> listenerException = new AtomicReference<>();
20+
21+
SubscriptionStreamConsumer consumer = new SubscriptionStreamConsumer(
22+
new SubscriptionListener() {
23+
@Override
24+
public void onCancelled(Subscription subscription, Throwable exception) {
25+
listenerCalled.set(true);
26+
listenerException.set(exception);
27+
}
28+
},
29+
null,
30+
future,
31+
NO_OP_TRACING
32+
);
33+
34+
RuntimeException error = new RuntimeException("server went away");
35+
consumer.onCancelled(error);
36+
37+
Assertions.assertTrue(future.isCompletedExceptionally());
38+
ExecutionException ex = Assertions.assertThrows(ExecutionException.class, future::get);
39+
Assertions.assertSame(error, ex.getCause());
40+
Assertions.assertTrue(listenerCalled.get());
41+
Assertions.assertSame(error, listenerException.get());
42+
}
43+
44+
@Test
45+
public void testOnCancelledAfterConfirmationDoesNotAffectFuture() {
46+
CompletableFuture<Subscription> future = new CompletableFuture<>();
47+
AtomicBoolean listenerCalled = new AtomicBoolean(false);
48+
AtomicReference<Subscription> listenerSubscription = new AtomicReference<>();
49+
50+
SubscriptionStreamConsumer consumer = new SubscriptionStreamConsumer(
51+
new SubscriptionListener() {
52+
@Override
53+
public void onCancelled(Subscription subscription, Throwable exception) {
54+
listenerCalled.set(true);
55+
listenerSubscription.set(subscription);
56+
}
57+
},
58+
null,
59+
future,
60+
NO_OP_TRACING
61+
);
62+
63+
consumer.onSubscribe(new org.reactivestreams.Subscription() {
64+
@Override public void request(long n) {}
65+
@Override public void cancel() {}
66+
});
67+
68+
consumer.onSubscriptionConfirmation("test-sub-id");
69+
Assertions.assertTrue(future.isDone());
70+
Assertions.assertFalse(future.isCompletedExceptionally());
71+
72+
consumer.onCancelled(null);
73+
74+
Assertions.assertFalse(future.isCompletedExceptionally(), "future should remain successfully completed");
75+
Assertions.assertTrue(listenerCalled.get());
76+
Assertions.assertNotNull(listenerSubscription.get());
77+
Assertions.assertEquals("test-sub-id", listenerSubscription.get().getSubscriptionId());
78+
}
79+
}

src/test/java/io/kurrent/dbclient/streams/SubscriptionTests.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
import java.util.Optional;
1313
import java.util.concurrent.CountDownLatch;
1414
import java.util.concurrent.TimeUnit;
15+
import java.util.concurrent.atomic.AtomicBoolean;
1516
import java.util.concurrent.atomic.AtomicInteger;
17+
import java.util.concurrent.atomic.AtomicReference;
1618

1719
public interface SubscriptionTests extends ConnectionAware {
1820
@Test
@@ -185,4 +187,80 @@ public void onCaughtUp(Subscription subscription, Instant timestamp, Long stream
185187
caughtUp.await();
186188
subscription.stop();
187189
}
190+
191+
@Test
192+
@Timeout(value = 2, unit = TimeUnit.MINUTES)
193+
default void testOnCancelledIsInvokedWhenSubscriptionIsStopped() throws Throwable {
194+
KurrentDBClient client = getDefaultClient();
195+
String streamName = generateName();
196+
197+
client.appendToStream(
198+
streamName,
199+
EventData.builderAsJson(generateName(), "{}".getBytes()).build()
200+
).get();
201+
202+
final CountDownLatch onEventReceived = new CountDownLatch(1);
203+
final CountDownLatch onCancelledReceived = new CountDownLatch(1);
204+
final AtomicReference<Throwable> cancelException = new AtomicReference<>();
205+
final AtomicBoolean onCancelledInvoked = new AtomicBoolean(false);
206+
207+
Subscription subscription = client.subscribeToStream(streamName, new SubscriptionListener() {
208+
@Override
209+
public void onEvent(Subscription subscription, ResolvedEvent event) {
210+
onEventReceived.countDown();
211+
}
212+
213+
@Override
214+
public void onCancelled(Subscription subscription, Throwable exception) {
215+
onCancelledInvoked.set(true);
216+
cancelException.set(exception);
217+
onCancelledReceived.countDown();
218+
}
219+
}).get();
220+
221+
onEventReceived.await();
222+
subscription.stop();
223+
onCancelledReceived.await();
224+
225+
Assertions.assertTrue(onCancelledInvoked.get());
226+
Assertions.assertNull(cancelException.get());
227+
}
228+
229+
@Test
230+
@Timeout(value = 2, unit = TimeUnit.MINUTES)
231+
default void testOnCancelledIsInvokedWhenOnEventThrows() throws Throwable {
232+
KurrentDBClient client = getDefaultClient();
233+
String streamName = generateName();
234+
235+
final CountDownLatch onCancelledReceived = new CountDownLatch(1);
236+
final AtomicReference<Throwable> cancelException = new AtomicReference<>();
237+
final AtomicBoolean onCancelledInvoked = new AtomicBoolean(false);
238+
239+
client.subscribeToStream(streamName, new SubscriptionListener() {
240+
@Override
241+
public void onEvent(Subscription subscription, ResolvedEvent event) {
242+
throw new RuntimeException("failure");
243+
}
244+
245+
@Override
246+
public void onCancelled(Subscription subscription, Throwable exception) {
247+
onCancelledInvoked.set(true);
248+
cancelException.set(exception);
249+
onCancelledReceived.countDown();
250+
}
251+
}).get();
252+
253+
client.appendToStream(
254+
streamName,
255+
EventData.builderAsJson(generateName(), "{}".getBytes()).build()
256+
).get();
257+
258+
onCancelledReceived.await();
259+
260+
Assertions.assertTrue(onCancelledInvoked.get());
261+
Assertions.assertNotNull(cancelException.get());
262+
Assertions.assertInstanceOf(RuntimeException.class, cancelException.get());
263+
Assertions.assertEquals("failure", cancelException.get().getMessage());
264+
}
265+
188266
}

0 commit comments

Comments
 (0)