Skip to content

Commit d84c389

Browse files
authored
fix: goroutine leak for tasker (#38)
Signed-off-by: rfyiamcool <[email protected]>
1 parent b2e2c14 commit d84c389

File tree

2 files changed

+47
-16
lines changed

2 files changed

+47
-16
lines changed

Diff for: pkg/tasker/tasker.go

+25-14
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,17 @@ func New(opt Option) *Tasker {
9595
logger = log.New(file, "", log.LstdFlags)
9696
}
9797

98-
return &Tasker{Log: logger, loc: loc, gron: gron, exprs: exprs, tasks: tasks, verbose: opt.Verbose}
98+
ctx, cancel := context.WithCancel(context.Background())
99+
return &Tasker{
100+
Log: logger,
101+
loc: loc,
102+
gron: gron,
103+
exprs: exprs,
104+
tasks: tasks,
105+
verbose: opt.Verbose,
106+
ctx: ctx,
107+
ctxCancel: cancel,
108+
}
99109
}
100110

101111
// WithContext adds a parent context to the Tasker struct
@@ -105,14 +115,6 @@ func (t *Tasker) WithContext(ctx context.Context) *Tasker {
105115
return t
106116
}
107117

108-
func (t *Tasker) ctxDone() {
109-
<-t.ctx.Done()
110-
if t.verbose {
111-
t.Log.Printf("[tasker] received signal on context.Done, aborting")
112-
}
113-
t.abort = true
114-
}
115-
116118
// Taskify creates TaskFunc out of plain command wrt given options.
117119
func (t *Tasker) Taskify(cmd string, opt Option) TaskFunc {
118120
sh := Shell(opt.Shell)
@@ -259,6 +261,11 @@ func (t *Tasker) Run() {
259261

260262
// Stop the task manager.
261263
func (t *Tasker) Stop() {
264+
t.stop()
265+
}
266+
267+
func (t *Tasker) stop() {
268+
t.ctxCancel()
262269
t.abort = true
263270
}
264271

@@ -282,16 +289,20 @@ func (t *Tasker) doSetup() {
282289
break
283290
}
284291
}
285-
if t.ctx != nil {
286-
go t.ctxDone()
287-
}
288292

289293
sig := make(chan os.Signal, 1)
290294
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
291295

292296
go func() {
293-
<-sig
294-
t.abort = true
297+
select {
298+
case <-sig:
299+
case <-t.ctx.Done():
300+
if t.verbose {
301+
t.Log.Printf("[tasker] received signal on context.Done, aborting")
302+
}
303+
}
304+
305+
t.stop()
295306
}()
296307
}
297308

Diff for: pkg/tasker/tasker_test.go

+22-2
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ func TestConcurrency(t *testing.T) {
202202
}
203203

204204
func TestStopTasker(t *testing.T) {
205-
t.Run("Run", func(t *testing.T) {
205+
t.Run("call stop()", func(t *testing.T) {
206206
taskr := New(Option{Verbose: true, Out: "../../test/tasker.out"})
207207

208208
var incr int
@@ -216,7 +216,27 @@ func TestStopTasker(t *testing.T) {
216216
taskr.Stop()
217217
}()
218218
taskr.Run()
219-
fmt.Println(incr)
219+
220+
if incr != 1 {
221+
t.Errorf("the task should run 1x, not %dx", incr)
222+
}
223+
})
224+
225+
t.Run("cancel context", func(t *testing.T) {
226+
ctx, cancel := context.WithCancel(context.Background())
227+
taskr := New(Option{Verbose: true, Out: "../../test/tasker.out"}).WithContext(ctx)
228+
229+
var incr int
230+
taskr.Task("* * * * * *", func(ctx context.Context) (int, error) {
231+
incr++
232+
return 0, nil
233+
}, false)
234+
235+
go func() {
236+
time.Sleep(2 * time.Second)
237+
cancel()
238+
}()
239+
taskr.Run()
220240

221241
if incr != 1 {
222242
t.Errorf("the task should run 1x, not %dx", incr)

0 commit comments

Comments
 (0)