-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprotect_goroutines.go
131 lines (122 loc) · 2.82 KB
/
protect_goroutines.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package code_block
import (
"context"
"errors"
"fmt"
"reflect"
"runtime/debug"
"sync"
"time"
)
// PanicHandler 定义一个处理panic的函数类型
type PanicHandler func(p Panic)
// Panic 记录协程panic信息
type Panic struct {
R interface{} // recover 返回信息
Stack []byte // 堆栈信息
}
// ProtectGoroutine 协程守护
type ProtectGoroutine struct {
sync.Mutex
wg *sync.WaitGroup
panics chan Panic
panicHandler PanicHandler
Ctx context.Context
Cancel context.CancelFunc
retryInterval time.Duration
}
// NewProtectGoroutine 创建协程执行对象
func NewProtectGoroutine(c context.Context, panicHandler PanicHandler) *ProtectGoroutine {
ctx, cancel := context.WithCancel(c)
pg := &ProtectGoroutine{
wg: new(sync.WaitGroup),
panics: make(chan Panic, 1),
panicHandler: panicHandler,
Ctx: ctx,
Cancel: cancel,
retryInterval: 3 * time.Second,
}
go pg.handlePanic()
return pg
}
// Protect 协程守护
func (p *ProtectGoroutine) Protect(fn interface{}, params []interface{}) (ret []reflect.Value, err error) {
c := make(chan struct{}, 1)
localPanics := make(chan Panic, 1)
go func() {
protect:
for {
c <- struct{}{}
go func() {
defer func() {
if e := recover(); e != nil {
<-c
stack := debug.Stack()
localPanics <- Panic{
R: e,
Stack: stack,
}
}
}()
ret, err = p.run(fn, params)
}()
select {
case <-p.Ctx.Done():
close(c)
break protect
case panicInfo := <-localPanics:
// 当发生panic时,不阻塞select,允许重试逻辑继续执行
p.panics <- panicInfo
}
time.Sleep(p.retryInterval)
}
}()
return
}
// Go 协程启动
func (p *ProtectGoroutine) Go(fn interface{}, params []interface{}) (ret []reflect.Value, err error) {
p.wg.Add(1)
done := make(chan struct{})
go func() {
defer func() {
p.wg.Done()
if e := recover(); e != nil {
p.panics <- Panic{
R: e,
Stack: debug.Stack(),
}
}
close(done)
}()
ret, err = p.run(fn, params)
}()
select {
case <-p.Ctx.Done():
return nil, p.Ctx.Err()
case <-done:
return ret, err
}
}
// handlePanic 处理panic信息
func (p *ProtectGoroutine) handlePanic() {
for panicInfo := range p.panics {
if p.panicHandler != nil {
p.panicHandler(panicInfo)
} else {
fmt.Println(panicInfo.R, string(panicInfo.Stack), time.Now())
}
}
}
// run 执行用户任务
func (p *ProtectGoroutine) run(fn interface{}, params []interface{}) (ret []reflect.Value, err error) {
v := reflect.ValueOf(fn)
if v.Type().Kind() != reflect.Func {
return nil, errors.New("params[1] is not callable")
}
v.Type().NumIn()
in := make([]reflect.Value, len(params))
for k, param := range params {
in[k] = reflect.ValueOf(param)
}
return v.Call(in), nil
}