Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions docs/src/main/paradox/stream/operators/Sink/asJavaStream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Sink.asJavaStream

Create a sink which materializes into Java 8 `Stream` that can be run to trigger demand through the sink.

@ref[Sink operators](../index.md#sink-operators)

## Signature

@apidoc[Sink.asJavaStream](Sink$) { scala="#asJavaStream[T]():org.apache.pekko.stream.scaladsl.Sink[T,java.util.stream.Stream[T]]" java="#asJavaStream()" }

## Description

Create a sink which materializes into Java 8 `Stream` that can be run to trigger demand through the sink.
Elements emitted through the stream will be available for reading through the Java 8 `Stream`.

The Java 8 `Stream` will be ended when the stream flowing into this `Sink` completes, and closing the Java
`Stream` will cancel the inflow of this `Sink`. If the Java `Stream` throws an exception, the Pekko stream is cancelled.

Be aware that Java `Stream` blocks current thread while waiting on next element from downstream.

## Example

Here is an example of a @apidoc[Sink] that materializes into a @javadoc[java.util.stream.Stream](java.util.stream.Stream).

Scala
: @@snip [StreamConvertersToJava.scala](/docs/src/test/scala/docs/stream/operators/converters/StreamConvertersToJava.scala) { #import #asJavaStreamOnSink }

Java
: @@snip [StreamConvertersToJava.java](/docs/src/test/java/jdocs/stream/operators/converters/StreamConvertersToJava.java) { #import #asJavaStreamOnSink }


## Reactive Streams semantics

@@@div { .callout }
**cancels** when the Java Stream is closed

**backpressures** when no read is pending on the Java Stream
@@@
2 changes: 2 additions & 0 deletions docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl

| |Operator|Description|
|--|--|--|
|Sink|<a name="asjavastream"></a>@ref[asJavaStream](Sink/asJavaStream.md)|Create a sink which materializes into Java 8 `Stream` that can be run to trigger demand through the sink.|
|Sink|<a name="aspublisher"></a>@ref[asPublisher](Sink/asPublisher.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.|
|Sink|<a name="cancelled"></a>@ref[cancelled](Sink/cancelled.md)|Immediately cancel the stream|
|Sink|<a name="collect"></a>@ref[collect](Sink/collect.md)|Collect all input elements using a Java @javadoc[Collector](java.util.stream.Collector).|
Expand Down Expand Up @@ -397,6 +398,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [alsoToAll](Source-or-Flow/alsoToAll.md)
* [asFlowWithContext](Flow/asFlowWithContext.md)
* [asInputStream](StreamConverters/asInputStream.md)
* [asJavaStream](Sink/asJavaStream.md)
* [asJavaStream](StreamConverters/asJavaStream.md)
* [ask](Source-or-Flow/ask.md)
* [ask](ActorFlow/ask.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ public void demonstrateConverterToJava8Stream() {
assertEquals(5, jStream.count());
}

@Test
public void demonstrateConverterToJava8StreamOnSink() {
// #asJavaStreamOnSink

Source<Integer, NotUsed> source = Source.range(0, 9).filter(i -> i % 2 == 0);
Stream<Integer> jStream = source.runWith(Sink.asJavaStream(), system);

// #asJavaStreamOnSink
assertEquals(5, jStream.count());
}

@Test
public void demonstrateCreatingASourceFromJava8Stream()
throws InterruptedException, ExecutionException, TimeoutException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ class StreamConvertersToJava extends PekkoSpec with Futures {
jStream.count should be(5)
}

"demonstrate materialization to Java8 streams with methods on Sink" in {
// #asJavaStreamOnSink
val source: Source[Int, NotUsed] = Source(0 to 9).filter(_ % 2 == 0)

val jStream: java.util.stream.Stream[Int] = source.runWith(Sink.asJavaStream[Int]())
// #asJavaStreamOnSink
jStream.count should be(5)
}

"demonstrate conversion from Java8 streams" in {
// #fromJavaStream
def factory(): IntStream = IntStream.rangeClosed(0, 9)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,12 @@ public void mustBeAbleToUseAsJavaStream() throws Exception {
java.util.stream.Stream<Integer> javaStream = Source.from(list).runWith(streamSink, system);
assertEquals(list, javaStream.collect(Collectors.toList()));
}

@Test
public void mustBeAbleToUseAsJavaStreamOnSink() throws Exception {
final List<Integer> list = Arrays.asList(1, 2, 3);
java.util.stream.Stream<Integer> javaStream =
Source.from(list).runWith(Sink.asJavaStream(), system);
assertEquals(list, javaStream.collect(Collectors.toList()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ class SinkAsJavaStreamSpec extends StreamSpec(UnboundedMailboxConfig) {
"work in happy case" in {
val javaSource = Source(1 to 100).runWith(StreamConverters.asJavaStream())
javaSource.count() should ===(100L)
//
Source(1 to 100).runWith(Sink.asJavaStream())
.count() should ===(100L)
}

"fail if parent stream is failed" in {
Expand Down
17 changes: 17 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,23 @@ object Sink {
def asPublisher[T](fanout: AsPublisher): Sink[T, Publisher[T]] =
new Sink(scaladsl.Sink.asPublisher(fanout == AsPublisher.WITH_FANOUT))

/**
* Creates a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink.
* Elements emitted through the stream will be available for reading through the Java 8 ``Stream``.
*
* The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java
* ``Stream`` will cancel the inflow of this ``Sink``.
*
* Java 8 ``Stream`` throws exception in case reactive stream failed.
*
* Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream.
* As it is interacting with blocking API the implementation runs on a separate dispatcher
* configured through the ``pekko.stream.blocking-io-dispatcher``.
*
* @since 2.0.0
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add @since 2.0.0

def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = new Sink(scaladsl.StreamConverters.asJavaStream())

/**
* A `Sink` that will invoke the given procedure for each received element. The sink is materialized
* into a [[java.util.concurrent.CompletionStage]] which will be completed with `Success` when reaching the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ object StreamConverters {
* Java 8 ``Stream`` throws exception in case reactive stream failed.
*
* Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream.
* As it is interacting wit blocking API the implementation runs on a separate dispatcher
* As it is interacting with blocking API the implementation runs on a separate dispatcher
* configured through the ``pekko.stream.blocking-io-dispatcher``.
*/
def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = new Sink(scaladsl.StreamConverters.asJavaStream())
Expand Down
17 changes: 17 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,23 @@ object Sink {
if (fanout) new FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, shape("FanoutPublisherSink"))
else new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink")))

/**
* Creates a sink which materializes into Java 8 ``Stream`` that can be run to trigger demand through the sink.
* Elements emitted through the stream will be available for reading through the Java 8 ``Stream``.
*
* The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java
* ``Stream`` will cancel the inflow of this ``Sink``.
*
* If the Java 8 ``Stream`` throws exception the Pekko stream is cancelled.
*
* Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream.
* As it is interacting with blocking API the implementation runs on a separate dispatcher
* configured through the ``pekko.stream.blocking-io-dispatcher``.
*
* @since 2.0.0
*/
def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = StreamConverters.asJavaStream()

/**
* A `Sink` that will consume the stream and discard the elements.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ object StreamConverters {
* If the Java 8 ``Stream`` throws exception the Pekko stream is cancelled.
*
* Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream.
* As it is interacting wit blocking API the implementation runs on a separate dispatcher
* As it is interacting with blocking API the implementation runs on a separate dispatcher
* configured through the ``pekko.stream.blocking-io-dispatcher``.
*/
def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]] = {
Expand Down