Skip to content

Commit 0e4c25f

Browse files
committed
move process reaping login go from C
1 parent 0ad258d commit 0e4c25f

File tree

2 files changed

+60
-65
lines changed

2 files changed

+60
-65
lines changed

lib/workerbroker.go

+55-46
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"sync"
2525
"syscall"
2626

27-
"github.com/paypal/hera/utility"
2827
"github.com/paypal/hera/utility/logger"
2928
)
3029

@@ -275,59 +274,69 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) {
275274
// we can get all the pids in this call. double the size in case we
276275
// get none-hera defunct processes. +1 in case racing casue mapsize=0.
277276
//
278-
var arraySize = 2*len(broker.pidworkermap) + 1
279-
var defunctPids = make([]int32, arraySize)
280-
if logger.GetLogger().V(logger.Verbose) {
281-
logger.GetLogger().Log(logger.Verbose, "Wait SIGCHLD len=", arraySize-1, ", pwmap:", broker.pidworkermap)
282-
}
283-
if arraySize > 0 {
284-
utility.ReapDefunctPids(defunctPids)
285-
}
286-
if logger.GetLogger().V(logger.Info) {
287-
logger.GetLogger().Log(logger.Info, "exited worker", defunctPids)
288-
}
289-
broker.lock.Lock()
290-
for i := 0; i < arraySize; i++ {
291-
//
292-
// last valid entry in stoppedpids is followed by one or more zeros.
293-
//
294-
if defunctPids[i] == 0 {
277+
defunctPids := make([]int32, 0)
278+
for {
279+
var status syscall.WaitStatus
280+
281+
//Reap exited children in non-blocking mode
282+
pid, err := syscall.Wait4(-1, &status, syscall.WNOHANG, nil)
283+
if pid > 0 {
284+
if logger.GetLogger().V(logger.Verbose) {
285+
logger.GetLogger().Log(logger.Verbose, "received worker exit signal for pid:", pid, " status: ", status)
286+
}
287+
defunctPids = append(defunctPids, int32(pid))
288+
} else if pid == 0 {
295289
break
290+
} else {
291+
if errors.Is(err, syscall.ECHILD) {
292+
break
293+
} else {
294+
logger.GetLogger().Log(logger.Verbose, "error in wait signal: ", err)
295+
}
296296
}
297-
var workerclient = broker.pidworkermap[defunctPids[i]]
298-
if workerclient != nil {
299-
delete(broker.pidworkermap, defunctPids[i])
300-
pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID)
301-
if err != nil {
302-
if logger.GetLogger().V(logger.Alert) {
303-
logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err)
297+
}
298+
299+
if len(defunctPids) > 0 {
300+
if logger.GetLogger().V(logger.Debug) {
301+
logger.GetLogger().Log(logger.Debug, "worker exit signal received from pids :", defunctPids)
302+
}
303+
broker.lock.Lock()
304+
for _, pid := range defunctPids {
305+
var workerclient = broker.pidworkermap[pid]
306+
if workerclient != nil {
307+
delete(broker.pidworkermap, pid)
308+
pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID)
309+
if err != nil {
310+
if logger.GetLogger().V(logger.Alert) {
311+
logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err)
312+
}
313+
} else {
314+
//
315+
// a worker could be terminated while serving a request.
316+
// in these cases, doRead() in workerclient will get an
317+
// EOF and exit. doSession() in coordinator will get the
318+
// worker outCh closed event and exit, at which point
319+
// coordinator itself calls returnworker to set connstate
320+
// from assign to idle.
321+
// no need to publish the following event again.
322+
//
323+
//if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) {
324+
// GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle})
325+
//}
326+
if logger.GetLogger().V(logger.Debug) {
327+
logger.GetLogger().Log(logger.Debug, "worker (id=", workerclient.ID, "pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.")
328+
}
329+
workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
330+
pool.RestartWorker(workerclient)
304331
}
305332
} else {
306-
//
307-
// a worker could be terminated while serving a request.
308-
// in these cases, doRead() in workerclient will get an
309-
// EOF and exit. doSession() in coordinator will get the
310-
// worker outCh closed event and exit, at which point
311-
// coordinator itself calls returnworker to set connstate
312-
// from assign to idle.
313-
// no need to publish the following event again.
314-
//
315-
//if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) {
316-
// GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle})
317-
//}
318-
if logger.GetLogger().V(logger.Debug) {
319-
logger.GetLogger().Log(logger.Debug, "worker (pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.")
333+
if logger.GetLogger().V(logger.Alert) {
334+
logger.GetLogger().Log(logger.Alert, "Exited worker pid =", pid, " not found")
320335
}
321-
workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
322-
pool.RestartWorker(workerclient)
323-
}
324-
} else {
325-
if logger.GetLogger().V(logger.Alert) {
326-
logger.GetLogger().Log(logger.Alert, "Exited worker pid =", defunctPids[i], " not found")
327336
}
328337
}
338+
broker.lock.Unlock()
329339
}
330-
broker.lock.Unlock()
331340
case syscall.SIGTERM:
332341
if logger.GetLogger().V(logger.Debug) {
333342
logger.GetLogger().Log(logger.Debug, "Got SIGTERM")

lib/workerclient.go

+5-19
Original file line numberDiff line numberDiff line change
@@ -54,17 +54,6 @@ const (
5454
MaxWorkerState = 7
5555
)
5656

57-
var validStateTransitionMap map[HeraWorkerStatus][]HeraWorkerStatus = map[HeraWorkerStatus][]HeraWorkerStatus{
58-
wsUnset: {wsSchd, wsInit},
59-
wsSchd: {wsInit, wsUnset},
60-
wsInit: {wsSchd, wsAcpt, wsUnset},
61-
wsAcpt: {wsBusy},
62-
wsBusy: {wsWait, wsQuce, wsFnsh},
63-
wsWait: {wsQuce, wsFnsh},
64-
wsFnsh: {wsAcpt, wsSchd},
65-
wsQuce: {wsInit, wsFnsh}, //Forceful termination target state "wsInit", Graceful termination "wsFnsh"
66-
}
67-
6857
const bfChannelSize = 30
6958

7059
// workerMsg is used to communicate with the coordinator, it contains the control message metadata plus the actual payload
@@ -995,19 +984,16 @@ func (worker *WorkerClient) setState(status HeraWorkerStatus) {
995984
if currentStatus == status {
996985
return
997986
}
998-
//This checks whether state transition is valid or not
999-
if Contains(validStateTransitionMap[currentStatus], status) {
1000-
worker.stateLock.Lock()
1001-
worker.Status = status
1002-
worker.stateLock.Unlock()
1003-
GetStateLog().PublishStateEvent(StateEvent{eType: WorkerStateEvt, shardID: worker.shardID, wType: worker.Type, instID: worker.instID, workerID: worker.ID, newWState: status})
1004-
} else {
987+
if worker.isUnderRecovery == 1 && (status == wsWait || status == wsBusy) {
1005988
logger.GetLogger().Log(logger.Warning, "worker : ", worker.ID, "processId: ", worker.pid, " seeing invalid state transition from ", currentStatus, " to ", status)
1006989
if logger.GetLogger().V(logger.Debug) {
1007990
worker.printCallStack()
1008991
}
992+
return
1009993
}
1010-
994+
//This checks whether state transition is valid or not
995+
worker.Status = status
996+
GetStateLog().PublishStateEvent(StateEvent{eType: WorkerStateEvt, shardID: worker.shardID, wType: worker.Type, instID: worker.instID, workerID: worker.ID, newWState: status})
1011997
}
1012998

1013999
// Channel returns the worker out channel

0 commit comments

Comments
 (0)