Skip to content

Commit 22f6eba

Browse files
authored
4.x: Add task coordination primitives, implement a basic concat() operator (#8110)
* 4.x: More Streamable operators * Fix bad cancellation storm * Fix style issue and eclipse annoyance of JUnit 5/6 selection
1 parent e30543c commit 22f6eba

11 files changed

Lines changed: 381 additions & 88 deletions

File tree

build.gradle

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,13 @@ dependencies {
4747
testImplementation "org.reactivestreams:reactive-streams-tck-flow:$reactiveStreamsVersion"
4848

4949
testImplementation "org.junit.jupiter:junit-jupiter:$jupiterVersion"
50-
testRuntimeOnly "org.junit.vintage:junit-vintage-engine:$jupiterVersion"
5150

52-
// The missing piece – required by Gradle 9+
53-
testRuntimeOnly "org.junit.platform:junit-platform-launcher:$jupiterLauncherVersion" // match your JUnit version family
51+
// Explicitly add these for IDE compatibility (especially Eclipse)
52+
testImplementation "org.junit.platform:junit-platform-commons:$jupiterLauncherVersion"
53+
testImplementation "org.junit.platform:junit-platform-launcher:$jupiterLauncherVersion"
54+
55+
testRuntimeOnly "org.junit.vintage:junit-vintage-engine:$jupiterVersion"
56+
testRuntimeOnly "org.junit.platform:junit-platform-launcher:$jupiterLauncherVersion" // already have this
5457
}
5558

5659
// === Experimental JDK handling for Outreach Program ===

src/main/java/io/reactivex/rxjava4/core/CompletionStageDisposable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import io.reactivex.rxjava4.annotations.NonNull;
2323
import io.reactivex.rxjava4.disposables.*;
24+
import io.reactivex.rxjava4.internal.util.AwaitCoordinatorStatic;
2425
import io.reactivex.rxjava4.plugins.RxJavaPlugins;
2526

2627
/**
@@ -84,7 +85,7 @@ public CompletionStageDisposable(@NonNull CompletionStage<T> stage, @NonNull Dis
8485
*/
8586
public void await() {
8687
state.lazySet(true);;
87-
Streamer.await(stage);
88+
AwaitCoordinatorStatic.await(stage);
8889
}
8990

9091
/**
@@ -93,7 +94,7 @@ public void await() {
9394
*/
9495
public void await(DisposableContainer canceller) {
9596
state.lazySet(true);;
96-
Streamer.await(stage, canceller);
97+
AwaitCoordinatorStatic.await(stage, canceller);
9798
}
9899

99100
/**

src/main/java/io/reactivex/rxjava4/core/Streamable.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@
1414
package io.reactivex.rxjava4.core;
1515

1616
import java.lang.reflect.InvocationTargetException;
17-
import java.util.Objects;
17+
import java.util.*;
1818
import java.util.concurrent.*;
1919

2020
import io.reactivex.rxjava4.annotations.*;
2121
import io.reactivex.rxjava4.disposables.*;
2222
import io.reactivex.rxjava4.exceptions.Exceptions;
2323
import io.reactivex.rxjava4.functions.*;
2424
import io.reactivex.rxjava4.internal.operators.streamable.*;
25+
import io.reactivex.rxjava4.internal.util.AwaitCoordinatorStatic;
2526
import io.reactivex.rxjava4.schedulers.Schedulers;
2627
import io.reactivex.rxjava4.subscribers.TestSubscriber;
2728

@@ -170,6 +171,49 @@ static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source, @NonNu
170171
.toStreamable();
171172
}
172173

174+
/**
175+
* Generates a sequence in order which the stages complete in any form.
176+
* @param <T> the common element type
177+
* @param stages the iterable of stages to be relayed in the order they complete
178+
* @param executor the executor to run the blocking operator
179+
* @return the new Streamable instance
180+
*/
181+
@SuppressWarnings("unchecked")
182+
@NonNull
183+
static <@NonNull T> Streamable<CompletionStage<T>> fromStages(@NonNull Iterable<? extends CompletionStage<? extends T>> stages, ExecutorService executor) {
184+
return create(emitter -> {
185+
var list = new ArrayList<CompletionStage<? extends T>>();
186+
for(var stage : stages) {
187+
list.add(stage);
188+
}
189+
while (list.size() != 0) {
190+
var winner = AwaitCoordinatorStatic.awaitFirstIndex(list, emitter.canceller());
191+
emitter.emit((CompletionStage<T>)list.remove(winner));
192+
}
193+
}, executor);
194+
}
195+
196+
/**
197+
* Emits the elements of each inner sequence produced by the outher sequence.
198+
* @param <T> the common element type
199+
* @param sources the streamable of inner streamables
200+
* @param exec the executorservice where to run the virtual wait
201+
* @return the new Streamable instance.
202+
*/
203+
static <@NonNull T> Streamable<T> concat(Streamable<? extends Streamable<? extends T>> sources, ExecutorService exec) {
204+
return create(emitter -> {
205+
try (var mainSource = sources.forEach(item -> {
206+
try (var innerSource = item.forEach(inner -> {
207+
emitter.emit(inner);
208+
}, emitter.canceller().derive(), exec)) {
209+
innerSource.await(emitter.canceller());
210+
}
211+
}, emitter.canceller(), exec)) {
212+
mainSource.await(emitter.canceller());
213+
};
214+
}, exec);
215+
}
216+
173217
// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
174218
// Operators
175219
// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo

src/main/java/io/reactivex/rxjava4/core/Streamer.java

Lines changed: 4 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414
package io.reactivex.rxjava4.core;
1515

1616
import java.util.*;
17-
import java.util.concurrent.*;
18-
import java.util.function.Function;
17+
import java.util.concurrent.CompletionStage;
1918

20-
import io.reactivex.rxjava4.annotations.*;
19+
import io.reactivex.rxjava4.annotations.NonNull;
2120
import io.reactivex.rxjava4.disposables.*;
21+
import io.reactivex.rxjava4.internal.util.AwaitCoordinator;
2222

2323
/**
2424
* A realized stream which can then be consumed asynchronously in steps.
@@ -31,7 +31,7 @@
3131
* TODO proper docs
3232
* @since 4.0.0
3333
*/
34-
public interface Streamer<@NonNull T> extends AutoCloseable {
34+
public interface Streamer<@NonNull T> extends AutoCloseable, AwaitCoordinator {
3535

3636
// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
3737
// API
@@ -195,79 +195,4 @@ default void awaitFinish() {
195195
default void awaitFinish(@NonNull DisposableContainer cancellation) {
196196
await(finish(cancellation), cancellation);
197197
}
198-
199-
// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
200-
// ASYNC/AWAIT "Language" keyword implementations
201-
// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
202-
203-
/**
204-
* The {@code await} keyword for async/await.
205-
* @param <T> the type of the returned value if any.
206-
* @param stage the stage to await virtual-blockingly
207-
* @return the awaited value
208-
*/
209-
@Nullable
210-
static <T> T await(@NonNull CompletionStage<T> stage) {
211-
return await(stage, null);
212-
}
213-
214-
/**
215-
* The cancellable {@code await} keyword for async/await.
216-
* @param <T> the type of the returned value if any.
217-
* @param stage the stage to await virtual-blockingly
218-
* @param canceller the container that can trigger a cancellation on demand
219-
* @return the awaited value
220-
*/
221-
@Nullable
222-
static <T> T await(@NonNull CompletionStage<T> stage, @Nullable DisposableContainer canceller) {
223-
var f = stage.toCompletableFuture();
224-
if (canceller == null) {
225-
return f.join();
226-
}
227-
var d = Disposable.fromFuture(f, true);
228-
try (var _ = canceller.subscribe(d)) {
229-
return f.join();
230-
}
231-
}
232-
233-
/**
234-
* Runs a function while turning it into a CompletionStage with a canceller supplied too.
235-
* @param <U> the return type of the function
236-
* @param function the function to apply
237-
* @param canceller the canceller to use
238-
* @param executor the executor to use
239-
* @return the new stage
240-
*/
241-
static <U> CompletionStage<U> runStage(Function<DisposableContainer, U> function,
242-
DisposableContainer canceller, Executor executor) {
243-
var loopback = new SerialDisposable();
244-
canceller.add(loopback);
245-
246-
// new Exception().printStackTrace();
247-
248-
var f = CompletableFuture.supplyAsync(() -> {
249-
try {
250-
return function.apply(canceller);
251-
} finally {
252-
canceller.delete(loopback);
253-
}
254-
}, executor);
255-
256-
var d = Disposable.fromFuture(f, true);
257-
loopback.replace(d);
258-
259-
return f;
260-
}
261-
262-
/**
263-
* Runs a function while turning it into a CompletionStage with a canceller supplied too.
264-
* @param <U> the return type of the function
265-
* @param function the function to apply
266-
* @param canceller the canceller to use
267-
* @return the new stage
268-
*/
269-
static <U> CompletionStage<U> runStage(Function<DisposableContainer, U> function,
270-
DisposableContainer canceller) {
271-
return runStage(function, canceller, Executors.newVirtualThreadPerTaskExecutor());
272-
}
273198
}

src/main/java/io/reactivex/rxjava4/disposables/CompositeDisposable.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,4 +268,14 @@ public void reset() {
268268
resources = null;
269269
}
270270
}
271+
272+
@Override
273+
public DisposableContainer derive() {
274+
var result = new CompositeDisposable();
275+
276+
add(result);
277+
result.add(Disposable.fromRunnable(() -> delete(result)));
278+
279+
return result;
280+
}
271281
}

src/main/java/io/reactivex/rxjava4/disposables/DisposableContainer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ public interface DisposableContainer extends Disposable {
5656
*/
5757
void clear();
5858

59+
/**
60+
* Create a derived sub container that can get cancelled by this container,
61+
* but cancelling the subcontainer does not cancel this container.
62+
* @return the derived subcontainer
63+
* @since 4.0
64+
*/
65+
DisposableContainer derive();
66+
5967
/**
6068
* Registers a {@link Disposable} with this container so that it can be removed and disposed
6169
* via a simple {@link #dispose()} call to the returned Disposable.
@@ -133,5 +141,10 @@ public void reset() {
133141
public void clear() {
134142
// Who cares?
135143
}
144+
145+
@Override
146+
public DisposableContainer derive() {
147+
return NEVER;
148+
}
136149
}
137150
}

src/main/java/io/reactivex/rxjava4/internal/disposables/ListCompositeDisposable.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,4 +198,14 @@ public void reset() {
198198
}
199199
}
200200

201+
@Override
202+
public DisposableContainer derive() {
203+
var result = new ListCompositeDisposable();
204+
205+
add(result);
206+
result.add(Disposable.fromRunnable(() -> delete(result)));
207+
208+
return result;
209+
}
210+
201211
}

src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableFromPublisher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import io.reactivex.rxjava4.disposables.DisposableContainer;
2323
import io.reactivex.rxjava4.internal.fuseable.HasUpstreamPublisher;
2424
import io.reactivex.rxjava4.internal.subscriptions.SubscriptionHelper;
25-
import io.reactivex.rxjava4.internal.util.ExceptionHelper;
25+
import io.reactivex.rxjava4.internal.util.*;
2626
import io.reactivex.rxjava4.internal.virtual.VirtualResumable;
2727

2828
public record StreamableFromPublisher<T>(@NonNull Publisher<T> source,
@@ -89,7 +89,7 @@ public void onComplete() {
8989
@Override
9090
public @NonNull CompletionStage<Boolean> next(@NonNull DisposableContainer canceller) {
9191
// System.out.println("next()");
92-
return Streamer.runStage(_ -> {
92+
return AwaitCoordinatorStatic.runStage(_ -> {
9393
item.lazySet(null);
9494
// System.out.println("Requesting the next item");
9595
SubscriptionHelper.deferredRequest(upstream, requester, 1);
@@ -143,7 +143,7 @@ public void onComplete() {
143143
@Override
144144
public @NonNull CompletionStage<Void> finish(@NonNull DisposableContainer cancellation) {
145145
// new Exception("StreamableFromPublisher::finish").printStackTrace();
146-
return Streamer.runStage(_ -> {
146+
return AwaitCoordinatorStatic.runStage(_ -> {
147147
SubscriptionHelper.cancel(upstream);
148148
return null;
149149
}, cancellation, executor);
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.internal.util;
15+
16+
import java.util.concurrent.*;
17+
import java.util.function.Function;
18+
19+
import io.reactivex.rxjava4.annotations.*;
20+
import io.reactivex.rxjava4.disposables.DisposableContainer;
21+
22+
/**
23+
* Static methods to coordinate {@link CompletionStage}s for various operators.
24+
*/
25+
public interface AwaitCoordinator {
26+
27+
/**
28+
* The {@code await} keyword for async/await.
29+
* @param <T> the type of the returned value if any.
30+
* @param stage the stage to await virtual-blockingly
31+
* @return the awaited value
32+
*/
33+
@Nullable
34+
default <T> T await(@NonNull CompletionStage<T> stage) {
35+
return AwaitCoordinatorStatic.await(stage, null);
36+
}
37+
38+
/**
39+
* The cancellable {@code await} keyword for async/await.
40+
* @param <T> the type of the returned value if any.
41+
* @param stage the stage to await virtual-blockingly
42+
* @param canceller the container that can trigger a cancellation on demand
43+
* @return the awaited value
44+
*/
45+
@Nullable
46+
default <T> T await(@NonNull CompletionStage<T> stage, @Nullable DisposableContainer canceller) {
47+
return AwaitCoordinatorStatic.await(stage, canceller);
48+
}
49+
50+
/**
51+
* Runs a function while turning it into a CompletionStage with a canceller supplied too.
52+
* @param <U> the return type of the function
53+
* @param function the function to apply
54+
* @param canceller the canceller to use
55+
* @param executor the executor to use
56+
* @return the new stage
57+
*/
58+
default <U> CompletionStage<U> runStage(Function<DisposableContainer, U> function,
59+
DisposableContainer canceller, Executor executor) {
60+
return AwaitCoordinatorStatic.<U>runStage(function, canceller, executor);
61+
}
62+
63+
/**
64+
* Runs a function while turning it into a CompletionStage with a canceller supplied too.
65+
* @param <U> the return type of the function
66+
* @param function the function to apply
67+
* @param canceller the canceller to use
68+
* @return the new stage
69+
*/
70+
default <U> CompletionStage<U> runStage(Function<DisposableContainer, U> function,
71+
DisposableContainer canceller) {
72+
return runStage(function, canceller, Executors.newVirtualThreadPerTaskExecutor());
73+
}
74+
}

0 commit comments

Comments
 (0)