Skip to content

Commit 783893f

Browse files
committed
*: add opt-in stream multiplexing support
DRPC currently allows only one active stream per transport at a time. This commit adds multiplexing as an opt-in mode (Options.Mux) that enables concurrent streams over a single transport, while preserving the original sequential behavior as the default. Mux and non-mux paths have fundamentally different concurrency models, so a single Manager with conditionals would add branching in hot paths. Instead, two separate types are used: Manager (non-mux, unchanged) and MuxManager (new, in manager_mux.go). MuxManager runs two goroutines: manageReader routes packets to streams via a streamRegistry, and manageWriter batches frames from a sharedWriteBuf into transport writes. The Stream now accepts a StreamWriter interface instead of *Writer directly, so both paths share the same Stream code. Non-mux uses *Writer (direct transport writes), mux uses muxWriter (serializes each packet atomically into the shared buffer to prevent frame interleaving). Packet buffering is abstracted behind a packetStore interface with two implementations: syncPacketBuffer (blocking single-slot, non-mux) and queuePacketBuffer (non-blocking queue with sync.Pool recycling, mux). RawRecv and MsgRecv branch on the mux flag for buffer lifecycle: non-mux copies data then calls Done() to unblock the reader, mux takes ownership and recycles after consumption. HandlePacket now processes KindMessage before checking the term signal. This is needed for mux mode where Put must always run to return pool buffers. In non-mux mode, Put on a closed syncPacketBuffer returns immediately, so there is no behavioral change. There is, however, a brief blocking window if terminate() has set the term signal but hasn't called pbuf.Close() yet. The reader's monotonicity check is relaxed from global to per-stream so interleaved frames from different streams are accepted. Non-mux never produces interleaved stream IDs, so this has no behavioral impact there.
Conn uses a streamManager interface satisfied by both Manager types, branching once in the constructor. In mux mode, Invoke allocates a per-call marshal buffer instead of reusing a shared one to support concurrent calls. On the server side, ServeOne branches once to either serveOneNonMux (sequential handleRPC) or serveOneMux (concurrent handleRPC with WaitGroup).
1 parent 774206f commit 783893f

File tree

20 files changed

+1890
-72
lines changed

20 files changed

+1890
-72
lines changed

drpcconn/conn.go

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,20 @@ type Options struct {
2929
CollectStats bool
3030
}
3131

32+
// streamManager is the interface satisfied by both drpcmanager.Manager (non-mux)
33+
// and drpcmanager.MuxManager (mux).
34+
type streamManager interface {
35+
NewClientStream(ctx context.Context, rpc string) (*drpcstream.Stream, error)
36+
Closed() <-chan struct{}
37+
Unblocked() <-chan struct{}
38+
Close() error
39+
}
40+
3241
// Conn is a drpc client connection.
3342
type Conn struct {
3443
tr drpc.Transport
35-
man *drpcmanager.Manager
44+
man streamManager
45+
mux bool
3646
mu sync.Mutex
3747
wbuf []byte
3848

@@ -56,7 +66,12 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Conn {
5666
c.stats = make(map[string]*drpcstats.Stats)
5767
}
5868

59-
c.man = drpcmanager.NewWithOptions(tr, opts.Manager)
69+
c.mux = opts.Manager.Mux
70+
if c.mux {
71+
c.man = drpcmanager.NewMuxWithOptions(tr, opts.Manager)
72+
} else {
73+
c.man = drpcmanager.NewWithOptions(tr, opts.Manager)
74+
}
6075

6176
return c
6277
}
@@ -100,8 +115,9 @@ func (c *Conn) Unblocked() <-chan struct{} { return c.man.Unblocked() }
100115
// Close closes the connection.
101116
func (c *Conn) Close() (err error) { return c.man.Close() }
102117

103-
// Invoke issues the rpc on the transport serializing in, waits for a response, and
104-
// deserializes it into out. Only one Invoke or Stream may be open at a time.
118+
// Invoke issues the rpc on the transport serializing in, waits for a response,
119+
// and deserializes it into out. In non-mux mode, only one Invoke or Stream may
120+
// be open at a time. In mux mode, multiple calls may be open concurrently.
105121
func (c *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) (err error) {
106122
defer func() { err = drpc.ToRPCErr(err) }()
107123

@@ -117,18 +133,28 @@ func (c *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, ou
117133
}
118134
defer func() { err = errs.Combine(err, stream.Close()) }()
119135

120-
// we have to protect c.wbuf here even though the manager only allows one
121-
// stream at a time because the stream may async close allowing another
122-
// concurrent call to Invoke to proceed.
123-
c.mu.Lock()
124-
defer c.mu.Unlock()
125-
126-
c.wbuf, err = drpcenc.MarshalAppend(in, enc, c.wbuf[:0])
127-
if err != nil {
128-
return err
136+
var data []byte
137+
if c.mux {
138+
// Per-call buffer allocation for concurrent access.
139+
data, err = drpcenc.MarshalAppend(in, enc, nil)
140+
if err != nil {
141+
return err
142+
}
143+
} else {
144+
// We have to protect c.wbuf here even though the manager only allows
145+
// one stream at a time because the stream may async close allowing
146+
// another concurrent call to Invoke to proceed.
147+
c.mu.Lock()
148+
defer c.mu.Unlock()
149+
150+
c.wbuf, err = drpcenc.MarshalAppend(in, enc, c.wbuf[:0])
151+
if err != nil {
152+
return err
153+
}
154+
data = c.wbuf
129155
}
130156

131-
if err := c.doInvoke(stream, enc, rpc, c.wbuf, metadata, out); err != nil {
157+
if err := c.doInvoke(stream, enc, rpc, data, metadata, out); err != nil {
132158
return err
133159
}
134160
return nil

drpcconn/conn_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,10 @@ func TestConn_NewStreamSendsGrpcAndDrpcMetadata(t *testing.T) {
180180
})
181181
s, err := conn.NewStream(ctx, "/com.example.Foo/Bar", testEncoding{})
182182
assert.NoError(t, err)
183-
_ = s.CloseSend()
183+
184+
assert.NoError(t, s.CloseSend())
185+
186+
ctx.Wait()
184187
}
185188

186189
func TestConn_encodeMetadata(t *testing.T) {

drpcmanager/frame_queue_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright (C) 2026 Cockroach Labs.
2+
// See LICENSE for copying information.
3+
4+
package drpcmanager
5+
6+
import (
7+
"testing"
8+
9+
"github.com/zeebo/assert"
10+
"storj.io/drpc/drpcwire"
11+
)
12+
13+
func TestSharedWriteBuf_AppendDrain(t *testing.T) {
14+
sw := newSharedWriteBuf()
15+
16+
pkt := drpcwire.Packet{
17+
Data: []byte("hello"),
18+
ID: drpcwire.ID{Stream: 1, Message: 2},
19+
Kind: drpcwire.KindMessage,
20+
}
21+
22+
assert.NoError(t, sw.Append(pkt))
23+
24+
// Drain should return serialized bytes.
25+
data := sw.Drain(nil)
26+
assert.That(t, len(data) > 0)
27+
28+
// Parse the frame back out to verify correctness.
29+
_, got, ok, err := drpcwire.ParseFrame(data)
30+
assert.NoError(t, err)
31+
assert.That(t, ok)
32+
assert.DeepEqual(t, got.Data, pkt.Data)
33+
assert.Equal(t, got.ID.Stream, pkt.ID.Stream)
34+
assert.Equal(t, got.ID.Message, pkt.ID.Message)
35+
assert.Equal(t, got.Kind, pkt.Kind)
36+
assert.Equal(t, got.Done, true)
37+
}
38+
39+
func TestSharedWriteBuf_CloseIdempotent(t *testing.T) {
40+
sw := newSharedWriteBuf()
41+
sw.Close()
42+
sw.Close() // must not panic
43+
}
44+
45+
func TestSharedWriteBuf_AppendAfterClose(t *testing.T) {
46+
sw := newSharedWriteBuf()
47+
sw.Close()
48+
49+
err := sw.Append(drpcwire.Packet{})
50+
assert.Error(t, err)
51+
}
52+
53+
func TestSharedWriteBuf_WaitAndDrainBlocks(t *testing.T) {
54+
sw := newSharedWriteBuf()
55+
56+
done := make(chan struct{})
57+
go func() {
58+
defer close(done)
59+
data, ok := sw.WaitAndDrain(nil)
60+
assert.That(t, ok)
61+
assert.That(t, len(data) > 0)
62+
}()
63+
64+
// Append should wake the blocked WaitAndDrain.
65+
assert.NoError(t, sw.Append(drpcwire.Packet{Data: []byte("a")}))
66+
<-done
67+
}
68+
69+
func TestSharedWriteBuf_WaitAndDrainCloseEmpty(t *testing.T) {
70+
sw := newSharedWriteBuf()
71+
72+
done := make(chan struct{})
73+
go func() {
74+
defer close(done)
75+
_, ok := sw.WaitAndDrain(nil)
76+
assert.That(t, !ok)
77+
}()
78+
79+
// Close on empty buffer should return ok=false.
80+
sw.Close()
81+
<-done
82+
}

drpcmanager/manager.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ type Options struct {
6060
// handling. When enabled, the server stream will decode incoming metadata
6161
// into grpc metadata in the context.
6262
GRPCMetadataCompatMode bool
63+
64+
// Mux enables stream multiplexing on the transport, allowing multiple
65+
// concurrent streams. When false (default), the manager uses the
66+
// original single-stream-at-a-time behavior.
67+
Mux bool
6368
}
6469

6570
// Manager handles the logic of managing a transport for a drpc client or
@@ -306,6 +311,7 @@ func (m *Manager) newStream(ctx context.Context, sid uint64, kind drpc.StreamKin
306311
drpcopts.SetStreamStats(&opts.Internal, cb(rpc))
307312
}
308313

314+
m.wr.Reset()
309315
stream := drpcstream.NewWithOptions(ctx, sid, m.wr, opts)
310316
select {
311317
case m.streams <- streamInfo{ctx: ctx, stream: stream}:

0 commit comments

Comments
 (0)