diff --git a/docs/src/main/paradox/stream/operators/Sink/asJavaStream.md b/docs/src/main/paradox/stream/operators/Sink/asJavaStream.md new file mode 100644 index 00000000000..e9fb8de6139 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Sink/asJavaStream.md @@ -0,0 +1,38 @@ +# Sink.asJavaStream + +Create a sink which materializes into Java 8 `Stream` that can be run to trigger demand through the sink. + +@ref[Sink operators](../index.md#sink-operators) + +## Signature + +@apidoc[Sink.asJavaStream](Sink$) { scala="#asJavaStream[T]():org.apache.pekko.stream.scaladsl.Sink[T,java.util.stream.Stream[T]]" java="#asJavaStream()" } + +## Description + +Create a sink which materializes into Java 8 `Stream` that can be run to trigger demand through the sink. +Elements emitted through the stream will be available for reading through the Java 8 `Stream`. + +The Java 8 `Stream` will be ended when the stream flowing into this `Sink` completes, and closing the Java +`Stream` will cancel the inflow of this `Sink`. If the Java `Stream` throws an exception, the Pekko stream is cancelled. + +Be aware that Java `Stream` blocks current thread while waiting on next element from downstream. + +## Example + +Here is an example of a @apidoc[Sink] that materializes into a @javadoc[java.util.stream.Stream](java.util.stream.Stream). + +Scala +: @@snip [StreamConvertersToJava.scala](/docs/src/test/scala/docs/stream/operators/converters/StreamConvertersToJava.scala) { #import #asJavaStreamOnSink } + +Java +: @@snip [StreamConvertersToJava.java](/docs/src/test/java/jdocs/stream/operators/converters/StreamConvertersToJava.java) { #import #asJavaStreamOnSink } + + +## Reactive Streams semantics + +@@@div { .callout } +**cancels** when the Java Stream is closed + +**backpressures** when no read is pending on the Java Stream +@@@ diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index c1cd3e21e07..1f345fa0eac 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -50,6 +50,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl | |Operator|Description| |--|--|--| +|Sink|@ref[asJavaStream](Sink/asJavaStream.md)|Create a sink which materializes into Java 8 `Stream` that can be run to trigger demand through the sink.| |Sink|@ref[asPublisher](Sink/asPublisher.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.| |Sink|@ref[cancelled](Sink/cancelled.md)|Immediately cancel the stream| |Sink|@ref[collect](Sink/collect.md)|Collect all input elements using a Java @javadoc[Collector](java.util.stream.Collector).| @@ -397,6 +398,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [alsoToAll](Source-or-Flow/alsoToAll.md) * [asFlowWithContext](Flow/asFlowWithContext.md) * [asInputStream](StreamConverters/asInputStream.md) +* [asJavaStream](Sink/asJavaStream.md) * [asJavaStream](StreamConverters/asJavaStream.md) * [ask](Source-or-Flow/ask.md) * [ask](ActorFlow/ask.md) diff --git a/docs/src/test/java/jdocs/stream/operators/converters/StreamConvertersToJava.java b/docs/src/test/java/jdocs/stream/operators/converters/StreamConvertersToJava.java index 7bb5c9a3659..c900a7ab6d0 100644 --- a/docs/src/test/java/jdocs/stream/operators/converters/StreamConvertersToJava.java +++ b/docs/src/test/java/jdocs/stream/operators/converters/StreamConvertersToJava.java @@ -66,6 +66,17 @@ public void demonstrateConverterToJava8Stream() { assertEquals(5, jStream.count()); } + @Test + public void demonstrateConverterToJava8StreamOnSink() { + // #asJavaStreamOnSink + + Source source = Source.range(0, 9).filter(i -> i % 2 == 0); + Stream jStream = source.runWith(Sink.asJavaStream(), system); + + // #asJavaStreamOnSink + assertEquals(5, jStream.count()); + } + @Test public void demonstrateCreatingASourceFromJava8Stream() throws InterruptedException, ExecutionException, TimeoutException { diff --git a/docs/src/test/scala/docs/stream/operators/converters/StreamConvertersToJava.scala b/docs/src/test/scala/docs/stream/operators/converters/StreamConvertersToJava.scala index dfe5dc1791a..168b1526b22 100644 --- a/docs/src/test/scala/docs/stream/operators/converters/StreamConvertersToJava.scala +++ b/docs/src/test/scala/docs/stream/operators/converters/StreamConvertersToJava.scala @@ -43,6 +43,15 @@ class StreamConvertersToJava extends PekkoSpec with Futures { jStream.count should be(5) } + "demonstrate materialization to Java8 streams with methods on Sink" in { + // #asJavaStreamOnSink + val source: Source[Int, NotUsed] = Source(0 to 9).filter(_ % 2 == 0) + + val jStream: java.util.stream.Stream[Int] = source.runWith(Sink.asJavaStream[Int]()) + // #asJavaStreamOnSink + jStream.count should be(5) + } + "demonstrate conversion from Java8 streams" in { // #fromJavaStream def factory(): IntStream = IntStream.rangeClosed(0, 9) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/io/SinkAsJavaSourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/io/SinkAsJavaSourceTest.java index 054e33a6c4d..e56d3ea4cd3 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/io/SinkAsJavaSourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/io/SinkAsJavaSourceTest.java @@ -44,4 +44,12 @@ public void mustBeAbleToUseAsJavaStream() throws Exception { java.util.stream.Stream javaStream = Source.from(list).runWith(streamSink, system); assertEquals(list, javaStream.collect(Collectors.toList())); } + + @Test + public void mustBeAbleToUseAsJavaStreamOnSink() throws Exception { + final List list = Arrays.asList(1, 2, 3); + java.util.stream.Stream javaStream = + Source.from(list).runWith(Sink.asJavaStream(), system); + assertEquals(list, javaStream.collect(Collectors.toList())); + } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkAsJavaStreamSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkAsJavaStreamSpec.scala index a9ace43651d..a15ee557d11 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkAsJavaStreamSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkAsJavaStreamSpec.scala @@ -32,6 +32,9 @@ class SinkAsJavaStreamSpec extends StreamSpec(UnboundedMailboxConfig) { "work in happy case" in { val javaSource = Source(1 to 100).runWith(StreamConverters.asJavaStream()) javaSource.count() should ===(100L) + // + Source(1 to 100).runWith(Sink.asJavaStream()) + .count() should ===(100L) } "fail if parent stream is failed" in { diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala index 76db0ef98c6..c16cd76dcc7 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala @@ -212,6 +212,23 @@ object Sink { def asPublisher[T](fanout: AsPublisher): Sink[T, Publisher[T]] = new Sink(scaladsl.Sink.asPublisher(fanout == AsPublisher.WITH_FANOUT)) + /** + * Creates a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink. + * Elements emitted through the stream will be available for reading through the Java 8 ``Stream``. + * + * The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java + * ``Stream`` will cancel the inflow of this ``Sink``. + * + * Java 8 ``Stream`` throws exception in case reactive stream failed. + * + * Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream. + * As it is interacting with blocking API the implementation runs on a separate dispatcher + * configured through the ``pekko.stream.blocking-io-dispatcher``. + * + * @since 2.0.0 + */ + def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = new Sink(scaladsl.StreamConverters.asJavaStream()) + /** * A `Sink` that will invoke the given procedure for each received element. The sink is materialized * into a [[java.util.concurrent.CompletionStage]] which will be completed with `Success` when reaching the diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/StreamConverters.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/StreamConverters.scala index 6a51f9cd604..e4d3f7709d0 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/StreamConverters.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/StreamConverters.scala @@ -184,7 +184,7 @@ object StreamConverters { * Java 8 ``Stream`` throws exception in case reactive stream failed. * * Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream. - * As it is interacting wit blocking API the implementation runs on a separate dispatcher + * As it is interacting with blocking API the implementation runs on a separate dispatcher * configured through the ``pekko.stream.blocking-io-dispatcher``. */ def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = new Sink(scaladsl.StreamConverters.asJavaStream()) diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index 5112a69db2b..4a0e214f123 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -292,6 +292,23 @@ object Sink { if (fanout) new FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, shape("FanoutPublisherSink")) else new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink"))) + /** + * Creates a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink. + * Elements emitted through the stream will be available for reading through the Java 8 ``Stream``. + * + * The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java + * ``Stream`` will cancel the inflow of this ``Sink``. + * + * If the Java 8 ``Stream`` throws exception the Pekko stream is cancelled. + * + * Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream. + * As it is interacting with blocking API the implementation runs on a separate dispatcher + * configured through the ``pekko.stream.blocking-io-dispatcher``. + * + * @since 2.0.0 + */ + def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = StreamConverters.asJavaStream() + /** * A `Sink` that will consume the stream and discard the elements. */ diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/StreamConverters.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/StreamConverters.scala index 7b1a743889a..445e34bdafe 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/StreamConverters.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/StreamConverters.scala @@ -181,7 +181,7 @@ object StreamConverters { * If the Java 8 ``Stream`` throws exception the Pekko stream is cancelled. * * Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream. - * As it is interacting wit blocking API the implementation runs on a separate dispatcher + * As it is interacting with blocking API the implementation runs on a separate dispatcher * configured through the ``pekko.stream.blocking-io-dispatcher``. */ def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = {