
Commit 8ab8583

Add atomic checks while sending and reloading data during worker recovery
1 parent ddc8bd4 commit 8ab8583

8 files changed: +352 −159 lines changed

lib/adaptivequemgr.go

+25 −22
@@ -21,8 +21,8 @@ import (
 	"errors"
 	"fmt"
 	"math/rand"
-	"sync/atomic"
 	"strings"
+	"sync/atomic"
 	"time"

 	"github.com/paypal/hera/cal"
@@ -118,28 +118,31 @@ type BindCount struct {
 	Workers map[string]*WorkerClient // lookup by ticket
 }

-func bindEvictNameOk(bindName string) (bool) {
+func bindEvictNameOk(bindName string) bool {
 	commaNames := GetConfig().BindEvictionNames
 	if len(commaNames) == 0 {
 		// for tests, allow all names to be subject to bind eviction
 		return true
 	}
 	commaNames = strings.ToLower(commaNames)
 	bindName = strings.ToLower(bindName)
-	for _, okSubname := range strings.Split(commaNames,",") {
+	for _, okSubname := range strings.Split(commaNames, ",") {
 		if strings.Contains(bindName, okSubname) {
 			return true
 		}
 	}
 	return false
 }

-/* A bad query with multiple binds will add independent bind throttles to all
-bind name and values */
-func (mgr *adaptiveQueueManager) doBindEviction() (int) {
+/*
+A bad query with multiple binds will add independent bind throttles to all
+
+bind name and values
+*/
+func (mgr *adaptiveQueueManager) doBindEviction() int {
 	throttleCount := 0
 	GetBindEvict().lock.Lock()
-	for _,keyValues := range GetBindEvict().BindThrottle {
+	for _, keyValues := range GetBindEvict().BindThrottle {
 		throttleCount += len(keyValues)
 	}
 	GetBindEvict().lock.Unlock()
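For illustration: with BindEvictionNames set to "account,customer", bind names such as accountID and CUSTOMER_NUM would both be eligible for eviction, since the match is a case-insensitive substring test; an empty setting (the test default) makes every bind name eligible.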
@@ -172,14 +175,14 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
 			}
 			continue
 		}
-			contextBinds := parseBinds(request)
-			sqlsrcPrefix := worker.clientHostPrefix.Load().(string)
-			sqlsrcApp := worker.clientApp.Load().(string)
+		contextBinds := parseBinds(request)
+		sqlsrcPrefix := worker.clientHostPrefix.Load().(string)
+		sqlsrcApp := worker.clientApp.Load().(string)
 		if sqlsrcPrefix != "" {
 			contextBinds[SrcPrefixAppKey] = fmt.Sprintf("%s%s", sqlsrcPrefix, sqlsrcApp)
 			if logger.GetLogger().V(logger.Debug) {
-					msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey])
-					logger.GetLogger().Log(logger.Debug, msg)
+				msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey])
+				logger.GetLogger().Log(logger.Debug, msg)
 			}
 		}
 		for bindName0, bindValue := range contextBinds {
@@ -200,8 +203,8 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
 			}
 			concatKey := fmt.Sprintf("%d|%s|%s", sqlhash, bindName, bindValue)
 			if logger.GetLogger().V(logger.Debug) {
-					msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey)
-					logger.GetLogger().Log(logger.Debug, msg)
+				msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey)
+				logger.GetLogger().Log(logger.Debug, msg)
 			}
 			entry, ok := bindCounts[concatKey]
 			if !ok {
@@ -210,7 +213,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
 					Name:    bindName,
 					Value:   bindValue,
 					Workers: make(map[string]*WorkerClient),
-					}
+				}
 				bindCounts[concatKey] = entry
 			}

@@ -227,7 +230,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
 		bindName := entry.Name
 		bindValue := entry.Value

-		if len(entry.Workers) < int( float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers) ) {
+		if len(entry.Workers) < int(float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers)) {
 			continue
 		}
 		// evict sqlhash, bindvalue
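A worked example of the threshold above: with BindEvictionThresholdPct = 25 and numDispatchedWorkers = 40, a bind value must be occupying at least int(25/100.0 * 40) = 10 dispatched workers before it is considered for eviction; entries held by fewer workers are skipped.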
@@ -241,7 +244,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {

 		if mgr.dispatchedWorkers[worker] != ticket ||
 			worker.Status == wsFnsh ||
-			worker.isUnderRecovery == 1 /* Recover() uses compare & swap */ {
+			atomic.LoadInt32(&worker.isUnderRecovery) == 1 /* Recover() uses compare & swap */ {

 			continue
 		}
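The change above replaces a plain read of worker.isUnderRecovery with atomic.LoadInt32, so the read pairs correctly with the compare-and-swap in Recover() that the inline comment references. A minimal, self-contained sketch of that load/CAS pattern follows; the worker type and the recover/shouldSkip functions are illustrative stand-ins, not Hera's actual code.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

type worker struct {
    isUnderRecovery int32 // 0 = normal, 1 = recovery in progress
}

// recover claims the worker exactly once via compare-and-swap, mirroring
// the "Recover() uses compare & swap" comment in the diff.
func (w *worker) recover() {
    if !atomic.CompareAndSwapInt32(&w.isUnderRecovery, 0, 1) {
        return // another goroutine is already recovering this worker
    }
    defer atomic.StoreInt32(&w.isUnderRecovery, 0)
    // ... reclaim the worker here ...
}

// shouldSkip is the eviction-side check: a plain read of the field would
// race with the CAS above; atomic.LoadInt32 keeps it race-free.
func (w *worker) shouldSkip() bool {
    return atomic.LoadInt32(&w.isUnderRecovery) == 1
}

func main() {
    w := &worker{}
    var wg sync.WaitGroup
    wg.Add(2)
    go func() { defer wg.Done(); w.recover() }()
    go func() { defer wg.Done(); fmt.Println("skip:", w.shouldSkip()) }()
    wg.Wait()
}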
@@ -274,10 +277,10 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) {
 			throttle.incrAllowEveryX()
 		} else {
 			throttle := BindThrottle{
-				Name: bindName,
-				Value: bindValue,
-				Sqlhash: sqlhash,
-				AllowEveryX: 3*len(entry.Workers) + 1,
+				Name:        bindName,
+				Value:       bindValue,
+				Sqlhash:     sqlhash,
+				AllowEveryX: 3*len(entry.Workers) + 1,
 			}
 			now := time.Now()
 			throttle.RecentAttempt.Store(&now)
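On the initialization above: a bind value spread across, say, 3 dispatched workers starts with AllowEveryX = 3*3 + 1 = 10. Going by the field name and incrAllowEveryX(), this appears to let roughly one in ten matching requests through while the throttle is active, backing off further on repeat offenses.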
@@ -464,7 +467,7 @@ func (mgr *adaptiveQueueManager) getWorkerToRecover() (*WorkerClient, bool) {
 			}
 		}
 	} else {
-	if worker != nil && worker.Status == wsFnsh {
+		if worker != nil && worker.Status == wsFnsh {
 			if logger.GetLogger().V(logger.Warning) {
 				logger.GetLogger().Log(logger.Warning, "worker.pid state is in FNSH, so skipping", worker.pid)
 			}

lib/util.go

+14 −4
@@ -79,8 +79,8 @@ func IsPidRunning(pid int) (isRunning bool) {
 }

 /*
-	1st return value: the number
-	2nd return value: the number of digits
+1st return value: the number
+2nd return value: the number of digits
 */
 func atoi(bf []byte) (int, int) {
 	sz := len(bf)
@@ -96,8 +96,8 @@ func atoi(bf []byte) (int, int) {
 }

 /*
-	1st return value: the number
-	2nd return value: the number of digits
+1st return value: the number
+2nd return value: the number of digits
 */
 func atoui(str string) (uint64, int) {
 	sz := len(str)
@@ -164,3 +164,13 @@ func ExtractSQLHash(request *netstring.Netstring) (uint32, bool) {
 	}
 	return 0, false
 }
+
+// Contains reports whether value is present in slice.
+func Contains[T comparable](slice []T, value T) bool {
+	for _, val := range slice {
+		if val == value {
+			return true
+		}
+	}
+	return false
+}
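Contains is the one generics-based helper this commit introduces (it requires Go 1.18 or newer). A quick self-contained usage sketch, with example values chosen for illustration:

package main

import "fmt"

// Contains reports whether value is present in slice.
func Contains[T comparable](slice []T, value T) bool {
    for _, val := range slice {
        if val == value {
            return true
        }
    }
    return false
}

func main() {
    fmt.Println(Contains([]int32{101, 202}, int32(202))) // true
    fmt.Println(Contains([]string{"wsFnsh"}, "wsBusy"))  // false
}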

lib/workerbroker.go

+63 −53
@@ -19,13 +19,11 @@ package lib

 import (
 	"errors"
+	"github.com/paypal/hera/utility/logger"
 	"os"
 	"os/signal"
 	"sync"
 	"syscall"
-
-	"github.com/paypal/hera/utility"
-	"github.com/paypal/hera/utility/logger"
 )

 // HeraWorkerType defines the possible worker type
@@ -64,7 +62,7 @@ type WorkerBroker struct {
 	// and restart the stopped workers.
 	//
 	pidworkermap map[int32]*WorkerClient
-	lock sync.Mutex
+	lock         sync.Mutex

 	//
 	// loaded from cfg once and used later.
@@ -204,7 +202,9 @@ func (broker *WorkerBroker) GetWorkerPoolCfgs() (pCfgs []map[HeraWorkerType]*Wor

 // GetWorkerPool get the worker pool object for the type and id
 // ids holds optional paramenters.
-// ids[0] == instance id; ids[1] == shard id.
+//
+// ids[0] == instance id; ids[1] == shard id.
+//
 // if a particular id is not set, it defaults to 0.
 // TODO: interchange sid <--> instId since instId is not yet used
 func (broker *WorkerBroker) GetWorkerPool(wType HeraWorkerType, ids ...int) (workerbroker *WorkerPool, err error) {
@@ -273,59 +273,69 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) {
 			// we can get all the pids in this call. double the size in case we
 			// get none-hera defunct processes. +1 in case racing casue mapsize=0.
 			//
-			var arraySize = 2*len(broker.pidworkermap) + 1
-			var defunctPids = make([]int32, arraySize)
-			if logger.GetLogger().V(logger.Verbose) {
-				logger.GetLogger().Log(logger.Verbose, "Wait SIGCHLD len=", arraySize-1, ", pwmap:", broker.pidworkermap)
-			}
-			if arraySize > 0 {
-				utility.ReapDefunctPids(defunctPids)
-			}
-			if logger.GetLogger().V(logger.Info) {
-				logger.GetLogger().Log(logger.Info, "exited worker", defunctPids)
-			}
-			broker.lock.Lock()
-			for i := 0; i < arraySize; i++ {
-				//
-				// last valid entry in stoppedpids is followed by one or more zeros.
-				//
-				if defunctPids[i] == 0 {
+			defunctPids := make([]int32, 0)
+			for {
+				var status syscall.WaitStatus
+
+				//Reap exited children in non-blocking mode
+				pid, err := syscall.Wait4(-1, &status, syscall.WNOHANG, nil)
+				if pid > 0 {
+					if logger.GetLogger().V(logger.Verbose) {
+						logger.GetLogger().Log(logger.Verbose, "received worker exit signal for pid:", pid, " status: ", status)
+					}
+					defunctPids = append(defunctPids, int32(pid))
+				} else if pid == 0 {
 					break
+				} else {
+					if errors.Is(err, syscall.ECHILD) {
+						break
+					} else {
+						logger.GetLogger().Log(logger.Warning, "error in wait signal: ", err)
+					}
 				}
-				var workerclient = broker.pidworkermap[defunctPids[i]]
-				if workerclient != nil {
-					delete(broker.pidworkermap, defunctPids[i])
-					pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID)
-					if err != nil {
-						if logger.GetLogger().V(logger.Alert) {
-							logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err)
+			}
+
+			if len(defunctPids) > 0 {
+				if logger.GetLogger().V(logger.Debug) {
+					logger.GetLogger().Log(logger.Debug, "worker exit signal received from pids :", defunctPids)
+				}
+				broker.lock.Lock()
+				for _, pid := range defunctPids {
+					var workerclient = broker.pidworkermap[pid]
+					if workerclient != nil {
+						delete(broker.pidworkermap, pid)
+						pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID)
+						if err != nil {
+							if logger.GetLogger().V(logger.Alert) {
+								logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err)
+							}
+						} else {
+							//
+							// a worker could be terminated while serving a request.
+							// in these cases, doRead() in workerclient will get an
+							// EOF and exit. doSession() in coordinator will get the
+							// worker outCh closed event and exit, at which point
+							// coordinator itself calls returnworker to set connstate
+							// from assign to idle.
+							// no need to publish the following event again.
+							//
+							//if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) {
+							//	GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle})
+							//}
+							if logger.GetLogger().V(logger.Debug) {
+								logger.GetLogger().Log(logger.Debug, "worker (id=", workerclient.ID, "pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.")
+							}
+							workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
+							pool.RestartWorker(workerclient)
 						}
 					} else {
-						//
-						// a worker could be terminated while serving a request.
-						// in these cases, doRead() in workerclient will get an
-						// EOF and exit. doSession() in coordinator will get the
-						// worker outCh closed event and exit, at which point
-						// coordinator itself calls returnworker to set connstate
-						// from assign to idle.
-						// no need to publish the following event again.
-						//
-						//if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) {
-						//	GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle})
-						//}
-						if logger.GetLogger().V(logger.Debug) {
-							logger.GetLogger().Log(logger.Debug, "worker (pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.")
+						if logger.GetLogger().V(logger.Alert) {
+							logger.GetLogger().Log(logger.Alert, "Exited worker pid =", pid, " not found")
 						}
-						workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
-						pool.RestartWorker(workerclient)
-					}
-				} else {
-					if logger.GetLogger().V(logger.Alert) {
-						logger.GetLogger().Log(logger.Alert, "Exited worker pid =", defunctPids[i], " not found")
 					}
 				}
+				broker.lock.Unlock()
 			}
-			broker.lock.Unlock()
 		case syscall.SIGTERM:
 			if logger.GetLogger().V(logger.Debug) {
 				logger.GetLogger().Log(logger.Debug, "Got SIGTERM")
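The rewritten handler above drains every exited child per SIGCHLD instead of reaping into a fixed-size array via utility.ReapDefunctPids. Below is a self-contained sketch of the same non-blocking reap loop (Linux/Unix only); the process spawning and logging around it are illustrative, not Hera's:

package main

import (
    "errors"
    "fmt"
    "os"
    "os/exec"
    "os/signal"
    "syscall"
)

// reapDefunct calls wait4 with WNOHANG until no more exited children are
// pending, returning the reaped pids. One SIGCHLD may cover several exits.
func reapDefunct() []int32 {
    pids := make([]int32, 0)
    for {
        var status syscall.WaitStatus
        pid, err := syscall.Wait4(-1, &status, syscall.WNOHANG, nil)
        if pid > 0 {
            pids = append(pids, int32(pid)) // reaped one child
        } else if pid == 0 {
            break // children remain, but none have exited yet
        } else {
            if errors.Is(err, syscall.ECHILD) {
                break // no children at all
            }
            fmt.Fprintln(os.Stderr, "wait4:", err)
        }
    }
    return pids
}

func main() {
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGCHLD)
    cmd := exec.Command("true")
    _ = cmd.Start() // child exits immediately
    <-sigs
    fmt.Println("reaped pids:", reapDefunct())
}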
@@ -365,8 +375,8 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) {
 }

 /*
-	resizePool calls workerpool.Resize to resize a worker pool when the dynamic configuration of
-	the number of workers changed
+resizePool calls workerpool.Resize to resize a worker pool when the dynamic configuration of
+the number of workers changed
 */
 func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, shardID int) {
 	broker.poolCfgs[0][wType].maxWorkerCnt = maxWorkers
@@ -381,7 +391,7 @@ func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, shardID int) {
 }

 /*
-	changeMaxWorkers is called when the dynamic config changed, it calls resizePool() for all the pools
+changeMaxWorkers is called when the dynamic config changed, it calls resizePool() for all the pools
 */
 func (broker *WorkerBroker) changeMaxWorkers() {
 	wW := GetNumWWorkers(0)
