Skip to content

Commit ba0fac8

Browse files
committed
MaxRequestsPerConn cluster param was added
1 parent 34fdeeb commit ba0fac8

File tree

8 files changed

+49
-18
lines changed

8 files changed

+49
-18
lines changed

AUTHORS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,3 +141,4 @@ Dmitry Kropachev <[email protected]>
141141
Oliver Boyle <[email protected]>
142142
Jackson Fleming <[email protected]>
143143
Sylwia Szunejko <[email protected]>
144+
Danylo Savchenko <[email protected]>

cluster.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ type ClusterConfig struct {
8686
// Default: 2
8787
NumConns int
8888

89+
// Maximum number of inflight requests allowed per connection.
90+
// Default: 32768 for CQL v3 and newer
91+
// Default: 128 for older CQL versions
92+
MaxRequestsPerConn int
93+
8994
// Default consistency level.
9095
// Default: Quorum
9196
Consistency Consistency
@@ -262,6 +267,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
262267
ConnectTimeout: 11 * time.Second,
263268
Port: 9042,
264269
NumConns: 2,
270+
MaxRequestsPerConn: 0,
265271
Consistency: Quorum,
266272
MaxPreparedStmts: defaultMaxPreparedStmts,
267273
MaxRoutingKeyInfo: 1000,

cluster_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ func TestNewCluster_Defaults(t *testing.T) {
1313
assertEqual(t, "cluster config timeout", 11*time.Second, cfg.Timeout)
1414
assertEqual(t, "cluster config port", 9042, cfg.Port)
1515
assertEqual(t, "cluster config num-conns", 2, cfg.NumConns)
16+
assertEqual(t, "cluster config max requests per conn", 0, cfg.MaxRequestsPerConn)
1617
assertEqual(t, "cluster config consistency", Quorum, cfg.Consistency)
1718
assertEqual(t, "cluster config max prepared statements", defaultMaxPreparedStmts, cfg.MaxPreparedStmts)
1819
assertEqual(t, "cluster config max routing key info", 1000, cfg.MaxRoutingKeyInfo)

conn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ func (s *Session) dialWithoutObserver(ctx context.Context, host *HostInfo, cfg *
257257
errorHandler: errorHandler,
258258
compressor: cfg.Compressor,
259259
session: s,
260-
streams: streams.New(cfg.ProtoVersion),
260+
streams: streams.NewStreamIDGenerator(s.cfg.ProtoVersion, s.cfg.MaxRequestsPerConn),
261261
host: host,
262262
isSchemaV2: true, // Try using "system.peers_v2" until proven otherwise
263263
frameObserver: s.frameObserver,

conn_test.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"crypto/x509"
1515
"errors"
1616
"fmt"
17+
"github.com/gocql/gocql/internal/streams"
1718
"io"
1819
"io/ioutil"
1920
"math/rand"
@@ -24,8 +25,6 @@ import (
2425
"sync/atomic"
2526
"testing"
2627
"time"
27-
28-
"github.com/gocql/gocql/internal/streams"
2928
)
3029

3130
const (
@@ -672,6 +671,7 @@ func TestQueryTimeoutClose(t *testing.T) {
672671
func TestStream0(t *testing.T) {
673672
// TODO: replace this with type check
674673
const expErr = "gocql: received unexpected frame on stream 0"
674+
const maxRequestsPerConn = 13
675675

676676
var buf bytes.Buffer
677677
f := newFramer(nil, protoVersion4)
@@ -685,13 +685,22 @@ func TestStream0(t *testing.T) {
685685
t.Fatal(err)
686686
}
687687

688+
srv := NewTestServer(t, defaultProto, context.Background())
689+
defer srv.Stop()
690+
cluster := testCluster(defaultProto, srv.Address)
691+
s, err := cluster.CreateSession()
692+
s.cfg.MaxRequestsPerConn = maxRequestsPerConn
693+
if err != nil {
694+
t.Fatalf("NewCluster: %v", err)
695+
}
696+
688697
conn := &Conn{
689698
r: bufio.NewReader(&buf),
690-
streams: streams.New(protoVersion4),
699+
streams: streams.NewStreamIDGenerator(defaultProto, s.cfg.MaxRequestsPerConn),
691700
logger: &defaultLogger{},
692701
}
693702

694-
err := conn.recv(context.Background())
703+
err = conn.recv(context.Background())
695704
if err == nil {
696705
t.Fatal("expected to get an error on stream 0")
697706
} else if !strings.HasPrefix(err.Error(), expErr) {

connectionpool.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,10 @@ func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
7979
type policyConnPool struct {
8080
session *Session
8181

82-
port int
83-
numConns int
84-
keyspace string
82+
port int
83+
numConns int
84+
maxRequestsPerConn int
85+
keyspace string
8586

8687
mu sync.RWMutex
8788
hostConnPools map[string]*hostConnPool
@@ -141,11 +142,12 @@ func connConfig(cfg *ClusterConfig) (*ConnConfig, error) {
141142
func newPolicyConnPool(session *Session) *policyConnPool {
142143
// create the pool
143144
pool := &policyConnPool{
144-
session: session,
145-
port: session.cfg.Port,
146-
numConns: session.cfg.NumConns,
147-
keyspace: session.cfg.Keyspace,
148-
hostConnPools: map[string]*hostConnPool{},
145+
session: session,
146+
port: session.cfg.Port,
147+
numConns: session.cfg.NumConns,
148+
maxRequestsPerConn: session.cfg.MaxRequestsPerConn,
149+
keyspace: session.cfg.Keyspace,
150+
hostConnPools: map[string]*hostConnPool{},
149151
}
150152

151153
return pool

host_source.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -402,10 +402,10 @@ func (h *HostInfo) HostnameAndPort() string {
402402
}
403403

404404
func (h *HostInfo) ConnectAddressAndPort() string {
405-
h.mu.Lock()
406-
defer h.mu.Unlock()
407-
addr, _ := h.connectAddressLocked()
408-
return net.JoinHostPort(addr.String(), strconv.Itoa(h.port))
405+
h.mu.Lock()
406+
defer h.mu.Unlock()
407+
addr, _ := h.connectAddressLocked()
408+
return net.JoinHostPort(addr.String(), strconv.Itoa(h.port))
409409
}
410410

411411
func (h *HostInfo) String() string {

internal/streams/streams.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,29 @@ type IDGenerator struct {
1919
offset uint32
2020
}
2121

22+
func NewStreamIDGenerator(protocol, maxRequestsPerConn int) *IDGenerator {
23+
if maxRequestsPerConn > 0 {
24+
return NewLimited(maxRequestsPerConn)
25+
}
26+
return New(protocol)
27+
}
28+
2229
func New(protocol int) *IDGenerator {
2330
maxStreams := 128
2431
if protocol > 2 {
2532
maxStreams = 32768
2633
}
34+
return NewLimited(maxStreams)
35+
}
36+
37+
func NewLimited(maxStreams int) *IDGenerator {
38+
// Round up maxStreams to a nearest multiple of 64
39+
maxStreams = ((maxStreams + 63) / 64) * 64
2740

2841
buckets := maxStreams / 64
2942
// reserve stream 0
3043
streams := make([]uint64, buckets)
3144
streams[0] = 1 << 63
32-
3345
return &IDGenerator{
3446
NumStreams: maxStreams,
3547
streams: streams,

0 commit comments

Comments
 (0)