Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## XX.XX.XX
* Fixed a bug where the request queue would stall after sending the first request, preventing subsequent persisted requests from being sent.

## 24.1.5
* Fixed a bug where a non-JSON server response would cause a permanent networking deadlock, preventing all subsequent requests from being sent.
* Fixed a bug where a NullPointerException in SDKCore.recover() would permanently block SDK initialization when a crash file from a previous session existed on disk.
Expand Down
11 changes: 6 additions & 5 deletions sdk-java/src/main/java/ly/count/sdk/java/internal/Tasks.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,9 @@ <T> Future<T> run(final Task<T> task, final Callback<T> callback) {
@Override
public T call() throws Exception {
running = task.id;
T result;
try {
T result = task.call();
if (callback != null) {
callback.call(result);
}
return result;
result = task.call();
} finally {
synchronized (pending) {
if (!task.id.equals(0L)) {
Expand All @@ -106,6 +103,10 @@ public T call() throws Exception {
running = null;
}
}
if (callback != null) {
callback.call(result);
}
return result;
}
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package ly.count.sdk.java.internal;

import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import ly.count.sdk.java.Config;
import ly.count.sdk.java.Countly;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
* End-to-end reproducer for GitHub issue #271:
* "Request queue stalls after the first request in SDK 24.1.5".
*
* The race the user hit in production: while request #1 is in-flight, the SDK
* queues additional requests to disk. When #1 completes, its callback inside
* Tasks.java re-enters DefaultNetworking.check() — which short-circuits if
* tasks.isRunning() returns true. In 24.1.5 the callback fired before
* `running` was cleared, so check() saw isRunning()=true and skipped
* scheduling the next request. The queue silently stopped draining.
*
* To reproduce deterministically we hold request #1 open with a CountDownLatch
* while we generate a backlog, then release it and assert the backlog drains
* without any external trigger.
*/
@RunWith(JUnit4.class)
public class ScenarioRequestQueueStallTests {

private HttpServer server;
private int port;

@Before
public void setUp() {
TestUtils.createCleanTestState();
}

@After
public void tearDown() {
Countly.instance().halt();
if (server != null) {
server.stop(0);
}
}

private Config configForLocalServer() {
return new Config("http://localhost:" + port, TestUtils.SERVER_APP_KEY, TestUtils.getTestSDirectory())
.setLoggingLevel(Config.LoggingLevel.VERBOSE)
.setDeviceIdStrategy(Config.DeviceIdStrategy.UUID)
.enableFeatures(Config.Feature.Events, Config.Feature.Sessions)
.setEventQueueSizeToSend(1);
}

/**
* Reproduces the user-reported symptom: with multiple requests piled up
* on disk while the network is busy, the queue must drain without
* external prompting once the network is free again.
*
* Buggy 24.1.5 Tasks.java: only request #1 reaches the server.
* Fixed: all queued requests reach the server.
*/
@Test
public void backloggedRequests_drainAfterInFlightCompletes() throws Exception {
AtomicInteger requestCount = new AtomicInteger(0);
CountDownLatch firstRequestArrived = new CountDownLatch(1);
CountDownLatch releaseFirstRequest = new CountDownLatch(1);

server = HttpServer.create(new InetSocketAddress(0), 0);
port = server.getAddress().getPort();
server.createContext("/", exchange -> {
int n = requestCount.incrementAndGet();
if (n == 1) {
// Hold request #1 open until the test has built up a backlog.
firstRequestArrived.countDown();
try {
releaseFirstRequest.await(10, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {
}
}
String body = "{\"result\":\"Success\"}";
exchange.sendResponseHeaders(200, body.length());
OutputStream os = exchange.getResponseBody();
os.write(body.getBytes());
os.close();
});
server.start();

Countly.instance().init(configForLocalServer());
Countly.session().begin();

// Wait until request #1 has reached the server and is being held.
Assert.assertTrue(
"request #1 should reach the server within 5s",
firstRequestArrived.await(5, TimeUnit.SECONDS)
);

// Build up a backlog: each recordEvent flushes a new request to disk.
// While the server holds #1, all of these queue up because
// DefaultNetworking.check() short-circuits on isRunning() == true.
final int backlogSize = 5;
for (int i = 0; i < backlogSize; i++) {
Countly.instance().events().recordEvent("backlog_evt_" + i);
}

// Give the event flushes time to actually write request files to disk.
Thread.sleep(500);

// Release #1. From this point on no external code calls check() —
// the queue must self-drain via the callback re-entry path that
// issue #271 broke.
releaseFirstRequest.countDown();

// Poll for drain. Total expected = 1 (begin_session) + backlogSize.
// Use >= because device-id resolution or merge requests may add extras;
// the regression is "queue stops at 1", so any number > 1 + a generous
// wait is the meaningful signal.
int expectedMinimum = 1 + backlogSize;
long deadline = System.currentTimeMillis() + 10_000;
while (System.currentTimeMillis() < deadline && requestCount.get() < expectedMinimum) {
Thread.sleep(100);
}

Assert.assertTrue(
"request queue should drain to >= " + expectedMinimum + " requests "
+ "without external check() calls — got " + requestCount.get()
+ " (queue stalled if << expected)",
requestCount.get() >= expectedMinimum
);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package ly.count.sdk.java.internal;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -131,10 +133,19 @@ public Boolean call() {
Assert.assertFalse("Executor should not be running", tasks.isRunning());
}

private static Object getField(Object target, String fieldName) throws Exception {
Field field = target.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
return field.get(target);
}

/**
* When a callback throws an exception, the executor should still recover
* and not deadlock. The callback runs inside the try block, so its exception
* is caught by the finally block.
* and not deadlock. After the issue #271 fix the callback runs outside the
* try/finally, so `running = null` has already executed before the throw —
* the exception propagates into the Future without leaving the executor
* stuck. This test uses id=0L; for the `pending.remove` branch on a
* non-zero-id task see callbackExceptionDoesNotLeakPendingForIdTask.
*/
@Test
public void callbackException_executorRecovers() throws Exception {
Expand Down Expand Up @@ -287,4 +298,101 @@ public Boolean call() throws Exception {
tasks.isRunning()
);
}

/**
* After a task with a non-zero id throws, the `pending` map must be empty.
* Locks the cleanup contract directly rather than inferring it from dedup
* behavior — guards against future refactors that move pending.remove()
* out of the finally block.
*/
@Test
public void pendingIsEmptyAfterIdTaskThrows() throws Exception {
Long taskId = 42L;

tasks.run(new Tasks.Task<Boolean>(taskId) {
@Override
public Boolean call() {
throw new RuntimeException("Simulated failure");
}
});

tasks.await();

Map<?, ?> pending = (Map<?, ?>) getField(tasks, "pending");
synchronized (pending) {
Assert.assertTrue("pending map should be empty after task throws", pending.isEmpty());
}
}

/**
* Issue #271 + #264 interaction: when a callback throws on a task with a
* non-zero id, both `running` and `pending` must be cleared. The existing
* callbackException_executorRecovers test uses id=0L, so it never exercises
* the `pending.remove(task.id)` branch — this test does.
*/
@Test
public void callbackExceptionDoesNotLeakPendingForIdTask() throws Exception {
Long taskId = 99L;

tasks.run(new Tasks.Task<Boolean>(taskId) {
@Override
public Boolean call() {
return true;
}
}, result -> {
throw new RuntimeException("Simulated callback failure");
});

tasks.await();

Assert.assertFalse("running should be cleared after callback throws", tasks.isRunning());
Map<?, ?> pending = (Map<?, ?>) getField(tasks, "pending");
synchronized (pending) {
Assert.assertTrue("pending should be empty after callback throws on id-task", pending.isEmpty());
}
}

/**
* Issue #271 + #264 recovery path: after a callback throws (e.g. transient
* parse error in DefaultNetworking.check), a subsequent task's callback
* must still be able to re-enter the scheduler and queue the next request.
* This is the production scenario — taking down the request queue requires
* BOTH "callback A throws" AND "callback B can no longer reschedule".
*/
@Test
public void callbackThrows_subsequentCallbackCanReschedule() throws Exception {
// Task A: succeeds, callback throws.
tasks.run(new Tasks.Task<Boolean>(0L) {
@Override
public Boolean call() {
return true;
}
}, result -> {
throw new RuntimeException("Simulated callback A failure");
});
tasks.await();

// Task B: succeeds, callback re-enters scheduler to run task C
// (mirrors DefaultNetworking.check() recovering after a failed callback).
CountDownLatch taskCRan = new CountDownLatch(1);
tasks.run(new Tasks.Task<Boolean>(0L) {
@Override
public Boolean call() {
return true;
}
}, paramB -> {
if (!tasks.isRunning()) {
tasks.run(new Tasks.Task<Boolean>(0L) {
@Override
public Boolean call() {
taskCRan.countDown();
return true;
}
});
}
});

Assert.assertTrue("task C should run — recovery path after a failed callback must still allow re-entry",
taskCRan.await(2, TimeUnit.SECONDS));
}
}
61 changes: 61 additions & 0 deletions sdk-java/src/test/java/ly/count/sdk/java/internal/TasksTests.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package ly.count.sdk.java.internal;

import java.lang.reflect.Field;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -76,6 +78,65 @@ public Object call() {
Assert.assertEquals(Boolean.TRUE, called[1]);
}

/**
* Regression for issue #271, end-to-end: mirrors the
* DefaultNetworking.check() pattern where task A's callback re-enters the
* scheduler with `if (!isRunning()) tasks.run(taskB)`. The actual user-facing
* failure was not "isRunning() reports the wrong value" but "the next request
* never runs". This test proves task B is both scheduled AND executed.
*/
@Test
public void testCallbackCanScheduleAndRunNextTask() throws Exception {
final CountDownLatch taskBRan = new CountDownLatch(1);
final boolean[] taskBScheduled = { false };

tasks.run(new Tasks.Task<Integer>(0L) {
@Override
public Integer call() {
return 1;
}
}, paramA -> {
if (!tasks.isRunning()) {
taskBScheduled[0] = true;
tasks.run(new Tasks.Task<Integer>(0L) {
@Override
public Integer call() {
taskBRan.countDown();
return 2;
}
});
}
});

Assert.assertTrue("task B should have been scheduled and run by callback A",
taskBRan.await(2, TimeUnit.SECONDS));
Assert.assertTrue("callback A should have observed isRunning() == false and entered the schedule branch",
taskBScheduled[0]);
}

/**
* Regression for issue #271: callback must observe isRunning() == false
* so that a callback re-entering the scheduler (e.g. DefaultNetworking.check)
* can schedule the next task. In 24.1.5 the callback was invoked before
* `running` was cleared, deadlocking the request queue.
*/
@Test
public void testCallbackSeesIsRunningFalse() throws Exception {
final Boolean[] runningInsideCallback = new Boolean[] { null };

tasks.run(new Tasks.Task<Integer>(0L) {
@Override
public Integer call() {
return 1;
}
}, param -> runningInsideCallback[0] = tasks.isRunning());

tasks.await();

Assert.assertNotNull("callback was not invoked", runningInsideCallback[0]);
Assert.assertEquals(Boolean.FALSE, runningInsideCallback[0]);
}

@Test
public void testTaskIdsWork() throws Exception {
final int result = 123;
Expand Down
Loading