Skip to content

Commit 65fc5c4

Browse files
committed
MaxRequestsPerConn cluster param was added
1 parent 953e0df commit 65fc5c4

File tree

6 files changed

+43
-12
lines changed

6 files changed

+43
-12
lines changed

cluster.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ type ClusterConfig struct {
106106
// Default: 2
107107
NumConns int
108108

109+
// Maximum number of inflight requests allowed per connection.
110+
// Default: 32768 for CQL v3 and newer
111+
// Default: 128 for older CQL versions
112+
MaxRequestsPerConn int
113+
109114
// Default consistency level.
110115
// Default: Quorum
111116
Consistency Consistency
@@ -282,6 +287,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
282287
ConnectTimeout: 11 * time.Second,
283288
Port: 9042,
284289
NumConns: 2,
290+
MaxRequestsPerConn: 0,
285291
Consistency: Quorum,
286292
MaxPreparedStmts: defaultMaxPreparedStmts,
287293
MaxRoutingKeyInfo: 1000,

cluster_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func TestNewCluster_Defaults(t *testing.T) {
3737
assertEqual(t, "cluster config timeout", 11*time.Second, cfg.Timeout)
3838
assertEqual(t, "cluster config port", 9042, cfg.Port)
3939
assertEqual(t, "cluster config num-conns", 2, cfg.NumConns)
40+
assertEqual(t, "cluster config max requests per conn", 0, cfg.MaxRequestsPerConn)
4041
assertEqual(t, "cluster config consistency", Quorum, cfg.Consistency)
4142
assertEqual(t, "cluster config max prepared statements", defaultMaxPreparedStmts, cfg.MaxPreparedStmts)
4243
assertEqual(t, "cluster config max routing key info", 1000, cfg.MaxRoutingKeyInfo)

conn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ func (s *Session) dialWithoutObserver(ctx context.Context, host *HostInfo, cfg *
277277
errorHandler: errorHandler,
278278
compressor: cfg.Compressor,
279279
session: s,
280-
streams: streams.New(cfg.ProtoVersion),
280+
streams: streams.NewStreamIDGenerator(s.cfg.ProtoVersion, s.cfg.MaxRequestsPerConn),
281281
host: host,
282282
isSchemaV2: true, // Try using "system.peers_v2" until proven otherwise
283283
frameObserver: s.frameObserver,

conn_test.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,7 @@ func TestQueryTimeoutClose(t *testing.T) {
693693
func TestStream0(t *testing.T) {
694694
// TODO: replace this with type check
695695
const expErr = "gocql: received unexpected frame on stream 0"
696+
const maxRequestsPerConn = 13
696697

697698
var buf bytes.Buffer
698699
f := newFramer(nil, protoVersion4)
@@ -706,13 +707,22 @@ func TestStream0(t *testing.T) {
706707
t.Fatal(err)
707708
}
708709

710+
srv := NewTestServer(t, defaultProto, context.Background())
711+
defer srv.Stop()
712+
cluster := testCluster(defaultProto, srv.Address)
713+
s, err := cluster.CreateSession()
714+
s.cfg.MaxRequestsPerConn = maxRequestsPerConn
715+
if err != nil {
716+
t.Fatalf("NewCluster: %v", err)
717+
}
718+
709719
conn := &Conn{
710720
r: bufio.NewReader(&buf),
711-
streams: streams.New(protoVersion4),
721+
streams: streams.NewStreamIDGenerator(defaultProto, s.cfg.MaxRequestsPerConn),
712722
logger: &defaultLogger{},
713723
}
714724

715-
err := conn.recv(context.Background())
725+
err = conn.recv(context.Background())
716726
if err == nil {
717727
t.Fatal("expected to get an error on stream 0")
718728
} else if !strings.HasPrefix(err.Error(), expErr) {

connectionpool.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,10 @@ func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
9999
type policyConnPool struct {
100100
session *Session
101101

102-
port int
103-
numConns int
104-
keyspace string
102+
port int
103+
numConns int
104+
maxRequestsPerConn int
105+
keyspace string
105106

106107
mu sync.RWMutex
107108
hostConnPools map[string]*hostConnPool
@@ -161,11 +162,12 @@ func connConfig(cfg *ClusterConfig) (*ConnConfig, error) {
161162
func newPolicyConnPool(session *Session) *policyConnPool {
162163
// create the pool
163164
pool := &policyConnPool{
164-
session: session,
165-
port: session.cfg.Port,
166-
numConns: session.cfg.NumConns,
167-
keyspace: session.cfg.Keyspace,
168-
hostConnPools: map[string]*hostConnPool{},
165+
session: session,
166+
port: session.cfg.Port,
167+
numConns: session.cfg.NumConns,
168+
maxRequestsPerConn: session.cfg.MaxRequestsPerConn,
169+
keyspace: session.cfg.Keyspace,
170+
hostConnPools: map[string]*hostConnPool{},
169171
}
170172

171173
return pool

internal/streams/streams.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,29 @@ type IDGenerator struct {
4343
offset uint32
4444
}
4545

46+
func NewStreamIDGenerator(protocol, maxRequestsPerConn int) *IDGenerator {
47+
if maxRequestsPerConn > 0 {
48+
return NewLimited(maxRequestsPerConn)
49+
}
50+
return New(protocol)
51+
}
52+
4653
func New(protocol int) *IDGenerator {
4754
maxStreams := 128
4855
if protocol > 2 {
4956
maxStreams = 32768
5057
}
58+
return NewLimited(maxStreams)
59+
}
60+
61+
func NewLimited(maxStreams int) *IDGenerator {
62+
// Round up maxStreams to a nearest multiple of 64
63+
maxStreams = ((maxStreams + 63) / 64) * 64
5164

5265
buckets := maxStreams / 64
5366
// reserve stream 0
5467
streams := make([]uint64, buckets)
5568
streams[0] = 1 << 63
56-
5769
return &IDGenerator{
5870
NumStreams: maxStreams,
5971
streams: streams,

0 commit comments

Comments
 (0)