Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 40 additions & 14 deletions drpcconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,20 @@ type Options struct {
CollectStats bool
}

// streamManager is the interface satisfied by both drpcmanager.Manager (non-mux)
// and drpcmanager.MuxManager (mux).
type streamManager interface {
NewClientStream(ctx context.Context, rpc string) (*drpcstream.Stream, error)
Closed() <-chan struct{}
Unblocked() <-chan struct{}
Close() error
}

// Conn is a drpc client connection.
type Conn struct {
tr drpc.Transport
man *drpcmanager.Manager
man streamManager
mux bool
mu sync.Mutex
wbuf []byte

Expand All @@ -56,7 +66,12 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Conn {
c.stats = make(map[string]*drpcstats.Stats)
}

c.man = drpcmanager.NewWithOptions(tr, opts.Manager)
c.mux = opts.Manager.Mux
if c.mux {
c.man = drpcmanager.NewMuxWithOptions(tr, opts.Manager)
} else {
c.man = drpcmanager.NewWithOptions(tr, opts.Manager)
}

return c
}
Expand Down Expand Up @@ -100,8 +115,9 @@ func (c *Conn) Unblocked() <-chan struct{} { return c.man.Unblocked() }
// Close closes the connection.
func (c *Conn) Close() (err error) { return c.man.Close() }

// Invoke issues the rpc on the transport serializing in, waits for a response, and
// deserializes it into out. Only one Invoke or Stream may be open at a time.
// Invoke issues the rpc on the transport serializing in, waits for a response,
// and deserializes it into out. In non-mux mode, only one Invoke or Stream may
// be open at a time. In mux mode, multiple calls may be open concurrently.
func (c *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) (err error) {
defer func() { err = drpc.ToRPCErr(err) }()

Expand All @@ -117,18 +133,28 @@ func (c *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, ou
}
defer func() { err = errs.Combine(err, stream.Close()) }()

// we have to protect c.wbuf here even though the manager only allows one
// stream at a time because the stream may async close allowing another
// concurrent call to Invoke to proceed.
c.mu.Lock()
defer c.mu.Unlock()

c.wbuf, err = drpcenc.MarshalAppend(in, enc, c.wbuf[:0])
if err != nil {
return err
var data []byte
if c.mux {
// Per-call buffer allocation for concurrent access.
data, err = drpcenc.MarshalAppend(in, enc, nil)
if err != nil {
return err
}
} else {
// We have to protect c.wbuf here even though the manager only allows
// one stream at a time because the stream may async close allowing
// another concurrent call to Invoke to proceed.
c.mu.Lock()
defer c.mu.Unlock()

c.wbuf, err = drpcenc.MarshalAppend(in, enc, c.wbuf[:0])
if err != nil {
return err
}
data = c.wbuf
}

if err := c.doInvoke(stream, enc, rpc, c.wbuf, metadata, out); err != nil {
if err := c.doInvoke(stream, enc, rpc, data, metadata, out); err != nil {
return err
}
return nil
Expand Down
5 changes: 4 additions & 1 deletion drpcconn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,10 @@ func TestConn_NewStreamSendsGrpcAndDrpcMetadata(t *testing.T) {
})
s, err := conn.NewStream(ctx, "/com.example.Foo/Bar", testEncoding{})
assert.NoError(t, err)
_ = s.CloseSend()

assert.NoError(t, s.CloseSend())

ctx.Wait()
}

func TestConn_encodeMetadata(t *testing.T) {
Expand Down
82 changes: 82 additions & 0 deletions drpcmanager/frame_queue_test.go
Copy link
Copy Markdown
Author

@shubhamdhama shubhamdhama Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[AcIm] Move the content of this file to mux_writer_test.go

Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (C) 2026 Cockroach Labs.
// See LICENSE for copying information.

package drpcmanager

import (
"testing"

"github.com/zeebo/assert"
"storj.io/drpc/drpcwire"
)

func TestSharedWriteBuf_AppendDrain(t *testing.T) {
sw := newSharedWriteBuf()

pkt := drpcwire.Packet{
Data: []byte("hello"),
ID: drpcwire.ID{Stream: 1, Message: 2},
Kind: drpcwire.KindMessage,
}

assert.NoError(t, sw.Append(pkt))

// Drain should return serialized bytes.
data := sw.Drain(nil)
assert.That(t, len(data) > 0)

// Parse the frame back out to verify correctness.
_, got, ok, err := drpcwire.ParseFrame(data)
assert.NoError(t, err)
assert.That(t, ok)
assert.DeepEqual(t, got.Data, pkt.Data)
assert.Equal(t, got.ID.Stream, pkt.ID.Stream)
assert.Equal(t, got.ID.Message, pkt.ID.Message)
assert.Equal(t, got.Kind, pkt.Kind)
assert.Equal(t, got.Done, true)
}

func TestSharedWriteBuf_CloseIdempotent(t *testing.T) {
sw := newSharedWriteBuf()
sw.Close()
sw.Close() // must not panic
}

func TestSharedWriteBuf_AppendAfterClose(t *testing.T) {
sw := newSharedWriteBuf()
sw.Close()

err := sw.Append(drpcwire.Packet{})
assert.Error(t, err)
}

func TestSharedWriteBuf_WaitAndDrainBlocks(t *testing.T) {
sw := newSharedWriteBuf()

done := make(chan struct{})
go func() {
defer close(done)
data, ok := sw.WaitAndDrain(nil)
assert.That(t, ok)
assert.That(t, len(data) > 0)
}()

// Append should wake the blocked WaitAndDrain.
assert.NoError(t, sw.Append(drpcwire.Packet{Data: []byte("a")}))
<-done
}

func TestSharedWriteBuf_WaitAndDrainCloseEmpty(t *testing.T) {
sw := newSharedWriteBuf()

done := make(chan struct{})
go func() {
defer close(done)
_, ok := sw.WaitAndDrain(nil)
assert.That(t, !ok)
}()

// Close on empty buffer should return ok=false.
sw.Close()
<-done
}
6 changes: 6 additions & 0 deletions drpcmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ type Options struct {
// handling. When enabled, the server stream will decode incoming metadata
// into grpc metadata in the context.
GRPCMetadataCompatMode bool

// Mux enables stream multiplexing on the transport, allowing multiple
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can you make it more descriptive? Since the RPC muxer is also names as Mux, I would like it if we rename this one. Also, this is a flag to indicate if stream multiplexing must be enabled or not.

// concurrent streams. When false (default), the manager uses the
// original single-stream-at-a-time behavior.
Mux bool
}

// Manager handles the logic of managing a transport for a drpc client or
Expand Down Expand Up @@ -306,6 +311,7 @@ func (m *Manager) newStream(ctx context.Context, sid uint64, kind drpc.StreamKin
drpcopts.SetStreamStats(&opts.Internal, cb(rpc))
}

m.wr.Reset()
stream := drpcstream.NewWithOptions(ctx, sid, m.wr, opts)
select {
case m.streams <- streamInfo{ctx: ctx, stream: stream}:
Expand Down
Loading