Skip to content

Commit bf16ec3

Browse files
worryg0djameshartig
authored andcommitted
CASSGO-4 Support of sending queries to the specific node
Query.SetHostID() allows users to specify on which node the Query will be executed. It is not a tipycal use case, but it makes sense with virtual tables which are available since C* 4.0. Patch by Bohdan Siryk; Reviewed by João Reis, James Hartig for CASSGO-4
1 parent f35d463 commit bf16ec3

File tree

5 files changed

+110
-3
lines changed

5 files changed

+110
-3
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12+
- Support of sending queries to the specific node with Query.SetHostID() (CASSGO-4)
13+
1214
### Changed
1315

1416
- Move lz4 compressor to lz4 package within the gocql module (CASSGO-32)

cassandra_test.go

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3327,7 +3327,6 @@ func TestUnsetColBatch(t *testing.T) {
33273327
}
33283328
var id, mInt, count int
33293329
var mText string
3330-
33313330
if err := session.Query("SELECT count(*) FROM gocql_test.batchUnsetInsert;").Scan(&count); err != nil {
33323331
t.Fatalf("Failed to select with err: %v", err)
33333332
} else if count != 2 {
@@ -3362,3 +3361,52 @@ func TestQuery_NamedValues(t *testing.T) {
33623361
t.Fatal(err)
33633362
}
33643363
}
3364+
3365+
// This test ensures that queries are sent to the specified host only
3366+
func TestQuery_SetHostID(t *testing.T) {
3367+
session := createSession(t)
3368+
defer session.Close()
3369+
3370+
hosts := session.GetHosts()
3371+
3372+
const iterations = 5
3373+
for _, expectedHost := range hosts {
3374+
for i := 0; i < iterations; i++ {
3375+
var actualHostID string
3376+
err := session.Query("SELECT host_id FROM system.local").
3377+
SetHostID(expectedHost.HostID()).
3378+
Scan(&actualHostID)
3379+
if err != nil {
3380+
t.Fatal(err)
3381+
}
3382+
3383+
if expectedHost.HostID() != actualHostID {
3384+
t.Fatalf("Expected query to be executed on host %s, but it was executed on %s",
3385+
expectedHost.HostID(),
3386+
actualHostID,
3387+
)
3388+
}
3389+
}
3390+
}
3391+
3392+
// ensuring properly handled invalid host id
3393+
err := session.Query("SELECT host_id FROM system.local").
3394+
SetHostID("[invalid]").
3395+
Exec()
3396+
if !errors.Is(err, ErrNoConnections) {
3397+
t.Fatalf("Expected error to be: %v, but got %v", ErrNoConnections, err)
3398+
}
3399+
3400+
// ensuring that the driver properly handles the case
3401+
// when specified host for the query is down
3402+
host := hosts[0]
3403+
pool, _ := session.pool.getPoolByHostID(host.HostID())
3404+
// simulating specified host is down
3405+
pool.host.setState(NodeDown)
3406+
err = session.Query("SELECT host_id FROM system.local").
3407+
SetHostID(host.HostID()).
3408+
Exec()
3409+
if !errors.Is(err, ErrNoConnections) {
3410+
t.Fatalf("Expected error to be: %v, but got %v", ErrNoConnections, err)
3411+
}
3412+
}

connectionpool.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,13 @@ func (p *policyConnPool) getPool(host *HostInfo) (pool *hostConnPool, ok bool) {
243243
return
244244
}
245245

246+
func (p *policyConnPool) getPoolByHostID(hostID string) (pool *hostConnPool, ok bool) {
247+
p.mu.RLock()
248+
pool, ok = p.hostConnPools[hostID]
249+
p.mu.RUnlock()
250+
return
251+
}
252+
246253
func (p *policyConnPool) Close() {
247254
p.mu.Lock()
248255
defer p.mu.Unlock()

query_executor.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type ExecutableQuery interface {
4141
Keyspace() string
4242
Table() string
4343
IsIdempotent() bool
44+
GetHostID() string
4445

4546
withContext(context.Context) ExecutableQuery
4647

@@ -83,12 +84,32 @@ func (q *queryExecutor) speculate(ctx context.Context, qry ExecutableQuery, sp S
8384
}
8485

8586
func (q *queryExecutor) executeQuery(qry ExecutableQuery) (*Iter, error) {
86-
hostIter := q.policy.Pick(qry)
87+
var hostIter NextHost
88+
89+
// check if the host id is specified for the query,
90+
// if it is, the query should be executed at the corresponding host.
91+
if hostID := qry.GetHostID(); hostID != "" {
92+
hostIter = func() SelectedHost {
93+
pool, ok := q.pool.getPoolByHostID(hostID)
94+
// if the specified host is down
95+
// we return nil to avoid endless query execution in queryExecutor.do()
96+
if !ok || !pool.host.IsUp() {
97+
return nil
98+
}
99+
return (*selectedHost)(pool.host)
100+
}
101+
}
102+
103+
// if host is not specified for the query,
104+
// then a host will be picked by HostSelectionPolicy
105+
if hostIter == nil {
106+
hostIter = q.policy.Pick(qry)
107+
}
87108

88109
// check if the query is not marked as idempotent, if
89110
// it is, we force the policy to NonSpeculative
90111
sp := qry.speculativeExecutionPolicy()
91-
if !qry.IsIdempotent() || sp.Attempts() == 0 {
112+
if qry.GetHostID() != "" || !qry.IsIdempotent() || sp.Attempts() == 0 {
92113
return q.do(qry.Context(), qry, hostIter), nil
93114
}
94115

session.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,7 @@ func (s *Session) Query(stmt string, values ...interface{}) *Query {
456456
qry.session = s
457457
qry.stmt = stmt
458458
qry.values = values
459+
qry.hostID = ""
459460
qry.defaultsFromSession()
460461
return qry
461462
}
@@ -949,6 +950,10 @@ type Query struct {
949950

950951
// routingInfo is a pointer because Query can be copied and copyable struct can't hold a mutex.
951952
routingInfo *queryRoutingInfo
953+
954+
// hostID specifies the host on which the query should be executed.
955+
// If it is empty, then the host is picked by HostSelectionPolicy
956+
hostID string
952957
}
953958

954959
type queryRoutingInfo struct {
@@ -1442,6 +1447,20 @@ func (q *Query) releaseAfterExecution() {
14421447
q.decRefCount()
14431448
}
14441449

1450+
// SetHostID allows to define the host the query should be executed against. If the
1451+
// host was filtered or otherwise unavailable, then the query will error. If an empty
1452+
// string is sent, the default behavior, using the configured HostSelectionPolicy will
1453+
// be used. A hostID can be obtained from HostInfo.HostID() after calling GetHosts().
1454+
func (q *Query) SetHostID(hostID string) *Query {
1455+
q.hostID = hostID
1456+
return q
1457+
}
1458+
1459+
// GetHostID returns id of the host on which query should be executed.
1460+
func (q *Query) GetHostID() string {
1461+
return q.hostID
1462+
}
1463+
14451464
// Iter represents an iterator that can be used to iterate over all rows that
14461465
// were returned by a query. The iterator might send additional queries to the
14471466
// database during the iteration if paging was enabled.
@@ -2057,6 +2076,11 @@ func (b *Batch) releaseAfterExecution() {
20572076
// that would race with speculative executions.
20582077
}
20592078

2079+
// GetHostID satisfies ExecutableQuery interface but does noop.
2080+
func (b *Batch) GetHostID() string {
2081+
return ""
2082+
}
2083+
20602084
type BatchType byte
20612085

20622086
const (
@@ -2189,6 +2213,11 @@ func (t *traceWriter) Trace(traceId []byte) {
21892213
}
21902214
}
21912215

2216+
// GetHosts return a list of hosts in the ring the driver knows of.
2217+
func (s *Session) GetHosts() []*HostInfo {
2218+
return s.ring.allHosts()
2219+
}
2220+
21922221
type ObservedQuery struct {
21932222
Keyspace string
21942223
Statement string

0 commit comments

Comments
 (0)