Conversation

joroKr21 (Contributor) commented Sep 23, 2025

The prefetch parameter was introduced in #636 but it has an issue:

  • the gRPC client has internal bookkeeping that remembers the requested number of messages (in the deframer)
  • we can get fewer than the requested number from the queue (takeBetween)
  • if we then request another prefetch - chunk.size, the deframer ends up with 2 * (prefetch - chunk.size) outstanding, which can exceed prefetch and degrade to the previous behaviour without back-pressure
  • instead, we need to request chunk.size messages to keep prefetch in flight (see the sketch after this list)
  • testing is tricky because there is a byte-buffering layer in between which affects back-pressure
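
A minimal sketch of the intended flow-control loop, assuming a `queue` fed with decoded messages by the call listener and the raw grpc-java `ClientCall`; the names and wiring here are illustrative, not the actual zio-grpc internals:

```scala
import io.grpc.ClientCall
import zio._
import zio.stream._

// Illustrative sketch only: error handling and stream termination are omitted.
def responses[Req, Res](
    call: ClientCall[Req, Res],
    queue: Queue[Res],
    prefetch: Int
): ZStream[Any, Nothing, Res] =
  // Prime the deframer with `prefetch` outstanding message requests.
  ZStream.execute(ZIO.succeed(call.request(prefetch))) ++
    ZStream.repeatZIOChunk {
      for {
        // We may receive fewer than `prefetch` messages from the queue.
        chunk <- queue.takeBetween(1, prefetch)
        // The deframer still tracks `prefetch - chunk.size` outstanding
        // requests, so asking for exactly `chunk.size` more keeps the total
        // in-flight demand at `prefetch`; requesting `prefetch - chunk.size`
        // instead could accumulate up to 2 * (prefetch - chunk.size).
        _     <- ZIO.succeed(call.request(chunk.size))
      } yield chunk
    }
```

Here `call.request(n)` is the plain grpc-java `ClientCall.request(numMessages)`, which is what feeds the deframer's bookkeeping.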

joroKr21 force-pushed the bugfix/prefetch branch 10 times, most recently from f7e0e31 to 11a30de on September 23, 2025 at 12:08
joroKr21 (Contributor, Author) commented:

@thesamet this failure seems unrelated to my changes:

java.lang.OutOfMemoryError: Direct buffer memory
at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:717)
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:692)
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:215)
at io.netty.buffer.PoolArena.tcacheAllocateSmall(PoolArena.java:180)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:137)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:129)
at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:395)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
at io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:124)
at io.grpc.netty.NettyWritableBufferAllocator.allocate(NettyWritableBufferAllocator.java:51)
at io.grpc.internal.MessageFramer.writeKnownLengthUncompressed(MessageFramer.java:226)
at io.grpc.internal.MessageFramer.writeUncompressed(MessageFramer.java:172)
at io.grpc.internal.MessageFramer.writePayload(MessageFramer.java:143)
at io.grpc.internal.AbstractStream.writeMessage(AbstractStream.java:66)
at io.grpc.internal.ForwardingClientStream.writeMessage(ForwardingClientStream.java:37)
at io.grpc.internal.DelayedStream.writeMessage(DelayedStream.java:278)
at io.grpc.internal.RetriableStream$1SendMessageEntry.runWith(RetriableStream.java:582)
at io.grpc.internal.RetriableStream.delayOrExecute(RetriableStream.java:559)
at io.grpc.internal.RetriableStream.sendMessage(RetriableStream.java:590)
at io.grpc.internal.ClientCallImpl.sendMessageInternal(ClientCallImpl.java:522)
at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:510)
at io.grpc.internal.DelayedClientCall.sendMessage(DelayedClientCall.java:342)
at scalapb.zio_grpc.client.ZClientCallImpl$.$anonfun$sendMessage$extension$1(ZClientCall.scala:38)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at zio.ZIOCompanionVersionSpecific.$anonfun$attempt$1(ZIOCompanionVersionSpecific.scala:98)
at zio.ZIO$.$anonfun$suspendSucceed$1(ZIO.scala:4886)
at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1128)
at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1159)
at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1243)
at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1159)
at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1159)
at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1159)
at zio.internal.FiberRuntime.evaluateEffect(FiberRuntime.scala:435)
at zio.internal.FiberRuntime.evaluateMessageWhileSuspended(FiberRuntime.scala:510)
at zio.internal.FiberRuntime.drainQueueOnCurrentThread(FiberRuntime.scala:272)
at zio.internal.FiberRuntime.run(FiberRuntime.scala:160)
at zio.internal.ZScheduler$$anon$3.run(ZScheduler.scala:403)

joroKr21 force-pushed the bugfix/prefetch branch 5 times, most recently from c943854 to 2e1c845 on September 23, 2025 at 12:40