Skip to content

Commit 986a1b9

Browse files
committed
fixing race condition related to worker recovery issue
1 parent 0e4c25f commit 986a1b9

File tree

5 files changed

+174
-44
lines changed

5 files changed

+174
-44
lines changed

lib/adaptivequemgr.go

+25-22
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import (
2121
"errors"
2222
"fmt"
2323
"math/rand"
24-
"sync/atomic"
2524
"strings"
25+
"sync/atomic"
2626
"time"
2727

2828
"github.com/paypal/hera/cal"
@@ -118,28 +118,31 @@ type BindCount struct {
118118
Workers map[string]*WorkerClient // lookup by ticket
119119
}
120120

121-
func bindEvictNameOk(bindName string) (bool) {
121+
func bindEvictNameOk(bindName string) bool {
122122
commaNames := GetConfig().BindEvictionNames
123123
if len(commaNames) == 0 {
124124
// for tests, allow all names to be subject to bind eviction
125125
return true
126126
}
127127
commaNames = strings.ToLower(commaNames)
128128
bindName = strings.ToLower(bindName)
129-
for _, okSubname := range strings.Split(commaNames,",") {
129+
for _, okSubname := range strings.Split(commaNames, ",") {
130130
if strings.Contains(bindName, okSubname) {
131131
return true
132132
}
133133
}
134134
return false
135135
}
136136

137-
/* A bad query with multiple binds will add independent bind throttles to all
138-
bind name and values */
139-
func (mgr *adaptiveQueueManager) doBindEviction() (int) {
137+
/*
138+
A bad query with multiple binds will add independent bind throttles to all
139+
140+
bind name and values
141+
*/
142+
func (mgr *adaptiveQueueManager) doBindEviction() int {
140143
throttleCount := 0
141144
GetBindEvict().lock.Lock()
142-
for _,keyValues := range GetBindEvict().BindThrottle {
145+
for _, keyValues := range GetBindEvict().BindThrottle {
143146
throttleCount += len(keyValues)
144147
}
145148
GetBindEvict().lock.Unlock()
@@ -172,14 +175,14 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
172175
}
173176
continue
174177
}
175-
contextBinds := parseBinds(request)
176-
sqlsrcPrefix := worker.clientHostPrefix.Load().(string)
177-
sqlsrcApp := worker.clientApp.Load().(string)
178+
contextBinds := parseBinds(request)
179+
sqlsrcPrefix := worker.clientHostPrefix.Load().(string)
180+
sqlsrcApp := worker.clientApp.Load().(string)
178181
if sqlsrcPrefix != "" {
179182
contextBinds[SrcPrefixAppKey] = fmt.Sprintf("%s%s", sqlsrcPrefix, sqlsrcApp)
180183
if logger.GetLogger().V(logger.Debug) {
181-
msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey])
182-
logger.GetLogger().Log(logger.Debug, msg)
184+
msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey])
185+
logger.GetLogger().Log(logger.Debug, msg)
183186
}
184187
}
185188
for bindName0, bindValue := range contextBinds {
@@ -200,8 +203,8 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
200203
}
201204
concatKey := fmt.Sprintf("%d|%s|%s", sqlhash, bindName, bindValue)
202205
if logger.GetLogger().V(logger.Debug) {
203-
msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey)
204-
logger.GetLogger().Log(logger.Debug, msg)
206+
msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey)
207+
logger.GetLogger().Log(logger.Debug, msg)
205208
}
206209
entry, ok := bindCounts[concatKey]
207210
if !ok {
@@ -210,7 +213,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
210213
Name: bindName,
211214
Value: bindValue,
212215
Workers: make(map[string]*WorkerClient),
213-
}
216+
}
214217
bindCounts[concatKey] = entry
215218
}
216219

@@ -227,7 +230,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
227230
bindName := entry.Name
228231
bindValue := entry.Value
229232

230-
if len(entry.Workers) < int( float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers) ) {
233+
if len(entry.Workers) < int(float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers)) {
231234
continue
232235
}
233236
// evict sqlhash, bindvalue
@@ -241,7 +244,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
241244

242245
if mgr.dispatchedWorkers[worker] != ticket ||
243246
worker.Status == wsFnsh ||
244-
worker.isUnderRecovery == 1 /* Recover() uses compare & swap */ {
247+
atomic.LoadInt32(&worker.isUnderRecovery) == 1 /* Recover() uses compare & swap */ {
245248

246249
continue
247250
}
@@ -274,10 +277,10 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
274277
throttle.incrAllowEveryX()
275278
} else {
276279
throttle := BindThrottle{
277-
Name: bindName,
278-
Value: bindValue,
279-
Sqlhash: sqlhash,
280-
AllowEveryX: 3*len(entry.Workers) + 1,
280+
Name: bindName,
281+
Value: bindValue,
282+
Sqlhash: sqlhash,
283+
AllowEveryX: 3*len(entry.Workers) + 1,
281284
}
282285
now := time.Now()
283286
throttle.RecentAttempt.Store(&now)
@@ -464,7 +467,7 @@ func (mgr *adaptiveQueueManager) getWorkerToRecover() (*WorkerClient, bool) {
464467
}
465468
}
466469
} else {
467-
if worker != nil && worker.Status == wsFnsh {
470+
if worker != nil && worker.Status == wsFnsh {
468471
if logger.GetLogger().V(logger.Warning) {
469472
logger.GetLogger().Log(logger.Warning, "worker.pid state is in FNSH, so skipping", worker.pid)
470473
}

lib/workerbroker.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@ package lib
1919

2020
import (
2121
"errors"
22+
"github.com/paypal/hera/utility/logger"
2223
"os"
2324
"os/signal"
2425
"sync"
2526
"syscall"
26-
27-
"github.com/paypal/hera/utility/logger"
27+
"time"
2828
)
2929

3030
// HeraWorkerType defines the possible worker type
@@ -291,7 +291,7 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) {
291291
if errors.Is(err, syscall.ECHILD) {
292292
break
293293
} else {
294-
logger.GetLogger().Log(logger.Verbose, "error in wait signal: ", err)
294+
logger.GetLogger().Log(logger.Warning, "error in wait signal: ", err)
295295
}
296296
}
297297
}
@@ -327,6 +327,7 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) {
327327
logger.GetLogger().Log(logger.Debug, "worker (id=", workerclient.ID, "pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.")
328328
}
329329
workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
330+
time.Sleep(5 * time.Second)
330331
pool.RestartWorker(workerclient)
331332
}
332333
} else {

lib/workerclient.go

+12-10
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ type WorkerClient struct {
148148
rqId uint32
149149

150150
//
151-
// under recovery. 0: no; 1: yes. use atomic.CompareAndSwapInt32 to check state.
151+
// under recovery. 0: no; 1: yes. use atomic.CompareAndSwapInt32 to check state and use atomic.LoadInt32 to read state
152152
//
153153
isUnderRecovery int32
154154

@@ -204,7 +204,7 @@ func NewWorker(wid int, wType HeraWorkerType, instID int, shardID int, moduleNam
204204
}
205205
// TODO
206206
worker.racID = -1
207-
worker.isUnderRecovery = 0
207+
atomic.CompareAndSwapInt32(&worker.isUnderRecovery, 1, 0)
208208
if worker.ctrlCh != nil {
209209
close(worker.ctrlCh)
210210
}
@@ -214,6 +214,7 @@ func NewWorker(wid int, wType HeraWorkerType, instID int, shardID int, moduleNam
214214
// msg. if adaptiveqmgr blocks on a non-buffered channel, there is a deadlock when return worker
215215
//
216216
worker.ctrlCh = make(chan *workerMsg, 5)
217+
217218
return worker
218219
}
219220

@@ -590,7 +591,7 @@ func (worker *WorkerClient) initiateRecover(param int, p *WorkerPool, prior Hera
590591
param = common.StrandedSkipBreakHiLoad
591592
}
592593
} else {
593-
rv = time.After(time.Millisecond * time.Duration(GetConfig().StrandedWorkerTimeoutMs))
594+
rv = time.After(time.Millisecond * 100000)
594595
}
595596
buff := []byte{byte(param), byte((worker.rqId & 0xFF000000) >> 24), byte((worker.rqId & 0x00FF0000) >> 16),
596597
byte((worker.rqId & 0x0000FF00) >> 8), byte((worker.rqId & 0x000000FF))}
@@ -643,9 +644,6 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor
643644
if logger.GetLogger().V(logger.Debug) {
644645
logger.GetLogger().Log(logger.Debug, "worker already underrecovery: ", worker.ID, " process Id: ", worker.pid)
645646
}
646-
//
647-
// defer will not be called.
648-
//
649647
return
650648
}
651649
defer func() {
@@ -725,7 +723,6 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor
725723
logger.GetLogger().Log(logger.Info, "stranded conn recovered", worker.Type, worker.pid)
726724
}
727725
worker.callogStranded("RECOVERED", info)
728-
729726
worker.setState(wsFnsh)
730727
if logger.GetLogger().V(logger.Debug) {
731728
logger.GetLogger().Log(logger.Debug, fmt.Sprintf("worker Id: %d, worker process: %d recovered as part of message from channel set status to FINSH", worker.ID, worker.pid))
@@ -918,7 +915,7 @@ func (worker *WorkerClient) doRead() {
918915
worker.setState(wsWait)
919916
}
920917
if eor != common.EORMoreIncomingRequests {
921-
worker.outCh <- &workerMsg{data: payload, eor: true, free: (eor == common.EORFree), inTransaction: ((eor == common.EORInTransaction) || (eor == common.EORInCursorInTransaction)), rqId: rqId}
918+
worker.outCh <- &workerMsg{data: payload, eor: true, free: (eor == common.EORFree), inTransaction: ((eor == common.EORInTransaction) || (eor == common.EORInCursorInTransaction)), rqId: uint32(rqId)}
922919
payload = nil
923920
} else {
924921
// buffer data to avoid race condition
@@ -955,8 +952,13 @@ func (worker *WorkerClient) doRead() {
955952

956953
// Write sends a message to the worker
957954
func (worker *WorkerClient) Write(ns *netstring.Netstring, nsCount uint16) error {
955+
if atomic.LoadInt32(&worker.isUnderRecovery) == 1 {
956+
if logger.GetLogger().V(logger.Alert) {
957+
logger.GetLogger().Log(logger.Alert, "workerclient write error: worker is under recovery.")
958+
}
959+
return ErrWorkerFail
960+
}
958961
worker.setState(wsBusy)
959-
960962
worker.rqId += uint32(nsCount)
961963

962964
//
@@ -984,7 +986,7 @@ func (worker *WorkerClient) setState(status HeraWorkerStatus) {
984986
if currentStatus == status {
985987
return
986988
}
987-
if worker.isUnderRecovery == 1 && (status == wsWait || status == wsBusy) {
989+
if atomic.LoadInt32(&worker.isUnderRecovery) == 1 && (status == wsWait || status == wsBusy) {
988990
logger.GetLogger().Log(logger.Warning, "worker : ", worker.ID, "processId: ", worker.pid, " seeing invalid state transition from ", currentStatus, " to ", status)
989991
if logger.GetLogger().V(logger.Debug) {
990992
worker.printCallStack()

lib/workerpool.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func (pool *WorkerPool) RestartWorker(worker *WorkerClient) (err error) {
193193
}
194194
pool.activeQ.Remove(worker)
195195
pool.poolCond.L.Unlock()
196-
196+
time.Sleep(time.Millisecond * 3000)
197197
go pool.spawnWorker(worker.ID)
198198
return nil
199199
}

0 commit comments

Comments
 (0)