He-Pin
Member

@He-Pin He-Pin commented Sep 6, 2025

Motivation:
Make the code much easier to find for day-to-day work.
refs: https://smallrye.io/smallrye-mutiny/latest/guides/reactive-to-imperative/#iterating-over-multis-items

Mutiny has an asStream method for this.

@He-Pin He-Pin added the t:stream Pekko Streams label Sep 6, 2025
@He-Pin He-Pin added this to the 2.0.0-M1 milestone Sep 6, 2025
Member

@raboof raboof left a comment

This definitely makes it easier to find.

This does beg the question whether we should move and deprecate all methods in https://pekko.apache.org/docs/pekko/current/stream/operators/index.html#additional-sink-and-source-converters .

The warning there (which is also in your javadoc/scaladoc) is that this will use blocking operations. That could be a reason to discourage this API, and putting them in the separate StreamConverters might be a good way to do that. So from that perspective, perhaps having them in StreamConverters (and thus harder to find) might be a good thing?

* Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream.
* As it is interacting with a blocking API, the implementation runs on a separate dispatcher
* configured through the ``pekko.stream.blocking-io-dispatcher``.
*/
Member

add @since 2.0.0

@He-Pin
Member Author

He-Pin commented Sep 6, 2025

I think asStream can be non-blocking: it only triggers a pull when there is a hasNext request, and lets the poller that consumes the Java stream wait on a condition. The current implementation is a bit complex, but it never blocks the queue, only the poller, so this method should be safe.

Was this method kept outside of Sink because of pre-Java 8 support?

@He-Pin
Member Author

He-Pin commented Sep 6, 2025

@raboof As a Java developer, I think it's bad to put them on StreamConverters: users have to remember too many places. Today, an LLM can only handle a limited number of MCP tools and MCP servers, and the same goes for humans.

I think Pekko should make code smooth-brain, just as ZIO and Spring do :), which will actually cover 80% of daily usage.

@raboof
Member

raboof commented Sep 6, 2025

I think asStream can be non-blocking: it only triggers a pull when there is a hasNext request, and lets the poller that consumes the Java stream wait on a condition. The current implementation is a bit complex, but it never blocks the queue, only the poller, so this method should be safe.

Gotcha, so asInputStream and asOutputStream are dangerous but asJavaStream is safe? That might be a good reason to expose it directly on Sink.

I see the original asJavaStream already had the warning "Be aware that Java Stream blocks current thread while waiting on next element from downstream.". That sounds confusing to me. Is it waiting for an element from upstream, or demand from downstream?

I see you have added "As it is interacting with a blocking API, the implementation runs on a separate dispatcher configured through the pekko.stream.blocking-io-dispatcher." to the javadoc/scaladoc, but which implementation is that?

I think Pekko should make code smooth-brain, just as ZIO and Spring do :), which will actually cover 80% of daily usage.

I agree, we should be making things "as simple as possible, but not simpler" 😄

@mdedetrich
Contributor

This definitely makes it easier to find.

This does beg the question whether we should move and deprecate all methods in https://pekko.apache.org/docs/pekko/current/stream/operators/index.html#additional-sink-and-source-converters .

The warning there (which is also in your javadoc/scaladoc) is that this will use blocking operations. That could be a reason to discourage this API, and putting them in the separate StreamConverters might be a good way to do that. So from that perspective, perhaps having them in StreamConverters (and thus harder to find) might be a good thing?

I would be against removing these blocking methods, as I have frequently used them in the past with other APIs that only work with InputStream/OutputStream, which is very common in Java land.

There is nothing you can do about these methods being blocking, as the underlying InputStream/OutputStream are also blocking; it is what it is. There are annoying gotchas, i.e. it's easy to create deadlocks because actor stream materialization cannot be blocking, but this is the price we have to pay when working with Java APIs.

@mdedetrich
Contributor

mdedetrich commented Sep 6, 2025

I see the original asJavaStream already had the warning "Be aware that Java Stream blocks current thread while waiting on next element from downstream.". That sounds confusing to me. Is it waiting for an element from upstream, or demand from downstream?

The Java Stream (talking about InputStream/OutputStream) itself blocks while it's waiting for the next element (i.e. it stalls the current thread), which means it's easy to create a deadlock if you expose the InputStream/OutputStream as a materialized value/element in the stream. The materialized value needs to evaluate immediately (i.e. no blocking) before the stream starts, but the pekko stream cannot start because the InputStream blocks until it receives an element (which it will never receive, as the pekko stream hasn't started yet), hence the deadlock.

I experienced this myself, and I'm not sure what the easiest way to deal with this is from a "don't shoot yourself in the foot" API perspective. This is why the current API is designed so that you only get the reference to an InputStream/OutputStream from a Sink (i.e. the last step in the pekko stream), as that design makes it difficult to use the InputStream/OutputStream within the pekko stream itself.

I am not sure whether the new Java 1.8 Stream API is blocking or not; we have to be careful here.
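
The shape of that deadlock can be reproduced without Pekko. The following is a minimal sketch in plain Java and only an analogy, not Pekko code: a single-threaded executor stands in for the dispatcher, and the names `StarvationSketch` and `blockedOnOwnExecutor` are made up for illustration. A task blocks on a value that can only be produced by a second task queued on the same thread, so the wait cannot succeed; a timeout is used so the example terminates instead of hanging.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class StarvationSketch {

    /**
     * Returns true if a task running on a single-threaded executor timed out
     * while blocking on a value that a second task, queued on the same
     * executor, was supposed to produce.
     */
    static boolean blockedOnOwnExecutor(long timeoutMillis) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> element = new CompletableFuture<>();
            Future<Boolean> outer = single.submit(() -> {
                // Queued behind this task: it cannot run until this task returns.
                single.submit(() -> element.complete("element"));
                try {
                    // Blocking wait on the only thread that could deliver the element.
                    element.get(timeoutMillis, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    return true; // without the timeout this wait would never end
                }
            });
            return outer.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            single.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("inner wait timed out: " + blockedOnOwnExecutor(200));
    }
}
```

With more than one thread in the pool the inner task would run and the wait would succeed, which is why this kind of bug tends to show up only under small or saturated thread pools.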

@He-Pin
Member Author

He-Pin commented Sep 6, 2025

We are currently using Spring-Ai and the MCP SDK at work, which are built entirely on reactor-core. I always wondered why it can't be Pekko Stream.

And nowadays, virtual threads make direct-style programming possible; see how ox-stream and ox-actor are implemented. Yes, blocking should be managed, but it may not be a problem when you are running the actor/stream on virtual threads.

There is a Jox library being implemented in pure Java, so I think Pekko Stream should be quite simple and friendly for Java developers too.
image
This code is from a system where we leverage Pekko Stream to do translation for Taobao English; you can see the long

withAttributes(
                ActorAttributes.withSupervisionStrategy(
                    (org.apache.pekko.japi.function.Function<Throwable, Supervision.Directive>) param -> {
                        log.error("AsyncOvsTranslator error", param);
                        return (Supervision.Directive) Supervision.resume();
                    }))

In reactor-core this would just be an onErrorContinue.

@mdedetrich
Contributor

We are currently using Spring-Ai and the MCP SDK at work, which are built entirely on reactor-core. I always wondered why it can't be Pekko Stream.

And nowadays, virtual threads make direct-style programming possible; see how ox-stream and ox-actor are implemented. Yes, blocking should be managed, but it may not be a problem when you are running the actor/stream on virtual threads.

There is a Jox library being implemented in pure Java, so I think Pekko Stream should be quite simple and friendly for Java developers too. image This code is from a system where we leverage Pekko Stream to do translation for Taobao English; you can see the long

withAttributes(
                ActorAttributes.withSupervisionStrategy(
                    (org.apache.pekko.japi.function.Function<Throwable, Supervision.Directive>) param -> {
                        log.error("AsyncOvsTranslator error", param);
                        return (Supervision.Directive) Supervision.resume();
                    }))

In reactor-core this would just be an onErrorContinue.

Whether you use Loom or not is not relevant here. The question is: does the new Java 1.8 Stream block the thread while it's waiting for a new element? If it does (which I think is the case, as I see no evidence from the API that it's properly async), then it's just as dangerous as the InputStream/OutputStream converters.

@raboof
Member

raboof commented Sep 6, 2025

The warning there (which is also in your javadoc/scaladoc) is that this will use blocking operations. That could be a reason to discourage this API, and putting them in the separate StreamConverters might be a good way to do that. So from that perspective, perhaps having them in StreamConverters (and thus harder to find) might be a good thing?

I would be against removing these blocking methods, as I have frequently used them in the past with other APIs that only work with InputStream/OutputStream, which is very common in Java land.

I'm definitely not suggesting removing the blocking methods, just 'not moving them to Sink'. I guess I should have said "keeping them in the separate StreamConverters" rather than "putting them in the separate StreamConverters" ;)

@He-Pin
Member Author

He-Pin commented Sep 6, 2025

@mdedetrich You can give it a try: this code works fine if the event loop is a virtual thread; otherwise, it deadlocks.
netty/netty#15405

At work, Ajdk 21 (a JDK based on Dragonwell) has backported and fixed many issues around Loom, and now we can safely use Loom. I did encounter some deadlocks when using Loom, but they are all fixed now; I think that's why OX is going direct style.

@mdedetrich
Contributor

@mdedetrich You can give it a try: this code works fine if the event loop is a virtual thread; otherwise, it deadlocks. netty/netty#15405

I know that it won't block if you use a VirtualThread, but we cannot force people to use VirtualThread. So if it blocks under standard usage (which it does), then it's as unsafe as the other stream converters, and for this reason you need to follow the same pattern, i.e. only give out the reference to a Java 1.8 Stream as the final value in a Sink; otherwise people will almost always deadlock themselves when using the API.

There is a reason why the current StreamConverters are designed the way they are.

@He-Pin
Member Author

He-Pin commented Sep 6, 2025

When you are running inside a virtual thread and you are blocked by, let's say, a Lock, the virtual thread is parked and unmounted so the carrier thread can execute other things. Has the virtual thread been blocked? Yes: it can't make any progress until it is unparked. And no: the carrier thread is still running, and the cost is just some memory copying.

You can try this with Pekko 1.2: set the max thread count to 1 and set virtualization = on.

@mdedetrich
Contributor

mdedetrich commented Sep 6, 2025

When you are running inside a virtual thread and you are blocked by, let's say, a Lock, the virtual thread is parked and unmounted so the carrier thread can execute other things. Has the virtual thread been blocked? Yes: it can't make any progress until it is unparked. And no: the carrier thread is still running, and the cost is just some memory copying.

You can try this with Pekko 1.2: set the max thread count to 1 and set virtualization = on.

I am aware that virtual threads solve the issue. The point is that we don't know, and cannot force, that people always use VirtualThread, so the API cannot be designed on the assumption that they do. We could make a separate, simpler API that is designed for people who use VirtualThread and somehow validates that it is executing on a VirtualThread, but for the base API we cannot assume it's running on a VirtualThread, and causing certain deadlocks when a user is not using VirtualThread is a terrible experience.

@He-Pin
Member Author

He-Pin commented Sep 6, 2025

  1. There's already a declaration here.
  2. Java 25 is coming soon, and managed blocking is a thing of the past.
  3. The API needs to be kept simple and reasonable. If you think this is wrong, why does smallrye-mutiny have an asStream method, and reactor-core have reactor.core.publisher.Flux#toStream()?

I think these APIs historically ended up where they are because Lightbend advocated that blocking should be managed, hence reactive. As you can see, other reactive libraries with many stars just make these APIs easy to find.

@mdedetrich
Contributor

mdedetrich commented Sep 6, 2025

  1. There's already a declaration here.

Yes, the PR exposes the Java 1.8 Stream in a Sink, so it should be fine, but the same pitfalls as with the other stream converter methods also need to be documented. I'll check in more detail when I have access to a laptop.

  2. Java 25 is coming soon, and managed blocking is a thing of the past.

Irrelevant, not everyone is running on JVM 25 and people run Pekko in other environments.

  3. The API needs to be kept simple and reasonable. If you think this is wrong, why does smallrye-mutiny have an asStream method, and reactor-core have reactor.core.publisher.Flux#toStream()?

I don't know how mutiny works. All I am saying is that a core design principle of Pekko Streams specifically is that it's built on the assumption that everything is purely asynchronous, which is why, as I explained before, materialized values/stream elements can never block. Since we can't enforce that users will use VirtualThread 100% of the time, we have to deal with this.

Unfortunately this is the consequence of Java not having a core async/concurrency primitive when it was released (in contrast to Kotlin, which has coroutines, and Go, which has channels), and it took 30 years to release an officially supported one (although you can argue that CompletionStage is that primitive, in which case it was sooner).

That actually opens up another set of questions: we may be able to simplify the current stream converters API by wrapping the results in a CompletionStage. That way the API is easier to work with (the current design of it being in a Sink is a pita to work with) and we may be able to avoid these deadlock issues.

I am not against having another API which is designed for VirtualThread and is even simpler; it would just have to assert that it's being run on a VirtualThread and crash with a helpful error message if it's not.
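
A guard of that kind could look roughly like the sketch below. It is an assumption-laden illustration, not a proposed Pekko API: `VirtualThreadGuard`, `isVirtual`, and `requireVirtualThread` are hypothetical names. `Thread#isVirtual` is looked up reflectively because it only exists on newer JDKs (final since Java 21), so the same code still compiles and runs on older JVMs, where it simply reports "not virtual".

```java
import java.lang.reflect.Method;

class VirtualThreadGuard {

    /** True only when the JVM exposes Thread#isVirtual and it reports true for this thread. */
    static boolean isVirtual(Thread thread) {
        try {
            Method isVirtual = Thread.class.getMethod("isVirtual");
            return (Boolean) isVirtual.invoke(thread);
        } catch (ReflectiveOperationException e) {
            return false; // older JVM without virtual threads
        }
    }

    /** Fails fast with an explanatory message when called from a platform thread. */
    static void requireVirtualThread(String apiName) {
        Thread current = Thread.currentThread();
        if (!isVirtual(current)) {
            throw new IllegalStateException(
                apiName + " blocks the calling thread and must be called from a virtual thread,"
                    + " but was called from platform thread '" + current.getName() + "'");
        }
    }

    public static void main(String[] args) {
        try {
            requireVirtualThread("asJavaStream"); // hypothetical call site
            System.out.println("running on a virtual thread");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing fast at the call site turns a silent deadlock into an immediate, explainable error, which is the user-experience point being made here.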

@He-Pin He-Pin force-pushed the asJavaStream branch 2 times, most recently from 5df6af3 to e0bc6de Compare September 6, 2025 10:28
@He-Pin
Member Author

He-Pin commented Sep 6, 2025

Kotlin's coroutines are fake, even worse than Java 21's virtual threads. I added this API quite cautiously. You see, I didn't add toOutputStream to Sink. If other libraries had these features, a new user might see that Pekko Stream doesn't have this feature and choose another library.

It seems you're planning to build a Kotlin-specific DSL for Pekko. While that's great, it also presents significant challenges, as it will have to compete with Kotlin's Flow API. It's a real challenge to have something that others don't have, and to be superior to others.

@mdedetrich
Contributor

mdedetrich commented Sep 6, 2025

Kotlin's coroutines are fake, even worse than Java 21's virtual threads. I added this API quite cautiously. You see, I didn't add toOutputStream to Sink. If other libraries had these features, a new user might see that Pekko Stream doesn't have this feature and choose another library.

I don't know what you mean by fake, but coroutines are by far the best implementation of an async concurrency primitive out of any of the JVM languages. Their performance is far higher than Loom's, as coroutines just compile to jump statements in JVM bytecode; they support automatic cancellation of resources along with cancellation of coroutines; and to boot they're also supported on all other Kotlin platforms (native/js) with zero performance overhead.

Loom, on the other hand, only works on newer JVMs and is slower. The only good thing about it is that it uses the same programming model as threads, but that is also what makes it slow (i.e. it needs to maintain stack traces).

Akka/Pekko actors are ironically faster than Loom as a concurrency primitive as you only pass around pure values.

It seems you're planning to build a Kotlin-specific DSL for Pekko. While that's great, it also presents significant challenges, as it will have to compete with Kotlin's Flow API. It's a real challenge to have something that others don't have, and to be superior to others.

I am planning to build a Kotlin DSL, but what I am saying is not specifically because of that; it's an issue with Java as a language.

@He-Pin
Member Author

He-Pin commented Sep 6, 2025

If we had an AsyncStream/AsyncQueue whose pulls always return a Future, the problem would be simpler.
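
To make the idea concrete, here is a minimal sketch of such a queue in plain Java. It is hypothetical: `AsyncQueueSketch`, `offer`, and `pull` are illustrative names, not an existing Pekko or JDK type, and the sketch ignores backpressure, completion, and failure. The point it shows is that `pull()` never blocks the caller: it returns an already completed future when an element is buffered, and a pending future otherwise.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical unbounded async queue for non-null elements; pull() never blocks. */
class AsyncQueueSketch<T> {
    private final Queue<T> buffered = new ArrayDeque<>();
    private final Queue<CompletableFuture<T>> waiting = new ArrayDeque<>();

    /** Hands the element to the oldest pending pull, or buffers it if nobody is waiting. */
    void offer(T element) {
        CompletableFuture<T> waiter;
        synchronized (this) {
            waiter = waiting.poll();
            if (waiter == null) {
                buffered.add(element);
                return;
            }
        }
        // Complete outside the lock so dependent callbacks do not run while it is held.
        waiter.complete(element);
    }

    /** Returns a completed future if an element is buffered, otherwise a pending one. */
    synchronized CompletableFuture<T> pull() {
        T element = buffered.poll();
        if (element != null) {
            return CompletableFuture.completedFuture(element);
        }
        CompletableFuture<T> pending = new CompletableFuture<>();
        waiting.add(pending);
        return pending;
    }

    public static void main(String[] args) {
        AsyncQueueSketch<String> queue = new AsyncQueueSketch<>();
        // The consumer registers interest first; nothing blocks.
        queue.pull().thenAccept(value -> System.out.println("got " + value));
        queue.offer("element");
    }
}
```

A caller that really needs a blocking view can still call `join()` on the returned future, but that decision then sits with the caller rather than inside the queue.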

@He-Pin
Member Author

He-Pin commented Sep 6, 2025

  1. Is that true? Then why isn't it first among the JVM entries in 2024-04-13 i9-13900KF (32) @ 5.500GHz, 64G RAM LesnyRumcajs/grpc_bench#441?
  2. It requires Dispatchers.IO to run blocking code; letting Kotlin handle the old Java File and Socket APIs is slow.
  3. The "what color is your function" problem is a no for me; that's why Loom is much better than the Kotlin approach.
  4. Akka/Pekko will be slower than Loom when it is blocked; the only way around that is running the dispatcher with Loom.

We are using Kotlin at work: parts of our server and our iOS and Android apps share the same codebase, but on the server side we mostly use Java 21 nowadays.

@He-Pin He-Pin requested a review from Copilot September 6, 2025 10:52
Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

This PR adds a new asJavaStream method to the Sink API, providing a convenience method for converting Pekko streams into Java 8 Streams. This method delegates to the existing StreamConverters.asJavaStream() functionality.

  • Adds asJavaStream() method to both Scala and Java Sink APIs
  • Updates documentation with new method and examples
  • Adds test coverage for the new functionality

Reviewed Changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala | Adds asJavaStream method to Scala Sink API |
| stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala | Adds asJavaStream method to Java Sink API |
| stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkAsJavaStreamSpec.scala | Adds test case for new Scala API method |
| stream-tests/src/test/java/org/apache/pekko/stream/io/SinkAsJavaSourceTest.java | Adds test case for new Java API method |
| docs/src/test/scala/docs/stream/operators/converters/StreamConvertersToJava.scala | Adds documentation example for Scala API |
| docs/src/test/java/jdocs/stream/operators/converters/StreamConvertersToJava.java | Adds documentation example for Java API |
| docs/src/main/paradox/stream/operators/index.md | Updates operator index with new method |
| docs/src/main/paradox/stream/operators/Sink/asJavaStream.md | Creates dedicated documentation page for the new method |

@mdedetrich
Contributor

mdedetrich commented Sep 6, 2025

  1. Is that true? Then why isn't it first among the JVM entries in 2024-04-13 i9-13900KF (32) @ 5.500GHz, 64G RAM LesnyRumcajs/grpc_bench#441?

I don't know, I would have to look into it. I am saying that the co-routine compiler transformations are well known and understood and they just translate to raw JVM bytecode statements jsr/goto, you can't really beat that.

Even though it's not really used, Scala had its own coroutine plugin as part of the compiler; it did the same, and it beat every other Scala IO runtime hands down.

As you said, coroutines are slower if they have to interact with blocking Java code, and kotlin-grpc might be doing that. On the other hand, if you have a purely async, non-blocking implementation like akka/pekko, then it should be as fast, if not faster. Those same benchmarks show that akka/pekko grpc is just as fast as java vertx, and the fact that java vertx is not meaningfully faster than akka/pekko grpc actually shows that Loom is not that fast: Loom has full runtime support in the JVM, whereas pekko/akka does not, as it implements actors as a library. If the JVM implemented actors natively, then pekko actors would be much faster, more so than Loom. There are lots of tricks that you can use to make actors very fast, which Erlang does, but which cannot be done with akka/pekko as we don't have full JVM support (we might, however, be able to make actors even faster with newer versions of the JVM, as they are giving us more tools).

  2. It requires Dispatchers.IO to run blocking code; letting Kotlin handle the old Java File and Socket APIs is slow.

Sure, if you need to interact with old blocking Java code it will be slower; the solution here is to not write/use blocking code. That's why akka/pekko itself was written completely with async in mind, and also why it's as fast as java vertx 😉

  3. The "what color is your function" problem is a no for me; that's why Loom is much better than the Kotlin approach.

True, I am just saying there are tradeoffs. If you don't want to deal with the color function problem then you can use loom, I am just saying not everyone is in that boat.

  4. Akka/Pekko will be slower than Loom when it is blocked; the only way around that is running the dispatcher with Loom.

We are using Kotlin at work: parts of our server and our iOS and Android apps share the same codebase, but on the server side we mostly use Java 21 nowadays.

We are going on a bit of a tangent here though. My point is that not everyone who uses akka/pekko is using Loom, and akka/pekko is not a Loom-only API design. As I said, I don't have issues with making an API specific to Loom users if it makes their life simpler from an API perspective; it's just that we need to check for that, because if you don't happen to be using Loom then the code will just deadlock, and that's a terrible user experience.

@mdedetrich
Contributor

Oh and on yet another note, those benchmarks at LesnyRumcajs/grpc_bench#441 show akka being slightly faster than pekko, which means there are more performance improvements on the table for pekko. It might even be possible to make akka/pekko beat vert.x with new JVM capabilities (although we would have to use multi jar for that)

@He-Pin
Member Author

He-Pin commented Sep 6, 2025

@mdedetrich Maybe a new run will show something different.

@He-Pin
Member Author

He-Pin commented Sep 6, 2025

Btw, there is already a Source#fromJavaStream, so it's fair to have a Sink#asJavaStream too.

@He-Pin He-Pin requested review from pjfanning and raboof September 7, 2025 08:14
@raboof
Member

raboof commented Sep 8, 2025

The current implementation uses Await.result in Iterator.hasNext. It might be possible to create an implementation that avoids blocking - or at least avoids it as long as the user doesn't explicitly trigger operations that are blocking 'by nature'. I think we should do that before adding it to Sink.
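
What "blocking in hasNext" means for the consumer can be sketched in plain Java. This is an analogy only, not the Pekko implementation: `BlockingQueue.take` stands in for `Await.result`, and `BlockingIteratorSketch`, `blockingIterator`, and `asStream` are hypothetical names. `Optional.empty()` is used as an assumed end-of-stream marker.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class BlockingIteratorSketch {

    /** Iterator over a queue of Optionals; Optional.empty() marks the end of the stream. */
    static <T> Iterator<T> blockingIterator(BlockingQueue<Optional<T>> queue) {
        return new Iterator<T>() {
            private Optional<T> next; // null means "not fetched yet"

            @Override
            public boolean hasNext() {
                if (next == null) {
                    try {
                        next = queue.take(); // blocks the calling thread until the producer delivers
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("interrupted while waiting", e);
                    }
                }
                return next.isPresent();
            }

            @Override
            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                T value = next.get();
                next = null; // force a new fetch on the following hasNext()
                return value;
            }
        };
    }

    /** Wraps the blocking iterator in a sequential java.util.stream.Stream. */
    static <T> Stream<T> asStream(BlockingQueue<Optional<T>> queue) {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(blockingIterator(queue), Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        BlockingQueue<Optional<String>> queue = new LinkedBlockingQueue<>();
        new Thread(() -> {
            queue.add(Optional.of("a"));
            queue.add(Optional.of("b"));
            queue.add(Optional.empty()); // end-of-stream marker
        }).start();
        // The calling thread blocks inside hasNext() whenever the queue is empty.
        asStream(queue).forEach(System.out::println);
    }
}
```

Any terminal operation on the resulting Stream parks the calling thread while the queue is empty, so whether this is safe depends entirely on which thread the consumer runs on.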

@mdedetrich
Contributor

mdedetrich commented Sep 8, 2025

The current implementation uses Await.result in Iterator.hasNext. It might be possible to create an implementation that avoids blocking - or at least avoids it as long as the user doesn't explicitly trigger operations that are blocking 'by nature'. I think we should do that before adding it to Sink.

I would go further and say that Await.result should never be used in production code with the only real exception I can think of as being shut down of the main application/ActorSystem.

@He-Pin
Member Author

He-Pin commented Sep 8, 2025

But reactor-core has that on Flux, and it causes no problems.

@mdedetrich
Contributor

Actually, now that I think about it, if it's not possible to implement this function without blocking (i.e. Await.result), it might make sense to do what I asked before and only make this method available for Loom/VirtualThread users; otherwise it's way too easy to deadlock.

@He-Pin
Member Author

He-Pin commented Sep 8, 2025

@mdedetrich The Materializer itself is using the Await.result/ready:)

@He-Pin
Member Author

He-Pin commented Sep 8, 2025

I have not seen many deadlocks caused by the blocking methods in reactor-core/RxJava, which have a larger Java user base.
