From f248b3c83b8cb778eec5edc2cf68e727b87394b3 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 1 Jul 2026 11:50:03 +0200 Subject: [PATCH 1/4] 4.x: Streamable + fromIterable, fromStream, hide, never, fixes --- .../io/reactivex/rxjava4/core/Streamable.java | 138 ++++++++++++++---- .../io/reactivex/rxjava4/core/Streamer.java | 33 +---- .../fuseable/HasUpstreamStreamableSource.java | 33 +++++ .../operators/streamable/StreamableEmpty.java | 14 +- .../streamable/StreamableFromArray.java | 5 - .../streamable/StreamableFromIterable.java | 82 +++++++++++ .../streamable/StreamableFromStream.java | 35 +++++ .../operators/streamable/StreamableHide.java | 48 ++++++ .../operators/streamable/StreamableJust.java | 4 - .../operators/streamable/StreamableNever.java | 53 +++++++ .../rxjava4/observers/BaseTestConsumer.java | 54 +++++++ .../StreamableFromIterableTest.java | 71 +++++++++ .../streamable/StreamableFromStreamTest.java | 81 ++++++++++ .../operators/streamable/StreamableTest.java | 86 ++++++++++- .../SharedSchedulerIsolatedTest.java | 90 ++++++++++++ .../schedulers/SharedSchedulerTest.java | 65 +-------- .../subscribers/TestSubscriberTest.java | 8 +- .../testsupport/BaseTestConsumerEx.java | 54 ------- .../validators/CheckParamValidationTest.java | 20 ++- 19 files changed, 775 insertions(+), 199 deletions(-) create mode 100644 src/main/java/io/reactivex/rxjava4/internal/fuseable/HasUpstreamStreamableSource.java create mode 100644 src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromIterable.java create mode 100644 src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromStream.java create mode 100644 src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableHide.java create mode 100644 src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableNever.java create mode 100644 src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromIterableTest.java create mode 100644 src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromStreamTest.java create mode 100644 src/test/java/io/reactivex/rxjava4/internal/schedulers/SharedSchedulerIsolatedTest.java diff --git a/src/main/java/io/reactivex/rxjava4/core/Streamable.java b/src/main/java/io/reactivex/rxjava4/core/Streamable.java index caf39c875a6..66df4940bb3 100644 --- a/src/main/java/io/reactivex/rxjava4/core/Streamable.java +++ b/src/main/java/io/reactivex/rxjava4/core/Streamable.java @@ -16,6 +16,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; import io.reactivex.rxjava4.annotations.*; import io.reactivex.rxjava4.disposables.*; @@ -54,17 +55,6 @@ public interface Streamable<@NonNull T> { // HELPERS // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo - // TODO, why no public final so it is not unnecessarily reimplemented, Java? - /** - * Realizes the stream and returns an interface that lets one consume it. - * @return the Streamer instance to consume. - */ - @CheckReturnValue - @NonNull - default Streamer stream() { - return stream(new CompositeDisposable()); // FIXME, use a practically no-op disposable container instead - } - // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo // Data sources and wrappers // oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo @@ -74,10 +64,11 @@ default Streamer stream() { * @param the element type * @return the {@code Streamable} instance */ + @SuppressWarnings("unchecked") @CheckReturnValue @NonNull static <@NonNull T> Streamable empty() { - return RxJavaPlugins.onAssembly(new StreamableEmpty<>()); + return RxJavaPlugins.onAssembly((Streamable)StreamableEmpty.INSTANCE); } /** @@ -101,17 +92,46 @@ default Streamer stream() { * @throws NullPointerException if {@code items} is {@code null} */ @SafeVarargs + @CheckReturnValue @NonNull - static <@NonNull T> Streamable fromArray(T... items) { + static <@NonNull T> Streamable fromArray(@NonNull T... items) { Objects.requireNonNull(items, "items is null"); return RxJavaPlugins.onAssembly(new StreamableFromArray<>(items)); } /** - * Convert any Flow.Publisher into a Streamable sequence. + * Streams all elements of the given {@link Iterable} sequence. + * @param the element type of the items + * @param items the iterable of items to stream + * @return the new {@code Streamable} instance + * @throws NullPointerException if {@code items} is {@code null} + */ + @CheckReturnValue + @NonNull + static <@NonNull T> Streamable fromIterable(@NonNull Iterable items) { + Objects.requireNonNull(items, "items is null"); + return RxJavaPlugins.onAssembly(new StreamableFromIterable<>(items)); + } + + /** + * Streams all elements of the given {@link Stream} sequence. + * @param the element type of the items + * @param items the stream of items to stream + * @return the new {@code Streamable} instance + * @throws NullPointerException if {@code items} is {@code null} + */ + @CheckReturnValue + @NonNull + static <@NonNull T> Streamable fromStream(@NonNull Stream items) { + Objects.requireNonNull(items, "items is null"); + return RxJavaPlugins.onAssembly(new StreamableFromStream<>(items)); + } + + /** + * Convert any {@link java.util.concurrent.Flow.Publisher} into a {@code Streamable} sequence. * @param the element type * @param source Flow.Publisher to convert - * @return the new Streamable instance + * @return the new {@code Streamable} instance */ @CheckReturnValue @NonNull @@ -121,24 +141,48 @@ static Streamable fromPublisher(@NonNull Flow.Publisher source) { } /** - * Convert any Flow.Publisher into a Streamable sequence. + * Convert any {@link java.util.concurrent.Flow.Publisher} into a {@code Streamable} sequence. * @param the element type * @param source Flow.Publisher to convert * @param executor where the conversion will run - * @return the new Streamable instance + * @return the new {@code Streamable} instance */ @CheckReturnValue @NonNull static Streamable fromPublisher(@NonNull Flow.Publisher source, @NonNull ExecutorService executor) { Objects.requireNonNull(source, "source is null"); + Objects.requireNonNull(executor, "executor is null"); return RxJavaPlugins.onAssembly(new StreamableFromPublisher<>(source, executor)); } + /** + * Returns an {@code Streamable} that never produces an item and never terminates. + * @param the element type + * @return the {@code Streamable} instance + */ + @SuppressWarnings("unchecked") + @CheckReturnValue + @NonNull + static <@NonNull T> Streamable never() { + return RxJavaPlugins.onAssembly((Streamable)StreamableNever.INSTANCE); + } + /** * Generate a sequence of values via a virtual generator callback (yielder) * which is free to block and is natively backpressured. *

* Runs on the {@link Schedulers#virtual()} scheduler. + *

+ * Example + *


+     * Streamable.create(emitter -> {
+     *     emitter.emit(1);
+     *     emitter.emit(2);
+     *     emitter.emit(3);
+     * })
+     * .forEach(System.out::println)
+     * ;
+     * 
* @param the element type * @param generator the generator to use * @return the streamable instance @@ -156,6 +200,17 @@ static Streamable fromPublisher(@NonNull Flow.Publisher source, @NonNu * which is free to block and is natively backpressured. *

* Runs on the given scheduler. + *

+ * Example + *


+     * Streamable.create(emitter -> {
+     *     emitter.emit(1);
+     *     emitter.emit(2);
+     *     emitter.emit(3);
+     * }, Schedulers.cached())
+     * .forEach(System.out::println)
+     * ;
+     * 
* @param the element type * @param generator the generator to use * @param scheduler the scheduler to run the virtual generator on @@ -195,8 +250,12 @@ static Streamable fromPublisher(@NonNull Flow.Publisher source, @NonNu * @return the new Streamable instance */ @SuppressWarnings("unchecked") + @CheckReturnValue @NonNull - static <@NonNull T> Streamable> fromStages(@NonNull Iterable> stages, ExecutorService executor) { + static <@NonNull T> Streamable> fromStages( + @NonNull Iterable> stages, ExecutorService executor) { + Objects.requireNonNull(stages, "stages is null"); + Objects.requireNonNull(executor, "executor is null"); return create(emitter -> { var list = new ArrayList>(); for(var stage : stages) { @@ -217,6 +276,8 @@ static Streamable fromPublisher(@NonNull Flow.Publisher source, @NonNu * @return the new {@code Streamable} instance * @throws NullPointerException if {@code supplier} is {@code null} */ + @CheckReturnValue + @NonNull static <@NonNull T> Streamable defer(Supplier> supplier) { Objects.requireNonNull(supplier, "supplier is null"); return RxJavaPlugins.onAssembly(new StreamableDefer<>(supplier)); @@ -230,28 +291,35 @@ static Streamable fromPublisher(@NonNull Flow.Publisher source, @NonNu * @return the new {@code Streamable} instance * @throws NullPointerException if {@code throwable} is {@code null} */ + @CheckReturnValue + @NonNull static <@NonNull T> Streamable error(Throwable throwable) { Objects.requireNonNull(throwable, "throwable is null"); return RxJavaPlugins.onAssembly(new StreamableError<>(throwable)); } /** - * Emits the elements of each inner sequence produced by the outher sequence. + * Emits the elements of each inner sequence produced by the outer sequence. * @param the common element type * @param sources a streamable of inner streamables - * @param exec the executorservice where to run the virtual wait - * @return the new Streamable instance. + * @param executor the executorservice where to run the virtual wait + * @return the new {@code Streamable} instance. + * @throws NullPointerException if {@code sources} or {@code exec} is {@code null} */ - static <@NonNull T> Streamable concat(Streamable> sources, ExecutorService exec) { + @CheckReturnValue + @NonNull + static <@NonNull T> Streamable concat(Streamable> sources, ExecutorService executor) { + Objects.requireNonNull(sources, "sources is null"); + Objects.requireNonNull(executor, "executor is null"); return create(emitter -> { try (var mainSource = sources.forEach(item -> { - try (var innerSource = item.forEach(emitter::emit, emitter.canceller().derive(), exec)) { + try (var innerSource = item.forEach(emitter::emit, emitter.canceller().derive(), executor)) { innerSource.await(emitter.canceller()); } - }, emitter.canceller(), exec)) { + }, emitter.canceller(), executor)) { mainSource.await(emitter.canceller()); } - }, exec); + }, executor); } /** @@ -260,6 +328,8 @@ static Streamable fromPublisher(@NonNull Flow.Publisher source, @NonNu * @param count the number of elements to emit * @return the new {@code Streamable} instance */ + @CheckReturnValue + @NonNull static Streamable range(int start, int count) { if (count < 0) { throw new IllegalArgumentException("count >= 0 required but it was " + count); @@ -282,6 +352,8 @@ static Streamable range(int start, int count) { * @param count the number of elements to emit * @return the new {@code Streamable} instance */ + @CheckReturnValue + @NonNull static Streamable rangeLong(long start, long count) { if (count < 0) { throw new IllegalArgumentException("count >= 0 required but it was " + count); @@ -335,7 +407,7 @@ default Flowable toFlowable(@NonNull ExecutorService executor) { * Transforms the upstream sequence into zero or more elements for the downstream. * @param the result element type * @param transformer the interface to implement the transforming logic - * @return the new Streamable instance + * @return the new {@code Streamable} instance */ @CheckReturnValue @NonNull @@ -343,6 +415,18 @@ default Flowable toFlowable(@NonNull ExecutorService executor) { return transform(transformer, Executors.newVirtualThreadPerTaskExecutor()); } + /** + * Hides the identity of this {@code Streamable} and its {@link Streamer}. + *

+ * Use it to break optimizations or hide concrete implementations. + * @return the new {@code Streamable} instance + */ + @CheckReturnValue + @NonNull + default Streamable hide() { + return RxJavaPlugins.onAssembly(new StreamableHide<>(this)); + } + /** * Transforms the upstream sequence into zero or more elements for the downstream. * @param the result element type @@ -370,6 +454,8 @@ default Flowable toFlowable(@NonNull ExecutorService executor) { * @param n the maximum number of items to relay * @return the new {@code Streamable} instance */ + @CheckReturnValue + @NonNull default Streamable take(long n) { ObjectHelper.verifyPositive(n, "n"); return defer(() -> { diff --git a/src/main/java/io/reactivex/rxjava4/core/Streamer.java b/src/main/java/io/reactivex/rxjava4/core/Streamer.java index 37bccdb918c..87562ec2f91 100644 --- a/src/main/java/io/reactivex/rxjava4/core/Streamer.java +++ b/src/main/java/io/reactivex/rxjava4/core/Streamer.java @@ -131,37 +131,6 @@ static record StreamerFinishViaDisposableContainerCanceller( } - /** - * Hides the identity of this Streamer for debug or deoptimization purposes. - * @return the augmented streamer, always unique. - */ - default Streamer hide() { - return new HiddenStreamer<>(this); - } - - /** - * Hides the identity of the Streamer for debug or deoptimization purposes. - * @param the element type of the streamer - */ - static record HiddenStreamer(@NonNull Streamer streamer) implements Streamer { - - @Override - public @NonNull CompletionStage next(@NonNull DisposableContainer cancellation) { - return streamer.next(cancellation); - } - - @Override - public @NonNull T current() { - return streamer.current(); - } - - @Override - public @NonNull CompletionStage finish(@NonNull DisposableContainer cancellation) { - return streamer.finish(cancellation); - } - - } - /** * Moves and awaits the sequence's next element, returns false if there are no more * data. @@ -212,7 +181,7 @@ default void awaitFinish(@NonNull DisposableContainer cancellation) { /** * Use this constant in {@link #finish(DisposableContainer)} to indicate - * the cleanupp was done synchronously. + * the cleanup was done synchronously. */ CompletionStage FINISHED = CompletableFuture.completedStage(null); } diff --git a/src/main/java/io/reactivex/rxjava4/internal/fuseable/HasUpstreamStreamableSource.java b/src/main/java/io/reactivex/rxjava4/internal/fuseable/HasUpstreamStreamableSource.java new file mode 100644 index 00000000000..e05a161a798 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava4/internal/fuseable/HasUpstreamStreamableSource.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.fuseable; + +import io.reactivex.rxjava4.annotations.NonNull; +import io.reactivex.rxjava4.core.Streamable; + +/** + * Interface indicating the implementor has an upstream Streamable-like source available + * via {@link #source()} method. + * + * @param the value type + */ +public interface HasUpstreamStreamableSource<@NonNull T> { + /** + * Returns the upstream source of this Streamable. + *

Allows discovering the chain of streamables. + * @return the source Streamable + */ + @NonNull + Streamable source(); +} diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableEmpty.java b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableEmpty.java index 44c911cc4e1..42b120e4f86 100644 --- a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableEmpty.java +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableEmpty.java @@ -20,14 +20,18 @@ import io.reactivex.rxjava4.core.*; import io.reactivex.rxjava4.disposables.*; -public final class StreamableEmpty implements Streamable { +public enum StreamableEmpty implements Streamable { + + INSTANCE; @Override - public @NonNull Streamer<@NonNull T> stream(@NonNull DisposableContainer cancellation) { - return new EmptyStreamer<>(); + public @NonNull Streamer stream(@NonNull DisposableContainer cancellation) { + return EmptyStreamer.INSTANCE; } - static final class EmptyStreamer implements Streamer { + enum EmptyStreamer implements Streamer { + + INSTANCE; @Override public @NonNull CompletionStage next(DisposableContainer cancellation) { @@ -35,7 +39,7 @@ static final class EmptyStreamer implements Streamer { } @Override - public @NonNull T current() { + public @NonNull Object current() { throw new NoSuchElementException("This Streamable/Streamer never has elements"); } diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromArray.java b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromArray.java index 1bb9ffcd821..2c45af95040 100644 --- a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromArray.java +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromArray.java @@ -13,7 +13,6 @@ package io.reactivex.rxjava4.internal.operators.streamable; -import java.util.Objects; import java.util.concurrent.*; import io.reactivex.rxjava4.annotations.NonNull; @@ -22,10 +21,6 @@ public record StreamableFromArray(@NonNull T[] items) implements Streamable { - public StreamableFromArray { - Objects.requireNonNull(items, "items is null"); - } - @Override public @NonNull Streamer<@NonNull T> stream(@NonNull DisposableContainer cancellation) { return new FromArrayStreamer<>(items); diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromIterable.java b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromIterable.java new file mode 100644 index 00000000000..b02243d34fb --- /dev/null +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromIterable.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import java.util.*; +import java.util.concurrent.*; + +import io.reactivex.rxjava4.annotations.NonNull; +import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.disposables.DisposableContainer; +import io.reactivex.rxjava4.exceptions.Exceptions; +import io.reactivex.rxjava4.internal.operators.streamable.StreamableEmpty.EmptyStreamer; + +public record StreamableFromIterable(@NonNull Iterable items) implements Streamable { + + @SuppressWarnings("unchecked") + @Override + public @NonNull Streamer<@NonNull T> stream(@NonNull DisposableContainer cancellation) { + Iterator iterator; + try { + iterator = Objects.requireNonNull(items.iterator(), "iterator is null"); + if (!iterator.hasNext()) { + return (Streamer)EmptyStreamer.INSTANCE; + } + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + return StreamableError.createFailed(ex); + } + return new IteratorStreamer<>(iterator); + } + + static final class IteratorStreamer implements Streamer { + + Iterator iterator; + + long index; + + volatile T current; + + IteratorStreamer(Iterator iterator) { + this.iterator = iterator; + } + + @Override + public @NonNull CompletionStage next(@NonNull DisposableContainer cancellation) { + if (index == 0L || iterator.hasNext()) { + var v = iterator.next(); + current = v; + if (v == null) { + return CompletableFuture.failedStage(new NullPointerException("Item at index " + index + " is null.")); + } + index++; + return NEXT_TRUE; + } + current = null; + return NEXT_FALSE; + } + + @Override + public @NonNull T current() { + return current; + } + + @Override + public @NonNull CompletionStage finish(@NonNull DisposableContainer cancellation) { + iterator = null; + current = null; + return FINISHED; + } + } +} diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromStream.java b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromStream.java new file mode 100644 index 00000000000..54791164209 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromStream.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import java.util.Objects; +import java.util.stream.Stream; + +import io.reactivex.rxjava4.annotations.NonNull; +import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.disposables.DisposableContainer; +import io.reactivex.rxjava4.internal.operators.streamable.StreamableEmpty.EmptyStreamer; + +public record StreamableFromStream(@NonNull Stream items) implements Streamable { + + @SuppressWarnings("unchecked") + @Override + public @NonNull Streamer<@NonNull T> stream(@NonNull DisposableContainer cancellation) { + var iterator = Objects.requireNonNull(items.iterator(), "iterator is null"); + if (!iterator.hasNext()) { + return (Streamer)EmptyStreamer.INSTANCE; + } + return new StreamableFromIterable.IteratorStreamer<>(iterator); + } +} diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableHide.java b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableHide.java new file mode 100644 index 00000000000..3ee6f653ee1 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableHide.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import java.util.concurrent.CompletionStage; + +import io.reactivex.rxjava4.annotations.NonNull; +import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.disposables.DisposableContainer; +import io.reactivex.rxjava4.internal.fuseable.HasUpstreamStreamableSource; + +public record StreamableHide(Streamable source) +implements Streamable, HasUpstreamStreamableSource { + + @Override + public @NonNull Streamer<@NonNull T> stream(@NonNull DisposableContainer cancellation) { + return new HideStreamer<>(source.stream(cancellation)); + } + + record HideStreamer(Streamer streamer) implements Streamer { + + @Override + public @NonNull CompletionStage next(@NonNull DisposableContainer cancellation) { + return streamer.next(cancellation); + } + + @Override + public @NonNull T current() { + return streamer.current(); + } + + @Override + public @NonNull CompletionStage finish(@NonNull DisposableContainer cancellation) { + return streamer.finish(cancellation); + } + } +} diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableJust.java b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableJust.java index e02c8426538..a500015112d 100644 --- a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableJust.java +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableJust.java @@ -22,10 +22,6 @@ public record StreamableJust(@NonNull T item) implements Streamable { - public StreamableJust { - Objects.requireNonNull(item, "item is null"); - } - @Override public @NonNull Streamer<@NonNull T> stream(@NonNull DisposableContainer cancellation) { return new JustStreamer<>(item, cancellation); diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableNever.java b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableNever.java new file mode 100644 index 00000000000..f7a0f17d65b --- /dev/null +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableNever.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import java.util.NoSuchElementException; +import java.util.concurrent.*; + +import io.reactivex.rxjava4.annotations.NonNull; +import io.reactivex.rxjava4.core.*; +import io.reactivex.rxjava4.disposables.*; + +public enum StreamableNever implements Streamable { + + INSTANCE; + + @Override + public @NonNull Streamer stream(@NonNull DisposableContainer cancellation) { + return NeverStreamer.INSTANCE; + } + + enum NeverStreamer implements Streamer { + + INSTANCE; + + @Override + public @NonNull CompletionStage next(DisposableContainer cancellation) { + var cf = new CompletableFuture(); + cancellation.subscribe(Disposable.fromFuture(cf, true)); + return cf; + } + + @Override + public @NonNull Object current() { + throw new NoSuchElementException("This Streamable/Streamer never has elements"); + } + + @Override + public @NonNull CompletionStage finish(DisposableContainer canceller) { + return FINISHED; + } + } +} diff --git a/src/main/java/io/reactivex/rxjava4/observers/BaseTestConsumer.java b/src/main/java/io/reactivex/rxjava4/observers/BaseTestConsumer.java index 8445592cc6c..ebbcdc79393 100644 --- a/src/main/java/io/reactivex/rxjava4/observers/BaseTestConsumer.java +++ b/src/main/java/io/reactivex/rxjava4/observers/BaseTestConsumer.java @@ -646,6 +646,60 @@ public final U awaitCount(int atLeast) { return (U)this; } + /** + * Returns true if an await timed out. + * @return true if one of the timeout-based await methods has timed out. + *

History: 2.0.7 - experimental + * @see #clearTimeout() + * @see #assertTimeout() + * @see #assertNoTimeout() + * @since 2.1 + */ + public final boolean isTimeout() { + return timeout; + } + + /** + * Clears the timeout flag set by the await methods when they timed out. + *

History: 2.0.7 - experimental + * @return this + * @since 2.1 + * @see #isTimeout() + */ + @SuppressWarnings("unchecked") + public final U clearTimeout() { + timeout = false; + return (U)this; + } + + /** + * Asserts that some awaitX method has timed out. + *

History: 2.0.7 - experimental + * @return this + * @since 2.1 + */ + @SuppressWarnings("unchecked") + public final U assertTimeout() { + if (!timeout) { + throw fail("No timeout?!"); + } + return (U)this; + } + + /** + * Asserts that some awaitX method has not timed out. + *

History: 2.0.7 - experimental + * @return this + * @since 2.1 + */ + @SuppressWarnings("unchecked") + public final U assertNoTimeout() { + if (timeout) { + throw fail("Timeout?!"); + } + return (U)this; + } + /** * Returns true if this test consumer was cancelled/disposed. * @return true if this test consumer was cancelled/disposed. diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromIterableTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromIterableTest.java new file mode 100644 index 00000000000..fb745340399 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromIterableTest.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Isolated; + +import io.reactivex.rxjava4.core.Streamable; +import io.reactivex.rxjava4.exceptions.TestException; + +@Isolated +public class StreamableFromIterableTest extends StreamableBaseTest { + + @Test + public void normal() throws Throwable { + Streamable.fromStream(List.of(1, 2, 3).stream()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3); + } + + @Test + public void empty() throws Throwable { + Streamable.fromStream(List.of().stream()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(); + } + + @Test + public void one() throws Throwable { + Streamable.fromStream(List.of(1).stream()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1); + } + + @Test + public void hasNull() throws Throwable { + Streamable.fromStream(Arrays.asList(1, null, 3).stream()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(NullPointerException.class, 1) + .assertError(t -> t.getMessage().equals("Item at index 1 is null.")); + ; + } + + @Test + public void hasNull2() throws Throwable { + Streamable.fromStream(Arrays.asList(null, 1, 2, 3).stream()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(NullPointerException.class) + .assertError(t -> t.getMessage().equals("Item at index 0 is null.")); + ; + } +} diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromStreamTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromStreamTest.java new file mode 100644 index 00000000000..18e9814326c --- /dev/null +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromStreamTest.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.operators.streamable; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Isolated; + +import io.reactivex.rxjava4.core.Streamable; +import io.reactivex.rxjava4.exceptions.TestException; + +@Isolated +public class StreamableFromStreamTest extends StreamableBaseTest { + + @Test + public void normal() throws Throwable { + Streamable.fromIterable(List.of(1, 2, 3)) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3); + } + + @Test + public void empty() throws Throwable { + Streamable.fromIterable(List.of()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(); + } + + @Test + public void one() throws Throwable { + Streamable.fromIterable(List.of(1)) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1); + } + + @Test + public void hasNull() throws Throwable { + Streamable.fromIterable(Arrays.asList(1, null, 3)) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(NullPointerException.class, 1) + .assertError(t -> t.getMessage().equals("Item at index 1 is null.")); + ; + } + + @Test + public void hasNull2() throws Throwable { + Streamable.fromIterable(Arrays.asList(null, 1, 2, 3)) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(NullPointerException.class) + .assertError(t -> t.getMessage().equals("Item at index 0 is null.")); + ; + } + + @Test + public void iteratorThrows() throws Throwable { + Streamable.fromIterable(() -> { throw new TestException("test"); }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class) + .assertError(t -> t.getMessage().equals("test")); + ; + } +} diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTest.java index 3b02a98092f..13ae264c63b 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTest.java @@ -15,15 +15,17 @@ import static org.junit.jupiter.api.Assertions.*; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.*; import org.junit.jupiter.api.parallel.Isolated; import io.reactivex.rxjava4.core.*; -import io.reactivex.rxjava4.exceptions.*; +import io.reactivex.rxjava4.exceptions.TestException; import io.reactivex.rxjava4.internal.subscriptions.EmptySubscription; +import io.reactivex.rxjava4.schedulers.Schedulers; import io.reactivex.rxjava4.subscribers.TestSubscriber; @Isolated @@ -286,4 +288,84 @@ public void fromPublisherExec() throws Throwable { .assertResult(1, 2, 3, 4, 5); }); } + + @Test + public void emptyCurrentThrows() { + assertThrows(NoSuchElementException.class, () -> { + StreamableEmpty.EmptyStreamer.INSTANCE.current(); + }); + } + + @Test + public void neverCurrentThrows() { + assertThrows(NoSuchElementException.class, () -> { + StreamableNever.NeverStreamer.INSTANCE.current(); + }); + } + + @Test + public void never() { + Streamable.never() + .test() + .awaitDone(100, TimeUnit.MILLISECONDS) + .assertTimeout(); + } + + @Test + public void never2() throws Throwable { + withCachedExecutor(exec -> { + Streamable.never() + .test(exec) + .awaitDone(100, TimeUnit.MILLISECONDS) + .assertTimeout(); + }); + } + + @Test + public void fromStages() throws Throwable { + withVirtual(exec -> { + Streamable.fromStages(List.of( + CompletableFuture.completedFuture(1), + CompletableFuture.completedFuture(2), + CompletableFuture.completedFuture(3) + ), exec + ) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(3) + .assertNoErrors() + .assertComplete(); + }); + } + + @Test + public void createPlain() { + Streamable.create(emitter -> { + emitter.emit(1); + emitter.emit(2); + emitter.emit(3); + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3); + } + + @Test + public void createScheduler() { + Streamable.create(emitter -> { + emitter.emit(1); + emitter.emit(2); + emitter.emit(3); + }, Schedulers.cached()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3); + } + + @Test + public void hide() { + var str = Streamable.empty().hide(); + + assertFalse(str instanceof StreamableEmpty, str.getClass().toString()); + } } diff --git a/src/test/java/io/reactivex/rxjava4/internal/schedulers/SharedSchedulerIsolatedTest.java b/src/test/java/io/reactivex/rxjava4/internal/schedulers/SharedSchedulerIsolatedTest.java new file mode 100644 index 00000000000..069ac617ec2 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava4/internal/schedulers/SharedSchedulerIsolatedTest.java @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava4.internal.schedulers; + +import static org.junit.jupiter.api.Assertions.fail; + +import java.lang.management.ManagementFactory; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Isolated; + +import io.reactivex.rxjava4.core.RxJavaTest; +import io.reactivex.rxjava4.core.Scheduler.Worker; +import io.reactivex.rxjava4.internal.functions.Functions; +import io.reactivex.rxjava4.schedulers.Schedulers; + +@Isolated +public class SharedSchedulerIsolatedTest extends RxJavaTest { + + long memoryUsage() { + return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); + } + + @Test + public void noleak() throws Exception { + var scheduler = Schedulers.cached().share(); + try { + Worker worker = scheduler.createWorker(); + + worker.schedule(Functions.EMPTY_RUNNABLE); + + System.gc(); + Thread.sleep(500); + + long before = memoryUsage(); + System.out.printf("Start: %.1f%n", before / 1024.0 / 1024.0); + + for (int i = 0; i < 300 * 1000; i++) { + worker.schedule(Functions.EMPTY_RUNNABLE, 1, TimeUnit.DAYS); + } + + long middle = memoryUsage(); + + System.out.printf("Middle: %.1f -> %.1f%n", before / 1024.0 / 1024.0, middle / 1024.0 / 1024.0); + + worker.dispose(); + + System.gc(); + + Thread.sleep(100); + + int wait = 400; + + long after = memoryUsage(); + + while (wait-- > 0) { + System.out.printf("Usage: %.1f -> %.1f -> %.1f%n", before / 1024.0 / 1024.0, middle / 1024.0 / 1024.0, after / 1024.0 / 1024.0); + + if (middle > after * 2) { + return; + } + + Thread.sleep(100); + + System.gc(); + + Thread.sleep(100); + + after = memoryUsage(); + } + + fail(String.format("Looks like there is a memory leak: %.1f -> %.1f -> %.1f", before / 1024.0 / 1024.0, middle / 1024.0 / 1024.0, after / 1024.0 / 1024.0)); + + } finally { + scheduler.shutdown(); + } + } +} diff --git a/src/test/java/io/reactivex/rxjava4/internal/schedulers/SharedSchedulerTest.java b/src/test/java/io/reactivex/rxjava4/internal/schedulers/SharedSchedulerTest.java index ce1829e761f..ceaee91ff2c 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/schedulers/SharedSchedulerTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/schedulers/SharedSchedulerTest.java @@ -15,22 +15,20 @@ import static org.junit.jupiter.api.Assertions.*; -import java.lang.management.ManagementFactory; import java.util.*; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.*; -import io.reactivex.rxjava4.core.Flowable; +import io.reactivex.rxjava4.core.*; import io.reactivex.rxjava4.core.Scheduler.Worker; import io.reactivex.rxjava4.disposables.*; import io.reactivex.rxjava4.functions.Function; -import io.reactivex.rxjava4.internal.functions.Functions; import io.reactivex.rxjava4.internal.schedulers.SharedScheduler.SharedWorker.SharedAction; import io.reactivex.rxjava4.schedulers.*; import io.reactivex.rxjava4.testsupport.TestHelper; -public class SharedSchedulerTest implements Runnable { +public class SharedSchedulerTest extends RxJavaTest implements Runnable { volatile int calls; @@ -75,65 +73,6 @@ public void delay() { } } - long memoryUsage() { - return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed(); - } - - @Test - public void noleak() throws Exception { - var scheduler = Schedulers.cached().share(); - try { - Worker worker = scheduler.createWorker(); - - worker.schedule(Functions.EMPTY_RUNNABLE); - - System.gc(); - Thread.sleep(500); - - long before = memoryUsage(); - System.out.printf("Start: %.1f%n", before / 1024.0 / 1024.0); - - for (int i = 0; i < 200 * 1000; i++) { - worker.schedule(Functions.EMPTY_RUNNABLE, 1, TimeUnit.DAYS); - } - - long middle = memoryUsage(); - - System.out.printf("Middle: %.1f -> %.1f%n", before / 1024.0 / 1024.0, middle / 1024.0 / 1024.0); - - worker.dispose(); - - System.gc(); - - Thread.sleep(100); - - int wait = 400; - - long after = memoryUsage(); - - while (wait-- > 0) { - System.out.printf("Usage: %.1f -> %.1f -> %.1f%n", before / 1024.0 / 1024.0, middle / 1024.0 / 1024.0, after / 1024.0 / 1024.0); - - if (middle > after * 2) { - return; - } - - Thread.sleep(100); - - System.gc(); - - Thread.sleep(100); - - after = memoryUsage(); - } - - fail(String.format("Looks like there is a memory leak: %.1f -> %.1f -> %.1f", before / 1024.0 / 1024.0, middle / 1024.0 / 1024.0, after / 1024.0 / 1024.0)); - - } finally { - scheduler.shutdown(); - } - } - @Test public void now() { TestScheduler test = new TestScheduler(); diff --git a/src/test/java/io/reactivex/rxjava4/subscribers/TestSubscriberTest.java b/src/test/java/io/reactivex/rxjava4/subscribers/TestSubscriberTest.java index 349692bd72c..c89e72c2859 100644 --- a/src/test/java/io/reactivex/rxjava4/subscribers/TestSubscriberTest.java +++ b/src/test/java/io/reactivex/rxjava4/subscribers/TestSubscriberTest.java @@ -1662,15 +1662,9 @@ public void asDisposable() { assertTrue(d.isDisposed(), "d is disposed"); } - static final class TestSubscriberImpl extends TestSubscriber { - public boolean isTimeout() { - return timeout; - } - } - @Test public void awaitCountTimeout() { - TestSubscriberImpl ts = new TestSubscriberImpl<>(); + TestSubscriber ts = new TestSubscriber<>(); ts.onSubscribe(new BooleanSubscription()); ts.awaitCount(1); assertTrue(ts.isTimeout()); diff --git a/src/test/java/io/reactivex/rxjava4/testsupport/BaseTestConsumerEx.java b/src/test/java/io/reactivex/rxjava4/testsupport/BaseTestConsumerEx.java index b91f023102e..1f0e6818354 100644 --- a/src/test/java/io/reactivex/rxjava4/testsupport/BaseTestConsumerEx.java +++ b/src/test/java/io/reactivex/rxjava4/testsupport/BaseTestConsumerEx.java @@ -205,60 +205,6 @@ public final U assertFailureAndMessage(Class error, .assertNotComplete(); } - /** - * Returns true if an await timed out. - * @return true if one of the timeout-based await methods has timed out. - *

History: 2.0.7 - experimental - * @see #clearTimeout() - * @see #assertTimeout() - * @see #assertNoTimeout() - * @since 2.1 - */ - public final boolean isTimeout() { - return timeout; - } - - /** - * Clears the timeout flag set by the await methods when they timed out. - *

History: 2.0.7 - experimental - * @return this - * @since 2.1 - * @see #isTimeout() - */ - @SuppressWarnings("unchecked") - public final U clearTimeout() { - timeout = false; - return (U)this; - } - - /** - * Asserts that some awaitX method has timed out. - *

History: 2.0.7 - experimental - * @return this - * @since 2.1 - */ - @SuppressWarnings("unchecked") - public final U assertTimeout() { - if (!timeout) { - throw fail("No timeout?!"); - } - return (U)this; - } - - /** - * Asserts that some awaitX method has not timed out. - *

History: 2.0.7 - experimental - * @return this - * @since 2.1 - */ - @SuppressWarnings("unchecked") - public final U assertNoTimeout() { - if (timeout) { - throw fail("Timeout?!"); - } - return (U)this; - } - /** * Returns the internal shared list of errors. * @return Returns the internal shared list of errors. diff --git a/src/test/java/io/reactivex/rxjava4/validators/CheckParamValidationTest.java b/src/test/java/io/reactivex/rxjava4/validators/CheckParamValidationTest.java index f98f4af3ca8..3395fd25593 100644 --- a/src/test/java/io/reactivex/rxjava4/validators/CheckParamValidationTest.java +++ b/src/test/java/io/reactivex/rxjava4/validators/CheckParamValidationTest.java @@ -30,6 +30,7 @@ import io.reactivex.rxjava4.exceptions.TestException; import io.reactivex.rxjava4.functions.*; import io.reactivex.rxjava4.internal.functions.Functions; +import io.reactivex.rxjava4.internal.operators.streamable.StreamableNever; import io.reactivex.rxjava4.parallel.*; import io.reactivex.rxjava4.plugins.RxJavaPlugins; import io.reactivex.rxjava4.schedulers.Schedulers; @@ -71,6 +72,11 @@ public void checkParallelFlowable() { checkClass(ParallelFlowable.class); } + @Test @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS) + public void checkStreamable() { + checkClass(Streamable.class); + } + // --------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------- @@ -519,6 +525,10 @@ public void checkParallelFlowable() { addIgnore(new ParamIgnore(Completable.class, "unsafeCreate", CompletableSource.class)); + // needs special param validation due to (long)start + end - 1 <= Integer.MAX_VALUE + addIgnore(new ParamIgnore(Streamable.class, "range", Integer.TYPE, Integer.TYPE)); + addIgnore(new ParamIgnore(Streamable.class, "rangeLong", Long.TYPE, Long.TYPE)); + // ----------------------------------------------------------------------------------- defaultValues = new HashMap<>(); @@ -538,6 +548,8 @@ public void checkParallelFlowable() { defaultValues.put(CompletableSource.class, new NeverCompletable()); defaultValues.put(Completable.class, new NeverCompletable()); + defaultValues.put(Streamable.class, StreamableNever.INSTANCE); + defaultValues.put(Action.class, Functions.EMPTY_ACTION); defaultValues.put(Runnable.class, Functions.EMPTY_RUNNABLE); defaultValues.put(Consumer.class, Functions.emptyConsumer()); @@ -668,6 +680,9 @@ public Object apply(Flowable upstream) { addDefaultInstance(Maybe.class, Maybe.just(1).hide(), "Just(1).Hide()"); addDefaultInstance(ParallelFlowable.class, Flowable.just(1).parallel(), "Just(1)"); + + addDefaultInstance(Streamable.class, Streamable.just(1), "Just(1)"); + addDefaultInstance(Streamable.class, Streamable.just(1).hide(), "Just(1).Hide()"); } static void addIgnore(ParamIgnore ignore) { @@ -867,8 +882,11 @@ void checkClass(Class clazz) { Throwable error = null; errors.clear(); try { - m.invoke(baseObject, callParams2); + var result = m.invoke(baseObject, callParams2); success = true; + if (result instanceof CompletionStageDisposable csd) { + csd.ignore(); + } } catch (Throwable ex) { // let it fail error = ex; From 8abd189712c6f3362f8377009db182a24a29f3a0 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 1 Jul 2026 12:17:48 +0200 Subject: [PATCH 2/4] Improve coverage --- .../operators/streamable/StreamableJust.java | 43 +++---------------- .../rxjava4/schedulers/TestSchedulerTest.java | 29 +++++++++++++ 2 files changed, 34 insertions(+), 38 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableJust.java b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableJust.java index a500015112d..1cfe4e564ca 100644 --- a/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableJust.java +++ b/src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableJust.java @@ -13,32 +13,27 @@ package io.reactivex.rxjava4.internal.operators.streamable; -import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.CompletionStage; import io.reactivex.rxjava4.annotations.NonNull; import io.reactivex.rxjava4.core.*; -import io.reactivex.rxjava4.disposables.*; +import io.reactivex.rxjava4.disposables.DisposableContainer; public record StreamableJust(@NonNull T item) implements Streamable { @Override public @NonNull Streamer<@NonNull T> stream(@NonNull DisposableContainer cancellation) { - return new JustStreamer<>(item, cancellation); + return new JustStreamer<>(item); } - static final class JustStreamer implements Streamer, Disposable { + static final class JustStreamer implements Streamer { volatile T item; - volatile DisposableContainer cancellation; - volatile int stage; - JustStreamer(T item, DisposableContainer cancellation) { + JustStreamer(T item) { this.item = item; - this.cancellation = cancellation; - cancellation.add(this); } @Override @@ -55,42 +50,14 @@ static final class JustStreamer implements Streamer, Disposable { @Override public @NonNull T current() { - var item = this.item; - if (stage == 0) { - throw new NoSuchElementException("Streamable.just not yet started!"); - } - if (stage == 2) { - throw new NoSuchElementException("Streamable.just already completed!"); - } return item; } @Override public @NonNull CompletionStage finish(DisposableContainer canceller) { item = null; - cancellation = null; stage = 2; return FINISHED; } - - @Override - public void close() { - Streamer.super.close(); - } - - @Override - public boolean isDisposed() { - return stage == 2; - } - - @Override - public void dispose() { - var dc = cancellation; - if (dc != null) { - if (dc.delete(this)) { - close(); // FIXME not sure about this! - } - } - } } } diff --git a/src/test/java/io/reactivex/rxjava4/schedulers/TestSchedulerTest.java b/src/test/java/io/reactivex/rxjava4/schedulers/TestSchedulerTest.java index ad7b7768e33..cf4f5248bdc 100644 --- a/src/test/java/io/reactivex/rxjava4/schedulers/TestSchedulerTest.java +++ b/src/test/java/io/reactivex/rxjava4/schedulers/TestSchedulerTest.java @@ -31,6 +31,7 @@ import io.reactivex.rxjava4.internal.util.ExceptionHelper; import io.reactivex.rxjava4.plugins.RxJavaPlugins; import io.reactivex.rxjava4.schedulers.TestScheduler.*; +import io.reactivex.rxjava4.subscribers.TestSubscriber; public class TestSchedulerTest extends RxJavaTest { @@ -342,4 +343,32 @@ public void disposeWork() { assertEquals(0, run.get()); } + + @Test + public void timeout() { + var ts = new TestSubscriber<>(); + ts.onSubscribe(new BooleanSubscription()); + + assertFalse(ts.isTimeout(), "Has timeout?"); + + ts.awaitDone(1, TimeUnit.MICROSECONDS); + + assertTrue(ts.isTimeout(), "Has no timeout?"); + + ts.assertTimeout(); + + assertThrows(AssertionError.class, () -> { + ts.assertNoTimeout(); + }); + + ts.clearTimeout(); + + assertFalse(ts.isTimeout(), "Has timeout?"); + ts.assertNoTimeout(); + + assertThrows(AssertionError.class, () -> { + ts.assertTimeout(); + }); + + } } From c4c428f18da9aa002284f4d044bbe7df3f884629 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 1 Jul 2026 12:27:46 +0200 Subject: [PATCH 3/4] do not call abstract methods?! --- .../reactivex/rxjava4/validators/CheckParamValidationTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/test/java/io/reactivex/rxjava4/validators/CheckParamValidationTest.java b/src/test/java/io/reactivex/rxjava4/validators/CheckParamValidationTest.java index 3395fd25593..cd9788ba0bf 100644 --- a/src/test/java/io/reactivex/rxjava4/validators/CheckParamValidationTest.java +++ b/src/test/java/io/reactivex/rxjava4/validators/CheckParamValidationTest.java @@ -771,6 +771,9 @@ void checkClass(Class clazz) { if (m.getDeclaringClass() != clazz) { continue; } + if (Modifier.isAbstract(m.getModifiers())) { + continue; + } String key = clazz.getName() + " " + m.getName(); From c0e44f597dd38c452d9fbce17bc1ca3f06346df1 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 1 Jul 2026 14:20:02 +0200 Subject: [PATCH 4/4] Fix flaky dispose tests --- .../internal/operators/streamable/StreamableForEachTest.java | 4 +++- .../operators/streamable/StreamableFromIterableTest.java | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableForEachTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableForEachTest.java index 192f0753a5f..381cacc4d86 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableForEachTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableForEachTest.java @@ -18,7 +18,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; import org.junit.jupiter.api.parallel.Isolated; import io.reactivex.rxjava4.core.Streamable; @@ -145,6 +145,7 @@ public void forEachOutsideCancel() { .forEach(_ -> { counter.getAndIncrement(); cd.dispose(); + Thread.sleep(10); // The body may fall off faster than the cancel can propagate out, so sleep }, cd) .await(); }); @@ -164,6 +165,7 @@ public void forEachBiOutsideCancel() throws Throwable { .forEach((_, _) -> { counter.getAndIncrement(); cd.dispose(); + Thread.sleep(10); // The body may fall off faster than the cancel can propagate out, so sleep }, cd, exec) .await(); }); diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromIterableTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromIterableTest.java index fb745340399..c5ae67bc177 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromIterableTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromIterableTest.java @@ -20,7 +20,6 @@ import org.junit.jupiter.api.parallel.Isolated; import io.reactivex.rxjava4.core.Streamable; -import io.reactivex.rxjava4.exceptions.TestException; @Isolated public class StreamableFromIterableTest extends StreamableBaseTest {