Skip to content

Commit 8b83453

Browse files
committedJan 11, 2022
remove unnecessary drain, fix data race
1 parent ce84a6c commit 8b83453

File tree

4 files changed

+12
-12
lines changed

4 files changed

+12
-12
lines changed
 

‎go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ require github.com/stretchr/testify v1.7.0
77
require (
88
github.com/davecgh/go-spew v1.1.1 // indirect
99
github.com/kr/text v0.2.0 // indirect
10-
go.uber.org/goleak v1.1.12 // indirect
10+
go.uber.org/goleak v1.1.12
1111
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
1212
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
1313
)

‎go.sum

+5
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
1919
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
2020
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
2121
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
22+
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
2223
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
24+
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
2325
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
2426
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
2527
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
@@ -31,16 +33,19 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
3133
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
3234
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
3335
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
36+
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE=
3437
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
3538
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
3639
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
3740
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
3841
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
3942
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
4043
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
44+
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
4145
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
4246
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
4347
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
48+
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
4449
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
4550
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
4651
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

‎mapreduce.go

+6-9
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ func Finish(fns ...func() error) error {
7575
cancel(err)
7676
}
7777
}, func(pipe <-chan interface{}, cancel func(error)) {
78-
drain(pipe)
7978
}, WithWorkers(len(fns)))
8079
}
8180

@@ -185,13 +184,11 @@ func mapReduceWithPanicChan(source <-chan interface{}, panicChan chan interface{
185184

186185
go func() {
187186
defer func() {
187+
drain(collector)
188188
if r := recover(); r != nil {
189189
panicChan <- r
190-
} else {
191-
finish()
192190
}
193-
194-
drain(collector)
191+
finish()
195192
}()
196193

197194
reducer(collector, writer, cancel)
@@ -216,7 +213,6 @@ func mapReduceWithPanicChan(source <-chan interface{}, panicChan chan interface{
216213
cancel(context.DeadlineExceeded)
217214
return nil, context.DeadlineExceeded
218215
case v := <-panicChan:
219-
finish()
220216
panic(v)
221217
case v, ok := <-output:
222218
if err := retErr.Load(); err != nil {
@@ -234,10 +230,11 @@ func mapReduceWithPanicChan(source <-chan interface{}, panicChan chan interface{
234230
func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
235231
_, err := MapReduce(generate, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
236232
reducer(input, cancel)
237-
// We need to write a placeholder to let MapReduce to continue on reducer done,
238-
// otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
239-
writer.Write(struct{}{})
240233
}, opts...)
234+
if errors.Is(err, ErrReduceNoOutput) {
235+
return nil
236+
}
237+
241238
return err
242239
}
243240

‎mapreduce_test.go

-2
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,6 @@ func TestMapperPanic(t *testing.T) {
161161
}, func(item interface{}, writer Writer, cancel func(error)) {
162162
panic("foo")
163163
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
164-
for range pipe {
165-
}
166164
})
167165
})
168166
})

0 commit comments

Comments
 (0)
Please sign in to comment.