-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker_observer.go
90 lines (77 loc) · 1.62 KB
/
worker_observer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package siq
import (
"sync"
"github.com/mrasu/Siq/workers"
)
// WorkerObserver keeps the list of known workers together with the set of
// workers that are currently handed out via LockIdleWorker.
type WorkerObserver struct {
// workingWorker is the set of workers marked busy by LockIdleWorker and
// cleared again by Unlock.
workingWorker map[*workers.Worker]struct{}
// workers holds the tracked workers in insertion order; Add appends to it
// and Delete removes from it.
workers []*workers.Worker
// m is locked by the methods that mutate workers and workingWorker.
m sync.Locker
// ss is created in init and is handed failed workers by NotifyFail.
ss *SystemSiq
}
// newWorkerObserver allocates a WorkerObserver and initializes it for s.
func newWorkerObserver(s *Siq) *WorkerObserver {
	observer := new(WorkerObserver)
	observer.init(s)
	return observer
}
// init resets the observer to an empty state: no tracked workers, no busy
// workers, a fresh mutex, and a new SystemSiq built from the observer and s.
func (wo *WorkerObserver) init(s *Siq) {
	wo.m = new(sync.Mutex)
	wo.workers = make([]*workers.Worker, 0)
	wo.workingWorker = make(map[*workers.Worker]struct{})

	// Built last: NewSystemSiq receives wo itself, so the other fields are
	// already populated by the time it runs.
	wo.ss = NewSystemSiq(wo, s)
}
// Add appends w to the tracked workers while holding the observer's lock.
func (wo *WorkerObserver) Add(w *workers.Worker) {
	wo.m.Lock()
	// The critical section is a single append, so the lock is released
	// explicitly rather than through defer.
	wo.workers = append(wo.workers, w)
	wo.m.Unlock()
}
// Delete removes the first occurrence of tw from the tracked workers while
// holding the observer's lock. It does nothing when tw is not tracked.
// Only the workers slice is modified; the workingWorker set is left as is.
func (wo *WorkerObserver) Delete(tw *workers.Worker) {
	wo.m.Lock()
	defer wo.m.Unlock()

	for i, w := range wo.workers {
		if w != tw {
			continue
		}
		last := len(wo.workers) - 1
		// Shift the tail left over the removed slot, then clear the now
		// duplicated final slot so the backing array does not keep a stale
		// pointer reachable after the slice is shortened.
		copy(wo.workers[i:], wo.workers[i+1:])
		wo.workers[last] = nil
		wo.workers = wo.workers[:last]
		return
	}
}
// Count returns the number of workers currently tracked.
//
// The lock is held for the read because Add and Delete reassign wo.workers
// under the same lock; reading the slice header without it is a data race.
// The lock must not already be held by the calling goroutine.
func (wo *WorkerObserver) Count() int {
	wo.m.Lock()
	defer wo.m.Unlock()
	return len(wo.workers)
}
// LockIdleWorker scans the tracked workers, beginning at index start and
// wrapping around once, for the first worker that is not in the working set.
// That worker is added to the working set and returned with its index.
//
// A start outside [0, len(workers)) is treated as 0. It returns (-1, nil)
// when no workers are tracked or when every worker is already in the working
// set.
func (wo *WorkerObserver) LockIdleWorker(start int) (int, *workers.Worker) {
	wo.m.Lock()
	defer wo.m.Unlock()

	n := len(wo.workers)
	if n == 0 {
		return -1, nil
	}
	// Negative values would index out of range; oversized values restart
	// the scan from the beginning.
	if start < 0 || start >= n {
		start = 0
	}

	// Visit each slot exactly once: start, start+1, ..., wrapping to 0.
	for offset := 0; offset < n; offset++ {
		idx := (start + offset) % n
		w := wo.workers[idx]
		if _, working := wo.workingWorker[w]; working {
			continue
		}
		wo.workingWorker[w] = struct{}{}
		return idx, w
	}
	return -1, nil
}
// Unlock removes w from the working set while holding the observer's lock.
// It does nothing when w is not in the working set.
func (wo *WorkerObserver) Unlock(w *workers.Worker) {
	wo.m.Lock()
	defer wo.m.Unlock()

	// delete is a no-op for keys that are not present, so no lookup is
	// needed first.
	delete(wo.workingWorker, w)
}
// NotifyFail removes w from the tracked workers and then passes it to the
// SystemSiq via AddDyingWorker.
// NOTE(review): w is not removed from workingWorker here; presumably the
// caller still calls Unlock for it — confirm.
func (wo *WorkerObserver) NotifyFail(w *workers.Worker) {
// Delete takes and releases the observer's lock on its own.
wo.Delete(w)
wo.ss.AddDyingWorker(w)
}