Skip to content

Commit c01b293

Browse files
committed
fix: goroutine stuck on edge case
1 parent 8abf6b7 commit c01b293

File tree

2 files changed

+6
-16
lines changed

2 files changed

+6
-16
lines changed

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
# dev files
99
.idea
1010

11-
# Test binary, built with `go test -c`
11+
# For test
12+
**/testdata
1213
*.test
1314

1415
# Output of the go coverage tool, specifically when used with LiteIDE

mapreduce.go

+4-15
Original file line numberDiff line numberDiff line change
@@ -286,33 +286,21 @@ func drain(channel <-chan interface{}) {
286286

287287
func executeMappers(mCtx mapperContext) {
288288
var wg sync.WaitGroup
289-
pc := &onceChan{channel: make(chan interface{})}
290289
defer func() {
291-
// in case panic happens when processing last item, for loop not handling it.
292-
select {
293-
case r := <-pc.channel:
294-
mCtx.panicChan.write(r)
295-
default:
296-
}
297-
298290
wg.Wait()
299291
close(mCtx.collector)
300292
drain(mCtx.source)
301293
}()
302294

295+
var failed int32
303296
pool := make(chan struct{}, mCtx.workers)
304297
writer := newGuardedWriter(mCtx.ctx, mCtx.collector, mCtx.doneChan)
305-
for {
298+
for atomic.LoadInt32(&failed) == 0 {
306299
select {
307300
case <-mCtx.ctx.Done():
308301
return
309302
case <-mCtx.doneChan:
310303
return
311-
case r := <-pc.channel:
312-
// make sure this method quit ASAP,
313-
// without this case branch, all the items from source will be consumed.
314-
mCtx.panicChan.write(r)
315-
return
316304
case pool <- struct{}{}:
317305
item, ok := <-mCtx.source
318306
if !ok {
@@ -324,7 +312,8 @@ func executeMappers(mCtx mapperContext) {
324312
go func() {
325313
defer func() {
326314
if r := recover(); r != nil {
327-
pc.write(r)
315+
atomic.AddInt32(&failed, 1)
316+
mCtx.panicChan.write(r)
328317
}
329318
wg.Done()
330319
<-pool

0 commit comments

Comments
 (0)