-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgo.go
137 lines (125 loc) · 3.65 KB
/
go.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package group
import (
"context"
"errors"
"fmt"
"log/slog"
"time"
"golang.org/x/sync/errgroup"
)
func Go(ctx context.Context, opts *Options, fs ...func() error) (err error) {
if len(fs) == 0 {
return nil
}
// no opts short circuit
if opts == nil {
g, gtx := errgroup.WithContext(ctx)
g.SetLimit(len(fs)) // limit defaults to number of funcs
groupGo(gtx, g, nil, fs...)
return g.Wait()
}
if 0 < opts.Limit && opts.Limit < len(opts.dep) {
return errors.New("limit cannot be less than the number of funcs with deps")
}
if opts.Prefix == "" {
opts.Prefix = "anonymous"
}
if opts.WithLog {
defer func(start time.Time) {
groupMonitor(ctx, fmt.Sprintf("Go%s", cond(opts.dep != nil, " | Dep", "")), opts.Prefix, start, opts.WithLog, err)
}(time.Now())
}
g, gtx := errgroup.WithContext(ctx)
g.SetLimit(cond(opts.Limit > 0, opts.Limit, len(fs))) // limit defaults to number of funcs
// set timeout for group and fs
if opts.Timeout > 0 {
var cancel context.CancelFunc
gtx, cancel = context.WithTimeout(gtx, opts.Timeout)
defer cancel()
}
if opts.dep == nil {
groupGo(gtx, g, opts, fs...)
} else {
// go runners with deps
// separate ctx for tolerance control
opts.dep.groupGo(ctx, gtx, g, opts)
// go runners without deps
groupGo(gtx, g, opts, filter(fs, func(f func() error) bool { return opts.dep[fptr(f)] == nil })...)
}
// outer timeout control
if opts.Timeout > 0 {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-gtx.Done(): // actual timeout
if errors.Is(gtx.Err(), context.DeadlineExceeded) {
if opts.WithLog {
slog.InfoContext(gtx, fmt.Sprintf("[Group Go%s] group %s timeout", cond(opts.dep != nil, " | Dep", ""), opts.Prefix), slog.Duration("after", opts.Timeout))
}
return errors.New("group timeout")
}
return g.Wait()
}
}
}
return g.Wait()
}
func TryGo(ctx context.Context, opts *Options, fs ...func() error) (ok bool, err error) {
if len(fs) == 0 {
return true, nil
}
// no opts short circuit
if opts == nil {
g, ctx := errgroup.WithContext(ctx)
// limit defaults to number of funcs
g.SetLimit(len(fs))
return groupTryGo(ctx, g, nil, fs...), g.Wait()
}
if 0 < opts.Limit && opts.Limit < len(opts.dep) {
return false, errors.New("limit cannot be less than the number of funcs with deps")
}
if opts.Prefix == "" {
opts.Prefix = "anonymous"
}
if opts.WithLog {
defer func(start time.Time) {
groupMonitor(ctx, fmt.Sprintf("TryGo%s", cond(opts.dep != nil, " | Dep", "")), opts.Prefix, start, opts.WithLog, err)
}(time.Now())
}
g, gtx := errgroup.WithContext(ctx)
g.SetLimit(cond(opts.Limit > 0, opts.Limit, len(fs))) // limit defaults to the number of funcs
// set timeout for group and fs
if opts.Timeout > 0 {
var cancel context.CancelFunc
gtx, cancel = context.WithTimeout(gtx, opts.Timeout)
defer cancel()
}
if opts.dep == nil {
ok = groupTryGo(gtx, g, opts, fs...)
} else {
// go runners with deps
// separate ctx for tolerance control
ok = opts.dep.groupTryGo(ctx, gtx, g, opts)
// go runners without deps
ok = ok && groupTryGo(gtx, g, opts, filter(fs, func(r func() error) bool { return opts.dep[fptr(r)] == nil })...)
}
// outer timeout control
if opts.Timeout > 0 {
for {
select {
case <-ctx.Done():
return ok, ctx.Err()
case <-gtx.Done(): // actual timeout
if errors.Is(gtx.Err(), context.DeadlineExceeded) {
if opts.WithLog {
slog.InfoContext(gtx, fmt.Sprintf("[Group TryGo%s] group %s timeout", cond(opts.dep != nil, " | Dep", ""), opts.Prefix), slog.Duration("after", opts.Timeout))
}
return ok, errors.New("group timeout")
}
return ok, g.Wait()
}
}
}
return ok, g.Wait()
}