
Commit 5ae8ee0

Revert "first version of changes to avoid accidental worker state update during worker recovery"
This reverts commit 8739920.
1 parent 9a46780 commit 5ae8ee0

5 files changed (+28 -82 lines)

lib/coordinator.go (+1 -2)

@@ -831,7 +831,6 @@ func (crd *Coordinator) dispatchRequest(request *netstring.Netstring) error {
 		// donot return a stranded worker. recover inserts a good worker back to pool.
 		//
 		if err == ErrSaturationKill {
-			logger.GetLogger().Log(logger.Info, "trigger recovery as part of ErrSaturationKill")
 			go worker.Recover(workerpool, ticket, WorkerClientRecoverParam{allowSkipOciBreak: true}, &strandedCalInfo{raddr: crd.conn.RemoteAddr().String(), laddr: crd.conn.LocalAddr().String(), nameSuffix: "_SATURATION_RECOVERED"}, common.StrandedSaturationRecover)
 		} else {
 			go worker.Recover(workerpool, ticket, WorkerClientRecoverParam{allowSkipOciBreak: true}, &strandedCalInfo{raddr: crd.conn.RemoteAddr().String(), laddr: crd.conn.LocalAddr().String()})
@@ -926,7 +925,7 @@ func (crd *Coordinator) doRequest(ctx context.Context, worker *WorkerClient, req
 			// The worker is in ACPT state.
 			// It will not finish recovery because of ACPT. The worker will never get back into the pool.
 			// Just marking the state as FNSH and dispatchRequest will return the worker back to the pool.
-			worker.setState(wsFnsh, false)
+			worker.setState(wsFnsh)
 			return false, ErrReqParseFail
 		}
 		cnt := 1
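The same mechanical change repeats through the rest of the commit: every setState call site drops the boolean that 8739920 had threaded through. A minimal sketch of the restored single-argument shape (stub types and constant values here are hypothetical, not Hera source):

package main

import "fmt"

type HeraWorkerStatus int

const wsFnsh HeraWorkerStatus = 5 // placeholder value; the real constants live in lib

type WorkerClient struct {
	Status HeraWorkerStatus
	pid    int
}

// Post-revert signature: the target status is the only argument.
func (worker *WorkerClient) setState(status HeraWorkerStatus) {
	if worker.Status == status {
		return // idempotent, as in the unchanged early return in the real code
	}
	worker.Status = status
}

func main() {
	worker := &WorkerClient{pid: 42}
	// was: worker.setState(wsFnsh, false)
	worker.setState(wsFnsh)
	fmt.Println("status:", worker.Status)
}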

lib/workerbroker.go (+6 -8)

@@ -64,7 +64,7 @@ type WorkerBroker struct {
 	// and restart the stopped workers.
 	//
 	pidworkermap map[int32]*WorkerClient
-	lock sync.Mutex
+	lock         sync.Mutex

 	//
 	// loaded from cfg once and used later.
@@ -204,9 +204,7 @@ func (broker *WorkerBroker) GetWorkerPoolCfgs() (pCfgs []map[HeraWorkerType]*Wor

 // GetWorkerPool get the worker pool object for the type and id
 // ids holds optional paramenters.
-//
-// ids[0] == instance id; ids[1] == shard id.
-//
+// ids[0] == instance id; ids[1] == shard id.
 // if a particular id is not set, it defaults to 0.
 // TODO: interchange sid <--> instId since instId is not yet used
 func (broker *WorkerBroker) GetWorkerPool(wType HeraWorkerType, ids ...int) (workerbroker *WorkerPool, err error) {
@@ -318,7 +316,7 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) {
 				if logger.GetLogger().V(logger.Debug) {
 					logger.GetLogger().Log(logger.Debug, "worker (pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.")
 				}
-				workerclient.setState(wsUnset, true) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
+				workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
 				pool.RestartWorker(workerclient)
 			}
 		} else {
@@ -367,8 +365,8 @@
 }

 /*
-resizePool calls workerpool.Resize to resize a worker pool when the dynamic configuration of
-the number of workers changed
+	resizePool calls workerpool.Resize to resize a worker pool when the dynamic configuration of
+	the number of workers changed
 */
 func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, shardID int) {
 	broker.poolCfgs[0][wType].maxWorkerCnt = maxWorkers
@@ -383,7 +381,7 @@
 }

 /*
-changeMaxWorkers is called when the dynamic config changed, it calls resizePool() for all the pools
+	changeMaxWorkers is called when the dynamic config changed, it calls resizePool() for all the pools
 */
 func (broker *WorkerBroker) changeMaxWorkers() {
 	wW := GetNumWWorkers(0)

lib/workerclient.go (+13 -64)

@@ -25,7 +25,6 @@ import (
 	"net"
 	"os"
 	"path/filepath"
-	"runtime"
 	"strconv"
 	"strings"
 	"sync/atomic"
@@ -472,7 +471,7 @@ func (worker *WorkerClient) StartWorker() (err error) {
 		logger.GetLogger().Log(logger.Info, "Started ", workerPath, ", pid=", pid)
 	}
 	worker.pid = pid
-	worker.setState(wsInit, false)
+	worker.setState(wsInit)
 	return nil
 }

@@ -542,7 +541,7 @@ func (worker *WorkerClient) attachToWorker() (err error) {
 		logger.GetLogger().Log(logger.Info, "Got control message from worker (", worker.ID, ",", worker.pid, ",", worker.racID, ",", worker.dbUname, ")")
 	}

-	worker.setState(wsAcpt, false)
+	worker.setState(wsAcpt)

 	pool, err := GetWorkerBrokerInstance().GetWorkerPool(worker.Type, worker.instID, worker.shardID)
 	if err != nil {
@@ -634,11 +633,11 @@ type WorkerClientRecoverParam struct {
 func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam WorkerClientRecoverParam, info *strandedCalInfo, param ...int) {
 	if atomic.CompareAndSwapInt32(&worker.isUnderRecovery, 0, 1) {
 		if logger.GetLogger().V(logger.Debug) {
-			logger.GetLogger().Log(logger.Debug, "begin recover worker Id: ", worker.ID, " process Id: ", worker.pid)
+			logger.GetLogger().Log(logger.Debug, "begin recover worker: ", worker.pid)
 		}
 	} else {
 		if logger.GetLogger().V(logger.Debug) {
-			logger.GetLogger().Log(logger.Debug, "worker already underrecovery: ", worker.ID, " process Id: ", worker.pid)
+			logger.GetLogger().Log(logger.Debug, "worker already underrecovery: ", worker.pid)
 		}
 		//
 		// defer will not be called.
@@ -666,10 +665,7 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor
 		return
 	}
 	priorWorkerStatus := worker.Status
-	if logger.GetLogger().V(logger.Debug) {
-		logger.GetLogger().Log(logger.Debug, fmt.Sprintf("about to recover worker Id: %d, worker process Id: %d as part of reconvery process, setting worker state to Quece", worker.ID, worker.pid))
-	}
-	worker.setState(wsQuce, true)
+	worker.setState(wsQuce)
 	killparam := common.StrandedClientClose
 	if len(param) > 0 {
 		killparam = param[0]
@@ -680,17 +676,9 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor
 	select {
 	case <-workerRecoverTimeout:
 		worker.thr.CanRun()
-		worker.setState(wsInit, true) // Set the worker state to INIT when we decide to Terminate the worker
-		GetStateLog().PublishStateEvent(StateEvent{eType: WorkerStateEvt, shardID: worker.shardID, wType: worker.Type, instID: worker.instID, workerID: worker.ID, newWState: worker.Status})
+		worker.setState(wsInit) // Set the worker state to INIT when we decide to Terminate the worker
 		worker.Terminate()
 		worker.callogStranded("RECYCLED", info)
-		if logger.GetLogger().V(logger.Debug) {
-			logger.GetLogger().Log(logger.Debug, fmt.Sprintf("worker Id: %d and process: %d recovered as part of workerRecoverTimeout set status to INIT", worker.ID, worker.pid))
-		}
-		err := p.RestartWorker(worker)
-		if err != nil {
-			logger.GetLogger().Log(logger.Alert, fmt.Sprintf("worker: %d failed to restart worker process", worker.ID))
-		}
 		return
 	case msg, ok := <-worker.channel():
 		if !ok {
@@ -727,10 +715,7 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor
 		}
 		worker.callogStranded("RECOVERED", info)

-		worker.setState(wsFnsh, true)
-		if logger.GetLogger().V(logger.Debug) {
-			logger.GetLogger().Log(logger.Debug, fmt.Sprintf("worker Id: %d, worker process: %d recovered as part of message from channel set status to FINSH", worker.ID, worker.pid))
-		}
+		worker.setState(wsFnsh)
 		p.ReturnWorker(worker, ticket)
 		//
 		// donot set state to ACPT since worker could already be picked up by another
@@ -910,13 +895,13 @@ func (worker *WorkerClient) doRead() {
 			logger.GetLogger().Log(logger.Verbose, "workerclient (<<< pid =", worker.pid, ",wrqId:", worker.rqId, "): EOR code:", eor, ", rqId: ", rqId, ", data:", DebugString(payload))
 		}
 		if eor == common.EORFree {
-			worker.setState(wsFnsh, false)
+			worker.setState(wsFnsh)
 			/*worker.sqlStartTimeMs = 0
 			if logger.GetLogger().V(logger.Verbose) {
 				logger.GetLogger().Log(logger.Verbose, "workerclient sqltime=", worker.sqlStartTimeMs)
 			}*/
 		} else {
-			worker.setState(wsWait, false)
+			worker.setState(wsWait)
 		}
 		if eor != common.EORMoreIncomingRequests {
 			worker.outCh <- &workerMsg{data: payload, eor: true, free: (eor == common.EORFree), inTransaction: ((eor == common.EORInTransaction) || (eor == common.EORInCursorInTransaction)), rqId: rqId}
@@ -940,7 +925,7 @@ func (worker *WorkerClient) doRead() {
 			return
 		default:
 			if ns.Cmd != common.RcStillExecuting {
-				worker.setState(wsWait, false)
+				worker.setState(wsWait)
 			}
 			if logger.GetLogger().V(logger.Verbose) {
 				logger.GetLogger().Log(logger.Verbose, "workerclient (<<< pid =", worker.pid, "): data:", DebugString(ns.Serialized), len(ns.Serialized))
@@ -956,7 +941,7 @@ func (worker *WorkerClient) doRead() {

 // Write sends a message to the worker
 func (worker *WorkerClient) Write(ns *netstring.Netstring, nsCount uint16) error {
-	worker.setState(wsBusy, false)
+	worker.setState(wsBusy)

 	worker.rqId += uint32(nsCount)

@@ -980,24 +965,12 @@ func (worker *WorkerClient) Write(ns *netstring.Netstring, nsCount uint16) error
 }

 // setState updates the worker state
-func (worker *WorkerClient) setState(status HeraWorkerStatus, callFromRecovery bool) {
+func (worker *WorkerClient) setState(status HeraWorkerStatus) {
 	if worker.Status == status {
 		return
 	}
-	if worker.isUnderRecovery == 1 && !callFromRecovery {
-		if logger.GetLogger().V(logger.Info) {
-			//If worker under recovery drinup of channel happens as part of DrainResponseChannel
-			logger.GetLogger().Log(logger.Info, "worker : ", worker.ID, " is under recovery. "+
-				"workerclient pid=", worker.pid, "not allowed changing status from", worker.Status, "to", status)
-		}
-		if logger.GetLogger().V(logger.Debug) {
-			worker.printCallStack()
-		}
-		return
-	}
 	if logger.GetLogger().V(logger.Debug) {
-		logger.GetLogger().Log(logger.Debug, "worker Id=", worker.ID, " worker pid=", worker.pid, " changing status from", worker.Status, "to", status)
-		worker.printCallStack()
+		logger.GetLogger().Log(logger.Debug, "worker pid=", worker.pid, " changing status from", worker.Status, "to", status)
 	}

 	// TODO: sync atomic set
@@ -1029,27 +1002,3 @@ func (worker *WorkerClient) isProcessRunning() bool {
 	}
 	return true
 }
-
-func (worker *WorkerClient) printCallStack() {
-	// Define a large enough buffer to capture the stack.
-	const depth = 64
-	pcs := make([]uintptr, depth)
-
-	// Collect the stack trace.
-	n := runtime.Callers(2, pcs) // Skip the first 2 callers (runtime and printCallStack itself).
-	frames := runtime.CallersFrames(pcs[:n])
-	indent := 0
-	// Iterate through the frames and print function names and line numbers.
-	var builder strings.Builder
-	builder.WriteString(fmt.Sprintf("worker Id= %d Process Id= %d Call Stack:", worker.ID, worker.pid))
-	for {
-		frame, more := frames.Next()
-		builder.WriteString(fmt.Sprintf("%s - %s\n", strings.Repeat(" ", indent), frame.Function))
-		builder.WriteString(fmt.Sprintf("%s at %s:%d\n", strings.Repeat(" ", indent), frame.File, frame.Line))
-		indent++
-		if !more {
-			break
-		}
-	}
-	logger.GetLogger().Log(logger.Debug, builder.String())
-}
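For reference, the guard this revert deletes, condensed from the removed lines above into a self-contained sketch (stub types and hypothetical constants, not Hera source). Commit 8739920 had setState refuse any state change on a worker flagged as under recovery unless the caller passed callFromRecovery, and it dumped the offending call stack at debug level:

package main

import (
	"fmt"
	"runtime"
	"strings"
)

type HeraWorkerStatus int

type WorkerClient struct {
	ID              int
	pid             int
	Status          HeraWorkerStatus
	isUnderRecovery int32 // set to 1 by Recover via CompareAndSwap
}

// The reverted, two-argument form: only the recovery path itself
// (callFromRecovery == true) may change the state of a worker that is
// under recovery; any other caller is logged and ignored.
func (worker *WorkerClient) setState(status HeraWorkerStatus, callFromRecovery bool) {
	if worker.Status == status {
		return
	}
	if worker.isUnderRecovery == 1 && !callFromRecovery {
		fmt.Printf("worker %d (pid=%d) under recovery; not allowed changing status from %v to %v\n",
			worker.ID, worker.pid, worker.Status, status)
		worker.printCallStack()
		return
	}
	worker.Status = status
}

// printCallStack walks the caller chain, as the deleted helper above did.
func (worker *WorkerClient) printCallStack() {
	pcs := make([]uintptr, 64)
	n := runtime.Callers(2, pcs) // skip runtime.Callers and printCallStack itself
	frames := runtime.CallersFrames(pcs[:n])
	var builder strings.Builder
	for {
		frame, more := frames.Next()
		fmt.Fprintf(&builder, "%s\n  at %s:%d\n", frame.Function, frame.File, frame.Line)
		if !more {
			break
		}
	}
	fmt.Print(builder.String())
}

func main() {
	worker := &WorkerClient{ID: 1, pid: 42}
	worker.isUnderRecovery = 1
	worker.setState(HeraWorkerStatus(2), false) // rejected under the old guard
	worker.setState(HeraWorkerStatus(2), true)  // allowed: recovery path
	fmt.Println("final status:", worker.Status)
}

The revert trades this protection for simplicity: callers no longer need to know whether they run inside recovery, at the cost of reintroducing the accidental state updates during recovery that 8739920 set out to prevent.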

lib/workerpool.go (+2 -2)

@@ -116,7 +116,7 @@ func (pool *WorkerPool) Init(wType HeraWorkerType, size int, instID int, shardID
 func (pool *WorkerPool) spawnWorker(wid int) error {
 	worker := NewWorker(wid, pool.Type, pool.InstID, pool.ShardID, pool.moduleName, pool.thr)

-	worker.setState(wsSchd, false)
+	worker.setState(wsSchd)
 	millis := rand.Intn(GetConfig().RandomStartMs)
 	if logger.GetLogger().V(logger.Alert) {
 		logger.GetLogger().Log(logger.Alert, wid, "randomized start ms", millis)
@@ -495,7 +495,7 @@ func (pool *WorkerPool) ReturnWorker(worker *WorkerClient, ticket string) (err e
 		worker.DrainResponseChannel(time.Microsecond * 10)
 	}

-	worker.setState(wsAcpt, true)
+	worker.setState(wsAcpt)
 	if (pool.desiredSize < pool.currentSize) && (worker.ID >= pool.desiredSize) {
 		go func(w *WorkerClient) {
 			if logger.GetLogger().V(logger.Info) {
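ReturnWorker runs at the tail of Recover, right after the wsFnsh transition in workerclient.go above, while the worker is still flagged isUnderRecovery; that is why the pre-revert code passed true here where most call sites passed false. A minimal sketch of that hand-off order under the restored guard-free signature (stub types, hypothetical; the defer mirrors the "defer will not be called" comment in Recover):

package main

import (
	"fmt"
	"sync/atomic"
)

type HeraWorkerStatus int

const (
	wsQuce HeraWorkerStatus = iota + 1
	wsFnsh
	wsAcpt
)

type WorkerClient struct {
	isUnderRecovery int32
	Status          HeraWorkerStatus
}

func (worker *WorkerClient) setState(status HeraWorkerStatus) { worker.Status = status }

type WorkerPool struct{ idle []*WorkerClient }

// ReturnWorker mirrors the post-revert call site: a plain setState(wsAcpt).
func (pool *WorkerPool) ReturnWorker(worker *WorkerClient) {
	worker.setState(wsAcpt)
	pool.idle = append(pool.idle, worker)
}

// Recover mirrors the ordering visible in the diff: quiesce, finish, return.
func (worker *WorkerClient) Recover(pool *WorkerPool) {
	if !atomic.CompareAndSwapInt32(&worker.isUnderRecovery, 0, 1) {
		return // already under recovery; the defer below will not run
	}
	defer atomic.StoreInt32(&worker.isUnderRecovery, 0)
	worker.setState(wsQuce)   // quiesce while draining the stranded request
	worker.setState(wsFnsh)   // recovery finished
	pool.ReturnWorker(worker) // hands the worker out again as ACPT
}

func main() {
	pool := &WorkerPool{}
	worker := &WorkerClient{}
	worker.Recover(pool)
	fmt.Println("status:", worker.Status, "idle workers:", len(pool.idle))
}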

lib/workerpool_test.go (+6 -6)

@@ -76,12 +76,12 @@ func TestPoolDempotency(t *testing.T) {
 	wd := NewWorker(3, wtypeRW, 0, 0, "cloc", nil)
 	we := NewWorker(4, wtypeRW, 0, 0, "cloc", nil)
 	wf := NewWorker(5, wtypeRW, 0, 0, "cloc", nil)
-	wa.setState(wsAcpt, false)
-	wb.setState(wsAcpt, false)
-	wc.setState(wsAcpt, false)
-	wd.setState(wsAcpt, false)
-	we.setState(wsAcpt, false)
-	wf.setState(wsAcpt, false)
+	wa.setState(wsAcpt)
+	wb.setState(wsAcpt)
+	wc.setState(wsAcpt)
+	wd.setState(wsAcpt)
+	we.setState(wsAcpt)
+	wf.setState(wsAcpt)
 	pool.WorkerReady(wa)
 	pool.WorkerReady(wb)
 	pool.WorkerReady(wc)