Skip to content

Commit 7fefded

Browse files
committed
Fix stackoverflow with large pages in paginator
Previously, signalig onNext() to the subscriber was done via recursion, pulling elements from an iterator over the current page returned by the service. However, this can quickly lead to a stackoverflow error since the stack will grow linearly with the size of the page. - Replace sendNextElement recursion with a loop - Ensure that handleRequests does not recurse into itself
1 parent 82dd743 commit 7fefded

2 files changed

Lines changed: 192 additions & 46 deletions

File tree

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/pagination/async/ItemsSubscription.java

Lines changed: 68 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package software.amazon.awssdk.core.internal.pagination.async;
1717

1818
import java.util.Iterator;
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.concurrent.atomic.AtomicBoolean;
1921
import java.util.function.Function;
2022
import org.reactivestreams.Subscription;
2123
import software.amazon.awssdk.annotations.SdkInternalApi;
@@ -32,6 +34,8 @@
3234
public final class ItemsSubscription<ResponseT, ItemT> extends PaginationSubscription<ResponseT> {
3335
private final Function<ResponseT, Iterator<ItemT>> getIteratorFunction;
3436
private volatile Iterator<ItemT> singlePageItemsIterator;
37+
private final AtomicBoolean handlingRequests = new AtomicBoolean();
38+
private volatile boolean awaitingNewPage = false;
3539

3640
private ItemsSubscription(BuilderImpl builder) {
3741
super(builder);
@@ -47,61 +51,79 @@ public static Builder builder() {
4751

4852
@Override
4953
protected void handleRequests() {
50-
if (!hasMoreItems() && !hasNextPage()) {
51-
completeSubscription();
54+
// Prevent recursion if we already invoked handleRequests
55+
if (!handlingRequests.compareAndSet(false, true)) {
5256
return;
5357
}
5458

55-
synchronized (this) {
56-
if (outstandingRequests.get() <= 0) {
57-
stopTask();
58-
return;
59-
}
60-
}
61-
62-
if (!isTerminated()) {
63-
/**
64-
* Current page is null only the first time the method is called.
65-
* Once initialized, current page will never be null
66-
*/
67-
if (currentPage == null || (!hasMoreItems() && hasNextPage())) {
68-
fetchNextPage();
69-
70-
} else if (hasMoreItems()) {
71-
sendNextElement();
72-
73-
// All valid cases are covered above. Throw an exception if any combination is missed
74-
} else {
75-
throw new IllegalStateException("Execution should have not reached here");
59+
try {
60+
while (true) {
61+
if (!hasMoreItems() && !hasNextPage()) {
62+
completeSubscription();
63+
return;
64+
}
65+
66+
synchronized (this) {
67+
if (outstandingRequests.get() <= 0) {
68+
stopTask();
69+
return;
70+
}
71+
}
72+
73+
if (isTerminated()) {
74+
return;
75+
}
76+
77+
if (shouldFetchNextPage()) {
78+
awaitingNewPage = true;
79+
fetchNextPage().whenComplete((r, e) -> {
80+
if (e == null) {
81+
awaitingNewPage = false;
82+
handleRequests();
83+
} else {
84+
subscriber.onError(e);
85+
}
86+
});
87+
} else if (hasMoreItems()) {
88+
synchronized (this) {
89+
if (outstandingRequests.get() <= 0) {
90+
continue;
91+
}
92+
93+
subscriber.onNext(singlePageItemsIterator.next());
94+
outstandingRequests.getAndDecrement();
95+
}
96+
} else {
97+
// Outstanding demand AND no items in current page AND waiting for next page. Just return for now, and
98+
// we'll handle demand when the new page arrives.
99+
return;
100+
}
76101
}
102+
} finally {
103+
handlingRequests.set(false);
77104
}
78105
}
79106

80-
private void fetchNextPage() {
81-
nextPageFetcher.nextPage(currentPage)
82-
.whenComplete(((response, error) -> {
83-
if (response != null) {
84-
currentPage = response;
85-
singlePageItemsIterator = getIteratorFunction.apply(response);
86-
sendNextElement();
87-
}
88-
if (error != null) {
89-
subscriber.onError(error);
90-
cleanup();
91-
}
92-
}));
107+
private CompletableFuture<ResponseT> fetchNextPage() {
108+
return nextPageFetcher.nextPage(currentPage)
109+
.whenComplete((response, error) -> {
110+
if (response != null) {
111+
currentPage = response;
112+
singlePageItemsIterator = getIteratorFunction.apply(response);
113+
} else if (error != null) {
114+
subscriber.onError(error);
115+
cleanup();
116+
}
117+
});
93118
}
94119

95-
/**
96-
* Calls onNext and calls the recursive method.
97-
*/
98-
private void sendNextElement() {
99-
if (singlePageItemsIterator.hasNext()) {
100-
subscriber.onNext(singlePageItemsIterator.next());
101-
outstandingRequests.getAndDecrement();
102-
}
103-
104-
handleRequests();
120+
// Conditions when to fetch the next page:
121+
// - Either we still need to fetch the first page OR
122+
// - We've exhausted the current page AND there is a next page AND we haven't requested the new page yet
123+
private boolean shouldFetchNextPage() {
124+
// Current page is null only the first time the method is called.
125+
// Once initialized, current page will never be null.
126+
return currentPage == null || (!hasMoreItems() && hasNextPage() && !awaitingNewPage);
105127
}
106128

107129
private boolean hasMoreItems() {
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package software.amazon.awssdk.core.pagination.async;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.mockito.Mockito.mock;
5+
import static org.mockito.Mockito.when;
6+
7+
import java.util.Iterator;
8+
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.TimeUnit;
10+
import java.util.concurrent.atomic.AtomicLong;
11+
import java.util.function.Function;
12+
import org.junit.jupiter.api.Test;
13+
import org.junit.jupiter.api.Timeout;
14+
import software.amazon.awssdk.core.SdkResponse;
15+
16+
public class PaginatedItemsPublisherTest {
17+
@Test
18+
public void subscribe_largePage_doesNotFail() throws Exception {
19+
int nItems = 100_000;
20+
21+
Function<SdkResponse, Iterator<String>> iteratorFn = resp ->
22+
new Iterator<String>() {
23+
private int count = 0;
24+
25+
@Override
26+
public boolean hasNext() {
27+
return count < nItems;
28+
}
29+
30+
@Override
31+
public String next() {
32+
++count;
33+
return "item";
34+
}
35+
};
36+
37+
AsyncPageFetcher<SdkResponse> pageFetcher = new AsyncPageFetcher<SdkResponse>() {
38+
@Override
39+
public boolean hasNextPage(SdkResponse oldPage) {
40+
return false;
41+
}
42+
43+
@Override
44+
public CompletableFuture<SdkResponse> nextPage(SdkResponse oldPage) {
45+
return CompletableFuture.completedFuture(mock(SdkResponse.class));
46+
}
47+
};
48+
49+
PaginatedItemsPublisher<SdkResponse, String> publisher = PaginatedItemsPublisher.builder()
50+
.isLastPage(false)
51+
.nextPageFetcher(pageFetcher)
52+
.iteratorFunction(iteratorFn)
53+
.build();
54+
55+
AtomicLong counter = new AtomicLong();
56+
publisher.subscribe(i -> counter.incrementAndGet()).join();
57+
assertThat(counter.get()).isEqualTo(nItems);
58+
}
59+
60+
@Test
61+
@Timeout(value = 1, unit = TimeUnit.MINUTES)
62+
public void subscribe_longStream_doesNotFail() throws Exception {
63+
int nPages = 100_000;
64+
int nItemsPerPage = 1;
65+
Function<SdkResponse, Iterator<String>> iteratorFn = resp ->
66+
new Iterator<String>() {
67+
private int count = 0;
68+
69+
@Override
70+
public boolean hasNext() {
71+
return count < nItemsPerPage;
72+
}
73+
74+
@Override
75+
public String next() {
76+
++count;
77+
return "item";
78+
}
79+
};
80+
81+
AsyncPageFetcher<TestResponse> pageFetcher = new AsyncPageFetcher<TestResponse>() {
82+
@Override
83+
public boolean hasNextPage(TestResponse oldPage) {
84+
return oldPage.pageNumber() < nPages - 1;
85+
}
86+
87+
@Override
88+
public CompletableFuture<TestResponse> nextPage(TestResponse oldPage) {
89+
TestResponse resp;
90+
if (oldPage == null) {
91+
resp = createResponse(0);
92+
} else {
93+
resp = createResponse(oldPage.pageNumber() + 1);
94+
}
95+
return CompletableFuture.completedFuture(resp);
96+
}
97+
};
98+
99+
PaginatedItemsPublisher<SdkResponse, String> publisher = PaginatedItemsPublisher.builder()
100+
.isLastPage(false)
101+
.nextPageFetcher(pageFetcher)
102+
.iteratorFunction(iteratorFn)
103+
.build();
104+
105+
AtomicLong counter = new AtomicLong();
106+
publisher.subscribe(i -> counter.incrementAndGet()).join();
107+
assertThat(counter.get()).isEqualTo(nPages * nItemsPerPage);
108+
}
109+
110+
private abstract class TestResponse extends SdkResponse {
111+
112+
protected TestResponse(Builder builder) {
113+
super(builder);
114+
}
115+
116+
abstract Integer pageNumber();
117+
}
118+
119+
private static TestResponse createResponse(Integer pageNumber) {
120+
TestResponse mock = mock(TestResponse.class);
121+
when(mock.pageNumber()).thenReturn(pageNumber);
122+
return mock;
123+
}
124+
}

0 commit comments

Comments
 (0)