*: add opt-in stream multiplexing support #34
shubhamdhama wants to merge 1 commit into cockroachdb:main from
Conversation
(force-pushed from 1a7f639 to 269f176)
Below is an AI-assisted review of the impact on no-multiplexing. "1" isn't a concern because we don't use it in cockroach for no-mux. I'll create an issue about this before I merge this PR, to explore whether we need framing for no-mux and mux, with some benchmarks backing that decision. "6" is something I can dig into more deeply, likely in a follow-up. "7" I can remove as a follow-up.
    "storj.io/drpc/drpcdebug"
    )

[AcIm] Move the content of this file to mux_writer_test.go

    // StreamWriter is the interface used by streams for writing packets. Each call

[AcIm] Keep this comment at interface level, don't leak the implementation detail here.
    case fr.ID.Stream == 0 || fr.ID.Message == 0:
        return Packet{}, drpc.ProtocolError.New("id monotonicity violation (fr:%v r:%v)", fr.ID, r.id)

    case fr.ID.Stream == r.id.Stream && fr.ID.Less(r.id):

This is where I have relaxed the Frame ID constraint even for the no-mux scenario.

Can you elaborate on the reason to relax this?
How would this work if frames of different streams arrive one after another and one of them is out of order?
Say, we got frames in this sequence (S: Stream, F: Frame)
S1 F1 --> allowed, set r.id = (1, 1)
S1 F2 --> allowed since F2 > F1, r.id = (1, 2)
S2 F1 --> allowed, since r.id.Stream != fr.ID.Stream, r.id = (2, 1)
S1 F2 --> allowed, since r.id.Stream != fr.ID.Stream, and the monotonicity check is skipped
We would need to track the last seen frame message id per stream and compare against that.
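The per-stream tracking suggested here could look like this minimal sketch. All the names (`FrameID`, `reader.lastSeen`, `check`) are illustrative, not drpc's actual types, and it ignores continuation frames that legitimately repeat a message ID:

```go
package main

import "fmt"

// FrameID mirrors the (Stream, Message) pair from the example above.
type FrameID struct {
	Stream, Message uint64
}

type reader struct {
	lastSeen map[uint64]uint64 // stream ID -> last message ID seen on that stream
}

// check enforces monotonicity per stream, so the repeated "S1 F2" from the
// example is rejected even when another stream's frame arrived in between.
// Real drpc allows several frames per message, so the real check would be
// more nuanced; this sketch treats one frame as one message.
func (r *reader) check(id FrameID) error {
	if id.Stream == 0 || id.Message == 0 {
		return fmt.Errorf("invalid id: %v", id)
	}
	if last, ok := r.lastSeen[id.Stream]; ok && id.Message <= last {
		return fmt.Errorf("monotonicity violation on stream %d: %d <= %d",
			id.Stream, id.Message, last)
	}
	r.lastSeen[id.Stream] = id.Message
	return nil
}

func main() {
	r := &reader{lastSeen: map[uint64]uint64{}}
	// The sequence from the example: S1 F1, S1 F2, S2 F1, then S1 F2 again.
	for _, id := range []FrameID{{1, 1}, {1, 2}, {2, 1}, {1, 2}} {
		fmt.Println(id, r.check(id))
	}
}
```

With a single global `r.id` the last frame slips through because the stream IDs differ; the per-stream map catches it.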
Ah ok, I think this will not matter with the current implementation, as we have now changed it to have exactly one frame per packet.

But I plan to change this in an upcoming PR. That's an improvement without which this PR is still functional, though prone to application-level head-of-line blocking.
    if s.sigs.term.IsSet() {
    // Put must always be called for message packets to manage buffer
    // ownership. Put returns the buffer to the pool if the stream is closed.

[AcIm] Add a comment why it's safe for no-mux.

Also, clarify that this comment applies only for mux.
(force-pushed from eb90688 to 6232cfe)
DRPC currently allows only one active stream per transport at a time. This commit adds multiplexing as an opt-in mode (Options.Mux) that enables concurrent streams over a single transport, while preserving the original sequential behavior as the default.

Mux and non-mux paths have fundamentally different concurrency models, so a single Manager with conditionals would add branching in hot paths. Instead, two separate types are used: Manager (non-mux, unchanged) and MuxManager (new, in manager_mux.go). MuxManager runs two goroutines: manageReader routes packets to streams via a streamRegistry, and manageWriter batches frames from a sharedWriteBuf into transport writes.

The Stream now accepts a StreamWriter interface instead of *Writer directly, so both paths share the same Stream code. Non-mux uses *Writer (direct transport writes), mux uses muxWriter (serializes each packet atomically into the shared buffer to prevent frame interleaving).

Packet buffering is abstracted behind a packetStore interface with two implementations: syncPacketBuffer (blocking single-slot, non-mux) and queuePacketBuffer (non-blocking queue with sync.Pool recycling, mux). RawRecv and MsgRecv branch on the mux flag for buffer lifecycle: non-mux copies data then calls Done() to unblock the reader, mux takes ownership and recycles after consumption.

HandlePacket now processes KindMessage before checking the term signal. This is needed for mux mode, where Put must always run to return pool buffers. In non-mux mode, Put on a closed syncPacketBuffer returns immediately, so there is no behavioral change. There is, however, a brief blocking window if terminate() has set the term signal but hasn't yet called pbuf.Close().

The reader's monotonicity check is relaxed from global to per-stream so interleaved frames from different streams are accepted. Non-mux never produces interleaved stream IDs, so this has no behavioral impact there.
Conn uses a streamManager interface satisfied by both Manager types, branching once in the constructor. In mux mode, Invoke allocates a per-call marshal buffer instead of reusing a shared one to support concurrent calls. On the server side, ServeOne branches once to either serveOneNonMux (sequential handleRPC) or serveOneMux (concurrent handleRPC with WaitGroup).
(force-pushed from 6232cfe to 783893f)
    // appropriate locks.
    func (s *Stream) rawWriteLocked(kind drpcwire.Kind, data []byte) (err error) {
        fr := s.newFrameLocked(kind)
        n := s.opts.SplitSize

[Optional AcIm] remove the SplitSize option.
cthumuluru-crdb left a comment

@shubhamdhama - I'm still going through the entire PR, but I added some questions/comments for you to take a look at.
    // into grpc metadata in the context.
    GRPCMetadataCompatMode bool

    // Mux enables stream multiplexing on the transport, allowing multiple

nit: Can you make it more descriptive? Since the RPC muxer is also named Mux, I would like it if we rename this one. Also, this is a flag to indicate whether stream multiplexing must be enabled or not.
    if err := stream.HandlePacket(pkt); err != nil {
        m.terminate(managerClosed.Wrap(err))
        return
    }

Retain the check for ignoring old messages. I don't see a reason to drop it.

Since the packets would come interleaved, there is no "old message" anymore.
    }

    // if any invoke sequence is being sent, forward it to be handled.
    case pkt.Kind == drpcwire.KindInvoke || pkt.Kind == drpcwire.KindInvokeMetadata:

You should still take care of the cancellation message arriving first, right? How do we protect against that?
    // silently drop packet for an unregistered stream
    default:
        m.log("DROP", pkt.String)

You cannot really drop the packet silently in this case, right? The reader goroutine is the one reading the packet and handing it off for the manager to create the stream. Let's assume there is a race between the "new stream" and "cancellation" packets. Depending on how the race resolves, you might end up creating a stream but silently dropping the cancel packet for that stream.

I think we have discussed this earlier, offline. If we receive a "cancellation" for a stream before its creation, it should be a protocol error. Otherwise, if we support the case where a cancellation may appear before creation, then we have to keep some sort of memory of such stream IDs, and then decide how long that memory should persist.

I am fine adding support for this, but it was never supported even in non-mux. I recall you observed this race, but IMO that was a bug rather than the norm.
    } else if fr.Done {
        return nil
    }
    if err := s.wr.WritePacket(pkt); err != nil {

About SplitSize: since we don't set it explicitly in cockroach, drpc would currently use a default frame size of 64 KB, right? So this would be a change in the existing behaviour for the non-mux code path. Also, for the mux code path, there would be head-of-line blocking concerns if we don't split large messages.

Can we clarify the motivation for removing SplitSize in the PR?
Ahh right, the default is n = 64 * 1024. Then I guess I should add it to the writer after all. Thank you for catching this.

Reason for removing: for non-multiplexing I can't think of why framing exists. I mean genuinely, for no-mux I don't understand it. For multiplexing I can understand the head-of-line blocking reasons, which I think would further complicate the PR but are still very much required, so that is the immediate follow-up to this PR.

For no-mux, if you have a packet of 1 MiB in memory, you chunk it into 16 chunks and write them one by one as 16 syscalls. On the other hand, if you write all of the 1 MiB at once, the TCP layer should be smart enough to chunk it. So without any benchmarking I don't know for sure why framing exists, and that's why I have added an action item for it in #34 (comment).

That said, I also want to avoid deviating from existing behavior, so I will add it back for no-mux.
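For reference, the framing being discussed amounts to something like this sketch. `splitFrames` is a hypothetical helper, not drpcwire's actual splitting code; only the 64 * 1024 default comes from the thread:

```go
package main

import "fmt"

// splitFrames chunks a payload into frames of at most splitSize bytes,
// falling back to the 64 KiB default mentioned above when unset. Each
// returned slice aliases the input (no copying).
func splitFrames(data []byte, splitSize int) [][]byte {
	if splitSize <= 0 {
		splitSize = 64 * 1024
	}
	var frames [][]byte
	for len(data) > splitSize {
		frames = append(frames, data[:splitSize])
		data = data[splitSize:]
	}
	return append(frames, data) // final (possibly short) frame
}

func main() {
	payload := make([]byte, 1<<20) // the 1 MiB example from the thread
	frames := splitFrames(payload, 0)
	fmt.Println(len(frames)) // 1 MiB / 64 KiB = 16 frames, i.e. 16 writes
}
```

This makes the trade-off concrete: 16 smaller writes (and syscalls) versus one large write that TCP segments anyway, which is exactly what the proposed benchmark would measure.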
[AcIm] add the SplitSize back for no-mux

> On the other hand, if you write all of the 1 MiB at once, the TCP layer should be smart enough to chunk it.

I think that's a naive way of looking at things. I'll do more digging here.

I don't see a strong reason to keep it for non-mux (HTTP/1.1 doesn't do any framing, for instance), but I just wanted to point out the change in behaviour in the non-mux path.

If you are planning to benchmark in order to make the decision for the non-mux path, the existing chunking is something to keep in mind.
    switch {
    case fr.ID.Less(r.id):
    case fr.ID.Stream == 0 || fr.ID.Message == 0:
        return Packet{}, drpc.ProtocolError.New("id monotonicity violation (fr:%v r:%v)", fr.ID, r.id)

May want to rephrase the error, since this is different from a monotonicity violation (as in, not really an out-of-order frame situation, more like an invalid id).
The non-mux code path looks good to me, except for the change w.r.t. SplitSize 👍
shubhamdhama left a comment

Answered some of Chandra's comments.
    rpc = string(pkt.Data)
    streamCtx := ctx

    if metadata := m.popMetadata(pkt.ID.Stream); metadata != nil {

Since this creates a new map for every stream, it may be good to keep some upper bound on the map size.
    return
    }

    pb.data = append(pb.data, data)

Need an upper bound on the size of this, since we keep appending to it. If there is no MsgRecv(), or the consumer lags for some reason, this will keep growing.
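One way to add the suggested bound, sketched against a simplified stand-in for the buffer. The `maxQueued` field and the reject-when-full policy are hypothetical (blocking the producer for backpressure would be another option):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// queuePacketBuffer is a simplified stand-in for the type discussed above;
// the maxQueued bound is the suggested addition, not existing drpc code.
type queuePacketBuffer struct {
	mu        sync.Mutex
	data      [][]byte
	maxQueued int
}

var errQueueFull = errors.New("stream receive queue is full")

// Put rejects new packets once the consumer lags maxQueued packets behind,
// instead of letting pb.data grow without bound.
func (pb *queuePacketBuffer) Put(buf []byte) error {
	pb.mu.Lock()
	defer pb.mu.Unlock()
	if len(pb.data) >= pb.maxQueued {
		return errQueueFull
	}
	pb.data = append(pb.data, buf)
	return nil
}

func main() {
	pb := &queuePacketBuffer{maxQueued: 2}
	for i := 0; i < 3; i++ {
		fmt.Println(pb.Put([]byte("pkt")))
	}
}
```

Whether the full-queue condition should surface as a stream error, drop the packet, or stall the reader is a policy question the PR would have to settle; this only shows where the bound would live.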
    }
    m.metaMu.Unlock()
    // Cancel all active streams so they get a clear error.
    m.reg.ForEach(func(_ uint64, s *drpcstream.Stream) {

We don't need to cancel every stream explicitly here, because the m.sigs.term.Set() at the beginning of terminate will unblock the select in all the manageStream goroutines, each of which will cancel its own stream. This is how it works in non-mux as well.
    m.sigs.tport.Set(m.tr.Close())
    m.sw.Close()
    m.metaMu.Lock()
    for id := range m.meta {

May not need this either, as it will be garbage collected.
    defer r.mu.Unlock()

    if r.closed {
        return managerClosed.New("register")

nit: rephrase to 'register called'
    if m.sigs.term.Set(err) {
        m.log("TERM", func() string { return fmt.Sprint(err) })
        m.sigs.tport.Set(m.tr.Close())
        m.sw.Close()

Closing the shared write buffer and then closing the transport is cleaner. That way we avoid an unnecessary write to the transport (which would then fail) if the buffer has any leftover data.
    // Drain swaps out accumulated bytes, giving the caller ownership of the
    // returned slice. The internal buffer is replaced with spare (reset to zero
    // length) so producers can continue appending without allocation.
    func (sw *sharedWriteBuf) Drain(spare []byte) []byte {

nit: Only used in tests. Consider moving it to a test file.
    m.meta[streamID] = metadata
    }

    func (m *MuxManager) popMetadata(streamID uint64) map[string]string {

nit: Can we rename this to getMetadata, since it is just a dict read? "pop" usually implies LIFO semantics (or at least removing from the end of a data structure), so it can be confusing.
    case pkt.Kind == drpcwire.KindInvoke || pkt.Kind == drpcwire.KindInvokeMetadata:
        select {
        case m.pkts <- pkt:
            m.pdone.Recv()

The reader waits here for NewServerStream to pick up and process the packet each time an Invoke or InvokeMetadata packet arrives, which can block other packets from being read. I think this is because pkt.Data cannot be reused by the reader until the new stream is created. Since invoke packets would, in general, be fewer than message packets and tend to be small (just the rpc string), we can consider copying the packet here to remove this wait, incurring the cost of one small allocation per Invoke as a trade-off. This would also require that the size of metadata be fairly small, but that isn't an unreasonable constraint, IMO.
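The copy-on-invoke suggestion could look roughly like this. `Packet` and `forwardInvoke` are simplified stand-ins, not drpc's actual types; the point is only the ownership transfer:

```go
package main

import "fmt"

// Packet stands in for drpcwire.Packet; Data aliases the reader's scratch
// buffer until it is copied.
type Packet struct {
	Kind string
	Data []byte
}

// forwardInvoke models the suggestion above: copy the (small) invoke
// payload so the reader can reuse its buffer immediately, instead of
// blocking on pdone until the new stream has consumed the packet.
func forwardInvoke(pkts chan<- Packet, pkt Packet) {
	owned := make([]byte, len(pkt.Data))
	copy(owned, pkt.Data)
	pkt.Data = owned
	pkts <- pkt // one small allocation per Invoke, no reader stall
}

func main() {
	pkts := make(chan Packet, 1)
	readerBuf := []byte("rpc.Service/Method")
	forwardInvoke(pkts, Packet{Kind: "Invoke", Data: readerBuf})
	copy(readerBuf, "XXXXXXXXXXXXXXXXXX") // reader reuses its buffer at once
	fmt.Println(string((<-pkts).Data))    // the copied payload is unaffected
}
```

This trades one allocation per stream open for keeping the shared reader loop free to dispatch packets for other streams.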
DRPC currently allows only one active stream per transport at a time. This commit adds multiplexing as an opt-in mode (Options.Mux) that enables concurrent streams over a single transport, while preserving the original sequential behavior as the default.
Mux and non-mux paths have fundamentally different concurrency models, so a single Manager with conditionals would add branching in hot paths. Instead, two separate types are used: Manager (non-mux, unchanged) and MuxManager (new, in manager_mux.go). MuxManager runs two goroutines: manageReader routes packets to streams via a streamRegistry, and manageWriter batches frames from a sharedWriteBuf into transport writes.
Details on separating MuxManager from the existing Manager

The alternative was a single Manager with a `mux bool` field and conditionals throughout. This was rejected because the internal state is fundamentally different between modes. Manager uses a semaphore (`drpcsignal.Chan`) for single-stream-at-a-time operation, a `streamBuffer` for tracking the current stream, a shared `*drpcwire.Writer` for direct transport writes, and `manageReader` + `manageStreams` goroutines. MuxManager uses a `streamRegistry` (a thread-safe map keyed by stream ID), a `sharedWriteBuf` for batched writes, a `sync.WaitGroup` for per-stream goroutines, an `atomic.Uint64` for client stream ID generation, and `manageReader` + `manageWriter` goroutines. Branching across all of these would make both paths harder to reason about.

Shared code stays at package level: the `Options` struct (with `Mux bool` added), the `managerClosed` error class, the `isConnectionReset` helper, and the `closedCh` pre-closed channel.

The Stream now accepts a StreamWriter interface instead of *Writer directly, so both paths share the same Stream code. Non-mux uses *Writer (direct transport writes), mux uses muxWriter (serializes each packet atomically into the shared buffer to prevent frame interleaving).
Packet buffering is abstracted behind a packetStore interface with two implementations: syncPacketBuffer (blocking single-slot, non-mux) and queuePacketBuffer (non-blocking queue with sync.Pool recycling, mux). In non-mux mode, Put blocks the reader until the consumer calls Get + Done, providing natural backpressure since there's only one stream. In mux mode, Put appends and returns immediately so the reader stays unblocked to dispatch packets to other concurrent streams. RawRecv and MsgRecv branch on the mux flag for buffer lifecycle: non-mux copies data then calls Done() to unblock the reader, mux takes ownership and recycles after consumption.
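The two packetStore shapes described above can be sketched minimally as follows. The signatures are simplified assumptions (`Done` is folded into `Get`, and error/close plumbing is omitted); the real drpc types differ:

```go
package main

import (
	"fmt"
	"sync"
)

// packetStore sketches the contract described above.
type packetStore interface {
	Put(data []byte) // non-mux: blocks until consumed; mux: returns at once
	Get() ([]byte, bool)
}

// syncPacketBuffer: a single slot with backpressure. Put parks the caller
// until the consumer has taken the value.
type syncPacketBuffer struct {
	mu   sync.Mutex
	cond *sync.Cond
	data []byte
	set  bool
}

func newSync() *syncPacketBuffer {
	pb := &syncPacketBuffer{}
	pb.cond = sync.NewCond(&pb.mu)
	return pb
}

func (pb *syncPacketBuffer) Put(data []byte) {
	pb.mu.Lock()
	defer pb.mu.Unlock()
	pb.data, pb.set = data, true
	pb.cond.Broadcast()
	for pb.set { // wait for the consumer: natural backpressure
		pb.cond.Wait()
	}
}

func (pb *syncPacketBuffer) Get() ([]byte, bool) {
	pb.mu.Lock()
	defer pb.mu.Unlock()
	for !pb.set {
		pb.cond.Wait()
	}
	data := pb.data
	pb.set = false
	pb.cond.Broadcast()
	return data, true
}

// queuePacketBuffer: mux mode appends and returns immediately, so the
// shared reader stays free to dispatch packets to other streams.
type queuePacketBuffer struct {
	mu   sync.Mutex
	data [][]byte
}

func (pb *queuePacketBuffer) Put(data []byte) {
	pb.mu.Lock()
	pb.data = append(pb.data, data)
	pb.mu.Unlock()
}

func (pb *queuePacketBuffer) Get() ([]byte, bool) {
	pb.mu.Lock()
	defer pb.mu.Unlock()
	if len(pb.data) == 0 {
		return nil, false
	}
	data := pb.data[0]
	pb.data = pb.data[1:]
	return data, true
}

func main() {
	var _ packetStore = newSync()
	var _ packetStore = &queuePacketBuffer{}
	q := &queuePacketBuffer{}
	q.Put([]byte("a"))
	q.Put([]byte("b"))
	d, _ := q.Get()
	fmt.Println(string(d))
}
```

The contrast is the whole design: the blocking single slot gives non-mux its one-stream backpressure for free, while the queue decouples the mux reader from every consumer.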
packetStore details

Buffer ownership in mux mode: `AcquirePacketBuf()` gets a buffer from the pool, the reader fills it, `HandlePacket` calls `Put` (transferring ownership to the queue), and the reader calls `AcquirePacketBuf()` again for a fresh buffer. On the consumer side, `Get` takes ownership from the queue, and the consumer calls `Recycle` (or `RawRecv` returns the buffer directly to the caller).

The pool uses a `pktBuf` wrapper struct instead of storing bare `[]byte` values. Without the wrapper, every `sync.Pool.Put` would box the slice header into an interface, causing an allocation per Put. The wrapper is a pointer type, so pool operations don't allocate.

`Close` behavior differs by error: with a non-EOF error, queued buffers are returned to the pool immediately (the stream is broken, no one will read them). With `io.EOF`, queued buffers are preserved so readers can drain remaining messages before seeing EOF.

HandlePacket now processes KindMessage before checking the term signal. This is needed for mux mode, where Put must always run to return pool buffers. In non-mux mode, Put on a closed syncPacketBuffer returns immediately, so there is no behavioral change. There is a brief blocking window if terminate() has set the term signal but hasn't yet called pbuf.Close(), but it is safe.
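The pktBuf pooling point can be illustrated with a small sketch. The helper names and the 4 KiB buffer size are assumptions; only the pointer-wrapper trick comes from the description above:

```go
package main

import (
	"fmt"
	"sync"
)

// pktBuf wraps the byte slice so Pool.Put stores a pointer instead of
// boxing a slice header into an interface value, which would allocate on
// every Put.
type pktBuf struct{ data []byte }

var pool = sync.Pool{
	New: func() any { return &pktBuf{data: make([]byte, 0, 4096)} },
}

// acquire hands out a pooled (or fresh) buffer with zero length.
func acquire() *pktBuf { return pool.Get().(*pktBuf) }

// recycle resets the length and returns the buffer to the pool, keeping
// its capacity for the next reader fill.
func recycle(b *pktBuf) {
	b.data = b.data[:0]
	pool.Put(b)
}

func main() {
	b := acquire()
	b.data = append(b.data, []byte("payload")...)
	fmt.Println(len(b.data), cap(b.data) >= 4096)
	recycle(b)
}
```

Storing `*pktBuf` keeps `Pool.Put` allocation-free because a pointer fits in an interface value without boxing, unlike a three-word slice header.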
Details
There is a race window worth noting: `terminate()` sets `term` before calling `pbuf.Close()`. If the reader goroutine enters `HandlePacket` with a `KindMessage` during that window, `Put` can enter `syncPacketBuffer` while it's not yet closed. Three cases:

1. `Put` blocks before placing data (at `for pb.set && pb.err == nil`): `Close` sets `pb.err` and broadcasts, waking `Put`, which exits immediately.
2. `Put` places data, then blocks (at `for pb.set || pb.held`): `Close` sees `pb.held == false`, sets `pb.set = false` and `pb.err`, and broadcasts. `Put` wakes and exits.
3. `Put` places data, and the consumer calls `Get` then `Done`: `Put` completes normally, and `Close` runs afterward.

No deadlock in any case. The third case means the message gets delivered during termination, which is fine. The goal is to close the stream, not to prevent an in-flight message from being consumed.
The reader's monotonicity check is relaxed from global to per-stream so interleaved frames from different streams are accepted. Non-mux never produces interleaved stream IDs, so this has no behavioral impact there.
Conn uses a streamManager interface satisfied by both Manager types, branching once in the constructor. In mux mode, Invoke allocates a per-call marshal buffer instead of reusing a shared one to support concurrent calls. On the server side, ServeOne branches once to either serveOneNonMux (sequential handleRPC) or serveOneMux (concurrent handleRPC with WaitGroup).