Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 112 additions & 26 deletions src/main/java/io/reactivex/rxjava4/core/Streamable.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import io.reactivex.rxjava4.annotations.*;
import io.reactivex.rxjava4.disposables.*;
Expand Down Expand Up @@ -54,17 +55,6 @@ public interface Streamable<@NonNull T> {
// HELPERS
// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo

// TODO, why no public final so it is not unnecessarily reimplemented, Java?
/**
* Realizes the stream and returns an interface that lets one consume it.
* @return the Streamer instance to consume.
*/
@CheckReturnValue
@NonNull
default Streamer<T> stream() {
return stream(new CompositeDisposable()); // FIXME, use a practically no-op disposable container instead
}

// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
// Data sources and wrappers
// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
Expand All @@ -74,10 +64,11 @@ default Streamer<T> stream() {
* @param <T> the element type
* @return the {@code Streamable} instance
*/
@SuppressWarnings("unchecked")
@CheckReturnValue
@NonNull
static <@NonNull T> Streamable<T> empty() {
return RxJavaPlugins.onAssembly(new StreamableEmpty<>());
return RxJavaPlugins.onAssembly((Streamable<T>)StreamableEmpty.INSTANCE);
}

/**
Expand All @@ -101,17 +92,46 @@ default Streamer<T> stream() {
* @throws NullPointerException if {@code items} is {@code null}
*/
@SafeVarargs
@CheckReturnValue
@NonNull
static <@NonNull T> Streamable<T> fromArray(T... items) {
static <@NonNull T> Streamable<T> fromArray(@NonNull T... items) {
Objects.requireNonNull(items, "items is null");
return RxJavaPlugins.onAssembly(new StreamableFromArray<>(items));
}

/**
* Convert any Flow.Publisher into a Streamable sequence.
* Streams all elements of the given {@link Iterable} sequence.
* @param <T> the element type of the items
* @param items the iterable of items to stream
* @return the new {@code Streamable} instance
* @throws NullPointerException if {@code items} is {@code null}
*/
@CheckReturnValue
@NonNull
static <@NonNull T> Streamable<T> fromIterable(@NonNull Iterable<? extends T> items) {
Objects.requireNonNull(items, "items is null");
return RxJavaPlugins.onAssembly(new StreamableFromIterable<>(items));
}

/**
* Streams all elements of the given {@link Stream} sequence.
* @param <T> the element type of the items
* @param items the stream of items to stream
* @return the new {@code Streamable} instance
* @throws NullPointerException if {@code items} is {@code null}
*/
@CheckReturnValue
@NonNull
static <@NonNull T> Streamable<T> fromStream(@NonNull Stream<? extends T> items) {
Objects.requireNonNull(items, "items is null");
return RxJavaPlugins.onAssembly(new StreamableFromStream<>(items));
}

/**
* Convert any {@link java.util.concurrent.Flow.Publisher} into a {@code Streamable} sequence.
* @param <T> the element type
* @param source Flow.Publisher to convert
* @return the new Streamable instance
* @return the new {@code Streamable} instance
*/
@CheckReturnValue
@NonNull
Expand All @@ -121,24 +141,48 @@ static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source) {
}

/**
* Convert any Flow.Publisher into a Streamable sequence.
* Convert any {@link java.util.concurrent.Flow.Publisher} into a {@code Streamable} sequence.
* @param <T> the element type
* @param source Flow.Publisher to convert
* @param executor where the conversion will run
* @return the new Streamable instance
* @return the new {@code Streamable} instance
*/
@CheckReturnValue
@NonNull
static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source, @NonNull ExecutorService executor) {
Objects.requireNonNull(source, "source is null");
Objects.requireNonNull(executor, "executor is null");
return RxJavaPlugins.onAssembly(new StreamableFromPublisher<>(source, executor));
}

/**
* Returns an {@code Streamable} that never produces an item and never terminates.
* @param <T> the element type
* @return the {@code Streamable} instance
*/
@SuppressWarnings("unchecked")
@CheckReturnValue
@NonNull
static <@NonNull T> Streamable<T> never() {
return RxJavaPlugins.onAssembly((Streamable<T>)StreamableNever.INSTANCE);
}

/**
* Generate a sequence of values via a virtual generator callback (yielder)
* which is free to block and is natively backpressured.
* <p>
* Runs on the {@link Schedulers#virtual()} scheduler.
* <p>
* Example
* <pre><code>
* Streamable.create(emitter -> {
* emitter.emit(1);
* emitter.emit(2);
* emitter.emit(3);
* })
* .forEach(System.out::println)
* ;
* </code></pre>
* @param <T> the element type
* @param generator the generator to use
* @return the streamable instance
Expand All @@ -156,6 +200,17 @@ static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source, @NonNu
* which is free to block and is natively backpressured.
* <p>
* Runs on the given scheduler.
* <p>
* Example
* <pre><code>
* Streamable.create(emitter -> {
* emitter.emit(1);
* emitter.emit(2);
* emitter.emit(3);
* }, Schedulers.cached())
* .forEach(System.out::println)
* ;
* </code></pre>
* @param <T> the element type
* @param generator the generator to use
* @param scheduler the scheduler to run the virtual generator on
Expand Down Expand Up @@ -195,8 +250,12 @@ static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source, @NonNu
* @return the new Streamable instance
*/
@SuppressWarnings("unchecked")
@CheckReturnValue
@NonNull
static <@NonNull T> Streamable<CompletionStage<T>> fromStages(@NonNull Iterable<? extends CompletionStage<? extends T>> stages, ExecutorService executor) {
static <@NonNull T> Streamable<CompletionStage<T>> fromStages(
@NonNull Iterable<? extends CompletionStage<? extends T>> stages, ExecutorService executor) {
Objects.requireNonNull(stages, "stages is null");
Objects.requireNonNull(executor, "executor is null");
return create(emitter -> {
var list = new ArrayList<CompletionStage<? extends T>>();
for(var stage : stages) {
Expand All @@ -217,6 +276,8 @@ static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source, @NonNu
* @return the new {@code Streamable} instance
* @throws NullPointerException if {@code supplier} is {@code null}
*/
@CheckReturnValue
@NonNull
static <@NonNull T> Streamable<T> defer(Supplier<? extends Streamable<? extends T>> supplier) {
Objects.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new StreamableDefer<>(supplier));
Expand All @@ -230,28 +291,35 @@ static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source, @NonNu
* @return the new {@code Streamable} instance
* @throws NullPointerException if {@code throwable} is {@code null}
*/
@CheckReturnValue
@NonNull
static <@NonNull T> Streamable<T> error(Throwable throwable) {
Objects.requireNonNull(throwable, "throwable is null");
return RxJavaPlugins.onAssembly(new StreamableError<>(throwable));
}

/**
* Emits the elements of each inner sequence produced by the outher sequence.
* Emits the elements of each inner sequence produced by the outer sequence.
* @param <T> the common element type
* @param sources a streamable of inner streamables
* @param exec the executorservice where to run the virtual wait
* @return the new Streamable instance.
* @param executor the executorservice where to run the virtual wait
* @return the new {@code Streamable} instance.
* @throws NullPointerException if {@code sources} or {@code exec} is {@code null}
*/
static <@NonNull T> Streamable<T> concat(Streamable<? extends Streamable<? extends T>> sources, ExecutorService exec) {
@CheckReturnValue
@NonNull
static <@NonNull T> Streamable<T> concat(Streamable<? extends Streamable<? extends T>> sources, ExecutorService executor) {
Objects.requireNonNull(sources, "sources is null");
Objects.requireNonNull(executor, "executor is null");
return create(emitter -> {
try (var mainSource = sources.forEach(item -> {
try (var innerSource = item.forEach(emitter::emit, emitter.canceller().derive(), exec)) {
try (var innerSource = item.forEach(emitter::emit, emitter.canceller().derive(), executor)) {
innerSource.await(emitter.canceller());
}
}, emitter.canceller(), exec)) {
}, emitter.canceller(), executor)) {
mainSource.await(emitter.canceller());
}
}, exec);
}, executor);
}

/**
Expand All @@ -260,6 +328,8 @@ static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source, @NonNu
* @param count the number of elements to emit
* @return the new {@code Streamable} instance
*/
@CheckReturnValue
@NonNull
static Streamable<Integer> range(int start, int count) {
if (count < 0) {
throw new IllegalArgumentException("count >= 0 required but it was " + count);
Expand All @@ -282,6 +352,8 @@ static Streamable<Integer> range(int start, int count) {
* @param count the number of elements to emit
* @return the new {@code Streamable} instance
*/
@CheckReturnValue
@NonNull
static Streamable<Long> rangeLong(long start, long count) {
if (count < 0) {
throw new IllegalArgumentException("count >= 0 required but it was " + count);
Expand Down Expand Up @@ -335,14 +407,26 @@ default Flowable<T> toFlowable(@NonNull ExecutorService executor) {
* Transforms the upstream sequence into zero or more elements for the downstream.
* @param <R> the result element type
* @param transformer the interface to implement the transforming logic
* @return the new Streamable instance
* @return the new {@code Streamable} instance
*/
@CheckReturnValue
@NonNull
default <@NonNull R> Streamable<R> transform(@NonNull VirtualTransformer<T, R> transformer) {
return transform(transformer, Executors.newVirtualThreadPerTaskExecutor());
}

/**
* Hides the identity of this {@code Streamable} and its {@link Streamer}.
* <p>
* Use it to break optimizations or hide concrete implementations.
* @return the new {@code Streamable} instance
*/
@CheckReturnValue
@NonNull
default Streamable<T> hide() {
return RxJavaPlugins.onAssembly(new StreamableHide<>(this));
}

/**
* Transforms the upstream sequence into zero or more elements for the downstream.
* @param <R> the result element type
Expand Down Expand Up @@ -370,6 +454,8 @@ default Flowable<T> toFlowable(@NonNull ExecutorService executor) {
* @param n the maximum number of items to relay
* @return the new {@code Streamable} instance
*/
@CheckReturnValue
@NonNull
default Streamable<T> take(long n) {
ObjectHelper.verifyPositive(n, "n");
return defer(() -> {
Expand Down
33 changes: 1 addition & 32 deletions src/main/java/io/reactivex/rxjava4/core/Streamer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

/**
* A realized stream which can then be consumed asynchronously in steps.
* Think of it as the {@IAsyncEnumerator} of the Java world. Runs best on Virtual Threads.

Check warning on line 25 in src/main/java/io/reactivex/rxjava4/core/Streamer.java

View workflow job for this annotation

GitHub Actions / build

unknown tag. Unregistered custom tag?

Check warning on line 25 in src/main/java/io/reactivex/rxjava4/core/Streamer.java

View workflow job for this annotation

GitHub Actions / build

unknown tag. Unregistered custom tag?

Check warning on line 25 in src/main/java/io/reactivex/rxjava4/core/Streamer.java

View workflow job for this annotation

GitHub Actions / build (27)

unknown tag. Unregistered custom tag?

Check warning on line 25 in src/main/java/io/reactivex/rxjava4/core/Streamer.java

View workflow job for this annotation

GitHub Actions / build (27)

unknown tag. Unregistered custom tag?
* <p>
* To make sure you can run finish, use {@link DisposableContainer#clear()} or {@link DisposableContainer#reset()}
* to get rid of all previous registered disposables. finish() will create its own, and if that
Expand Down Expand Up @@ -131,37 +131,6 @@

}

/**
* Hides the identity of this Streamer for debug or deoptimization purposes.
* @return the augmented streamer, always unique.
*/
default Streamer<T> hide() {
return new HiddenStreamer<>(this);
}

/**
* Hides the identity of the Streamer for debug or deoptimization purposes.
* @param <T> the element type of the streamer
*/
static record HiddenStreamer<T>(@NonNull Streamer<T> streamer) implements Streamer<T> {

@Override
public @NonNull CompletionStage<Boolean> next(@NonNull DisposableContainer cancellation) {
return streamer.next(cancellation);
}

@Override
public @NonNull T current() {
return streamer.current();
}

@Override
public @NonNull CompletionStage<Void> finish(@NonNull DisposableContainer cancellation) {
return streamer.finish(cancellation);
}

}

/**
* Moves and awaits the sequence's next element, returns false if there are no more
* data.
Expand Down Expand Up @@ -212,7 +181,7 @@

/**
* Use this constant in {@link #finish(DisposableContainer)} to indicate
* the cleanupp was done synchronously.
* the cleanup was done synchronously.
*/
CompletionStage<Void> FINISHED = CompletableFuture.completedStage(null);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava4.internal.fuseable;

import io.reactivex.rxjava4.annotations.NonNull;
import io.reactivex.rxjava4.core.Streamable;

/**
* Interface indicating the implementor has an upstream Streamable-like source available
* via {@link #source()} method.
*
* @param <T> the value type
*/
public interface HasUpstreamStreamableSource<@NonNull T> {
/**
* Returns the upstream source of this Streamable.
* <p>Allows discovering the chain of streamables.
* @return the source Streamable
*/
@NonNull
Streamable<T> source();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,26 @@
import io.reactivex.rxjava4.core.*;
import io.reactivex.rxjava4.disposables.*;

public final class StreamableEmpty<T> implements Streamable<T> {
public enum StreamableEmpty implements Streamable<Object> {

INSTANCE;

@Override
public @NonNull Streamer<@NonNull T> stream(@NonNull DisposableContainer cancellation) {
return new EmptyStreamer<>();
public @NonNull Streamer<Object> stream(@NonNull DisposableContainer cancellation) {
return EmptyStreamer.INSTANCE;
}

static final class EmptyStreamer<T> implements Streamer<T> {
enum EmptyStreamer implements Streamer<Object> {

INSTANCE;

@Override
public @NonNull CompletionStage<Boolean> next(DisposableContainer cancellation) {
return NEXT_FALSE;
}

@Override
public @NonNull T current() {
public @NonNull Object current() {
throw new NoSuchElementException("This Streamable/Streamer never has elements");
}

Expand Down
Loading
Loading