Skip to content

Commit 88405bb

Browse files
committed
MaxRequestsPerConn cluster param was added
1 parent 34fdeeb commit 88405bb

File tree

8 files changed

+48
-16
lines changed

8 files changed

+48
-16
lines changed

AUTHORS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,3 +141,4 @@ Dmitry Kropachev <[email protected]>
141141
Oliver Boyle <[email protected]>
142142
Jackson Fleming <[email protected]>
143143
Sylwia Szunejko <[email protected]>
144+
Danylo Savchenko <[email protected]>

cluster.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ type ClusterConfig struct {
8686
// Default: 2
8787
NumConns int
8888

89+
// Maximum number of inflight requests allowed per connection.
90+
// Default: 32768 for CQL v3 and newer
91+
// Default: 128 for older CQL versions
92+
MaxRequestsPerConn int
93+
8994
// Default consistency level.
9095
// Default: Quorum
9196
Consistency Consistency
@@ -262,6 +267,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
262267
ConnectTimeout: 11 * time.Second,
263268
Port: 9042,
264269
NumConns: 2,
270+
MaxRequestsPerConn: 0,
265271
Consistency: Quorum,
266272
MaxPreparedStmts: defaultMaxPreparedStmts,
267273
MaxRoutingKeyInfo: 1000,

cluster_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ func TestNewCluster_Defaults(t *testing.T) {
1313
assertEqual(t, "cluster config timeout", 11*time.Second, cfg.Timeout)
1414
assertEqual(t, "cluster config port", 9042, cfg.Port)
1515
assertEqual(t, "cluster config num-conns", 2, cfg.NumConns)
16+
assertEqual(t, "cluster config max requests per conn", 0, cfg.MaxRequestsPerConn)
1617
assertEqual(t, "cluster config consistency", Quorum, cfg.Consistency)
1718
assertEqual(t, "cluster config max prepared statements", defaultMaxPreparedStmts, cfg.MaxPreparedStmts)
1819
assertEqual(t, "cluster config max routing key info", 1000, cfg.MaxRoutingKeyInfo)

conn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ func (s *Session) dialWithoutObserver(ctx context.Context, host *HostInfo, cfg *
257257
errorHandler: errorHandler,
258258
compressor: cfg.Compressor,
259259
session: s,
260-
streams: streams.New(cfg.ProtoVersion),
260+
streams: streams.NewStreamIDGenerator(s.cfg.ProtoVersion, s.cfg.MaxRequestsPerConn),
261261
host: host,
262262
isSchemaV2: true, // Try using "system.peers_v2" until proven otherwise
263263
frameObserver: s.frameObserver,

conn_test.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,7 @@ func TestQueryTimeoutClose(t *testing.T) {
672672
func TestStream0(t *testing.T) {
673673
// TODO: replace this with type check
674674
const expErr = "gocql: received unexpected frame on stream 0"
675+
const maxRequestsPerConn = 13
675676

676677
var buf bytes.Buffer
677678
f := newFramer(nil, protoVersion4)
@@ -685,13 +686,22 @@ func TestStream0(t *testing.T) {
685686
t.Fatal(err)
686687
}
687688

689+
srv := NewTestServer(t, defaultProto, context.Background())
690+
defer srv.Stop()
691+
cluster := testCluster(defaultProto, srv.Address)
692+
s, err := cluster.CreateSession()
693+
s.cfg.MaxRequestsPerConn = maxRequestsPerConn
694+
if err != nil {
695+
t.Fatalf("NewCluster: %v", err)
696+
}
697+
688698
conn := &Conn{
689699
r: bufio.NewReader(&buf),
690-
streams: streams.New(protoVersion4),
700+
streams: streams.NewStreamIDGenerator(defaultProto, s.cfg.MaxRequestsPerConn),
691701
logger: &defaultLogger{},
692702
}
693703

694-
err := conn.recv(context.Background())
704+
err = conn.recv(context.Background())
695705
if err == nil {
696706
t.Fatal("expected to get an error on stream 0")
697707
} else if !strings.HasPrefix(err.Error(), expErr) {

connectionpool.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,10 @@ func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
7979
type policyConnPool struct {
8080
session *Session
8181

82-
port int
83-
numConns int
84-
keyspace string
82+
port int
83+
numConns int
84+
maxRequestsPerConn int
85+
keyspace string
8586

8687
mu sync.RWMutex
8788
hostConnPools map[string]*hostConnPool
@@ -141,11 +142,12 @@ func connConfig(cfg *ClusterConfig) (*ConnConfig, error) {
141142
func newPolicyConnPool(session *Session) *policyConnPool {
142143
// create the pool
143144
pool := &policyConnPool{
144-
session: session,
145-
port: session.cfg.Port,
146-
numConns: session.cfg.NumConns,
147-
keyspace: session.cfg.Keyspace,
148-
hostConnPools: map[string]*hostConnPool{},
145+
session: session,
146+
port: session.cfg.Port,
147+
numConns: session.cfg.NumConns,
148+
maxRequestsPerConn: session.cfg.MaxRequestsPerConn,
149+
keyspace: session.cfg.Keyspace,
150+
hostConnPools: map[string]*hostConnPool{},
149151
}
150152

151153
return pool

host_source.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -402,10 +402,10 @@ func (h *HostInfo) HostnameAndPort() string {
402402
}
403403

404404
func (h *HostInfo) ConnectAddressAndPort() string {
405-
h.mu.Lock()
406-
defer h.mu.Unlock()
407-
addr, _ := h.connectAddressLocked()
408-
return net.JoinHostPort(addr.String(), strconv.Itoa(h.port))
405+
h.mu.Lock()
406+
defer h.mu.Unlock()
407+
addr, _ := h.connectAddressLocked()
408+
return net.JoinHostPort(addr.String(), strconv.Itoa(h.port))
409409
}
410410

411411
func (h *HostInfo) String() string {

internal/streams/streams.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,29 @@ type IDGenerator struct {
1919
offset uint32
2020
}
2121

22+
func NewStreamIDGenerator(protocol, maxRequestsPerConn int) *IDGenerator {
23+
if maxRequestsPerConn > 0 {
24+
return NewLimited(maxRequestsPerConn)
25+
}
26+
return New(protocol)
27+
}
28+
2229
func New(protocol int) *IDGenerator {
2330
maxStreams := 128
2431
if protocol > 2 {
2532
maxStreams = 32768
2633
}
34+
return NewLimited(maxStreams)
35+
}
36+
37+
func NewLimited(maxStreams int) *IDGenerator {
38+
// Round up maxStreams to a nearest multiple of 64
39+
maxStreams = ((maxStreams + 63) / 64) * 64
2740

2841
buckets := maxStreams / 64
2942
// reserve stream 0
3043
streams := make([]uint64, buckets)
3144
streams[0] = 1 << 63
32-
3345
return &IDGenerator{
3446
NumStreams: maxStreams,
3547
streams: streams,

0 commit comments

Comments
 (0)