Skip to content

Commit 2771716

Browse files
committed
feat: support panic handling with generics version
1 parent c01b293 commit 2771716

8 files changed

+163
-184
lines changed

examples/sum.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,20 @@ import (
44
"fmt"
55
"log"
66

7-
"github.com/kevwan/mapreduce"
7+
"github.com/kevwan/mapreduce/v2"
88
)
99

1010
func main() {
11-
val, err := mapreduce.MapReduce(func(source chan<- interface{}) {
11+
val, err := mapreduce.MapReduce(func(source chan<- int) {
1212
for i := 0; i < 10; i++ {
1313
source <- i
1414
}
15-
}, func(item interface{}, writer mapreduce.Writer, cancel func(error)) {
16-
i := item.(int)
17-
writer.Write(i * i)
18-
}, func(pipe <-chan interface{}, writer mapreduce.Writer, cancel func(error)) {
15+
}, func(item int, writer mapreduce.Writer[int], cancel func(error)) {
16+
writer.Write(item * item)
17+
}, func(pipe <-chan int, writer mapreduce.Writer[int], cancel func(error)) {
1918
var sum int
2019
for i := range pipe {
21-
sum += i.(int)
20+
sum += i
2221
}
2322
writer.Write(sum)
2423
})

go.mod

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
1-
module github.com/kevwan/mapreduce
1+
module github.com/kevwan/mapreduce/v2
22

3-
go 1.14
3+
go 1.18
44

5-
require github.com/stretchr/testify v1.7.0
5+
require (
6+
github.com/stretchr/testify v1.7.0
7+
go.uber.org/goleak v1.1.12
8+
)
69

710
require (
811
github.com/davecgh/go-spew v1.1.1 // indirect
912
github.com/kr/text v0.2.0 // indirect
10-
go.uber.org/goleak v1.1.12
13+
github.com/pmezard/go-difflib v1.0.0 // indirect
1114
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
1215
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
1316
)

go.sum

-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
2121
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
2222
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
2323
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
24-
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
2524
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
2625
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
2726
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
@@ -33,7 +32,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
3332
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
3433
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
3534
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
36-
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE=
3735
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
3836
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
3937
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -45,7 +43,6 @@ golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
4543
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
4644
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
4745
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
48-
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
4946
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
5047
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
5148
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

mapreduce.go

+55-55
Original file line numberDiff line numberDiff line change
@@ -21,29 +21,29 @@ var (
2121

2222
type (
2323
// ForEachFunc is used to do element processing, but no output.
24-
ForEachFunc func(item interface{})
24+
ForEachFunc[T any] func(item T)
2525
// GenerateFunc is used to let callers send elements into source.
26-
GenerateFunc func(source chan<- interface{})
26+
GenerateFunc[T any] func(source chan<- T)
2727
// MapFunc is used to do element processing and write the output to writer.
28-
MapFunc func(item interface{}, writer Writer)
28+
MapFunc[T, U any] func(item T, writer Writer[U])
2929
// MapperFunc is used to do element processing and write the output to writer,
3030
// use cancel func to cancel the processing.
31-
MapperFunc func(item interface{}, writer Writer, cancel func(error))
31+
MapperFunc[T, U any] func(item T, writer Writer[U], cancel func(error))
3232
// ReducerFunc is used to reduce all the mapping output and write to writer,
3333
// use cancel func to cancel the processing.
34-
ReducerFunc func(pipe <-chan interface{}, writer Writer, cancel func(error))
34+
ReducerFunc[U, V any] func(pipe <-chan U, writer Writer[V], cancel func(error))
3535
// VoidReducerFunc is used to reduce all the mapping output, but no output.
3636
// Use cancel func to cancel the processing.
37-
VoidReducerFunc func(pipe <-chan interface{}, cancel func(error))
37+
VoidReducerFunc[U any] func(pipe <-chan U, cancel func(error))
3838
// Option defines the method to customize the mapreduce.
3939
Option func(opts *mapReduceOptions)
4040

41-
mapperContext struct {
41+
mapperContext[T, U any] struct {
4242
ctx context.Context
43-
mapper MapFunc
44-
source <-chan interface{}
43+
mapper MapFunc[T, U]
44+
source <-chan T
4545
panicChan *onceChan
46-
collector chan<- interface{}
46+
collector chan<- U
4747
doneChan <-chan struct{}
4848
workers int
4949
}
@@ -54,8 +54,8 @@ type (
5454
}
5555

5656
// Writer interface wraps Write method.
57-
Writer interface {
58-
Write(v interface{})
57+
Writer[T any] interface {
58+
Write(v T)
5959
}
6060
)
6161

@@ -65,16 +65,15 @@ func Finish(fns ...func() error) error {
6565
return nil
6666
}
6767

68-
return MapReduceVoid(func(source chan<- interface{}) {
68+
return MapReduceVoid(func(source chan<- func() error) {
6969
for _, fn := range fns {
7070
source <- fn
7171
}
72-
}, func(item interface{}, writer Writer, cancel func(error)) {
73-
fn := item.(func() error)
72+
}, func(fn func() error, writer Writer[any], cancel func(error)) {
7473
if err := fn(); err != nil {
7574
cancel(err)
7675
}
77-
}, func(pipe <-chan interface{}, cancel func(error)) {
76+
}, func(pipe <-chan any, cancel func(error)) {
7877
}, WithWorkers(len(fns)))
7978
}
8079

@@ -84,27 +83,26 @@ func FinishVoid(fns ...func()) {
8483
return
8584
}
8685

87-
ForEach(func(source chan<- interface{}) {
86+
ForEach(func(source chan<- func()) {
8887
for _, fn := range fns {
8988
source <- fn
9089
}
91-
}, func(item interface{}) {
92-
fn := item.(func())
90+
}, func(fn func()) {
9391
fn()
9492
}, WithWorkers(len(fns)))
9593
}
9694

9795
// ForEach maps all elements from given generate but no output.
98-
func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) {
96+
func ForEach[T any](generate GenerateFunc[T], mapper ForEachFunc[T], opts ...Option) {
9997
options := buildOptions(opts...)
100-
panicChan := &onceChan{channel: make(chan interface{})}
98+
panicChan := &onceChan{channel: make(chan any)}
10199
source := buildSource(generate, panicChan)
102-
collector := make(chan interface{}, options.workers)
100+
collector := make(chan any, options.workers)
103101
done := make(chan struct{})
104102

105-
go executeMappers(mapperContext{
103+
go executeMappers(mapperContext[T, any]{
106104
ctx: options.ctx,
107-
mapper: func(item interface{}, writer Writer) {
105+
mapper: func(item T, writer Writer[any]) {
108106
mapper(item)
109107
},
110108
source: source,
@@ -128,26 +126,26 @@ func ForEach(generate GenerateFunc, mapper ForEachFunc, opts ...Option) {
128126

129127
// MapReduce maps all elements generated from given generate func,
130128
// and reduces the output elements with given reducer.
131-
func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc,
132-
opts ...Option) (interface{}, error) {
133-
panicChan := &onceChan{channel: make(chan interface{})}
129+
func MapReduce[T, U, V any](generate GenerateFunc[T], mapper MapperFunc[T, U], reducer ReducerFunc[U, V],
130+
opts ...Option) (V, error) {
131+
panicChan := &onceChan{channel: make(chan any)}
134132
source := buildSource(generate, panicChan)
135133
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
136134
}
137135

138136
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
139-
func MapReduceChan(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,
140-
opts ...Option) (interface{}, error) {
141-
panicChan := &onceChan{channel: make(chan interface{})}
137+
func MapReduceChan[T, U, V any](source <-chan T, mapper MapperFunc[T, U], reducer ReducerFunc[U, V],
138+
opts ...Option) (V, error) {
139+
panicChan := &onceChan{channel: make(chan any)}
142140
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
143141
}
144142

145143
// MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
146-
func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapper MapperFunc,
147-
reducer ReducerFunc, opts ...Option) (interface{}, error) {
144+
func mapReduceWithPanicChan[T, U, V any](source <-chan T, panicChan *onceChan, mapper MapperFunc[T, U],
145+
reducer ReducerFunc[U, V], opts ...Option) (val V, err error) {
148146
options := buildOptions(opts...)
149147
// output is used to write the final result
150-
output := make(chan interface{})
148+
output := make(chan V)
151149
defer func() {
152150
// reducer can only write once, if more, panic
153151
for range output {
@@ -156,7 +154,7 @@ func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapp
156154
}()
157155

158156
// collector is used to collect data from mapper, and consume in reducer
159-
collector := make(chan interface{}, options.workers)
157+
collector := make(chan U, options.workers)
160158
// if done is closed, all mappers and reducer should stop processing
161159
done := make(chan struct{})
162160
writer := newGuardedWriter(options.ctx, output, done)
@@ -192,9 +190,9 @@ func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapp
192190
reducer(collector, writer, cancel)
193191
}()
194192

195-
go executeMappers(mapperContext{
193+
go executeMappers(mapperContext[T, U]{
196194
ctx: options.ctx,
197-
mapper: func(item interface{}, w Writer) {
195+
mapper: func(item T, w Writer[U]) {
198196
mapper(item, w, cancel)
199197
},
200198
source: source,
@@ -207,24 +205,27 @@ func mapReduceWithPanicChan(source <-chan interface{}, panicChan *onceChan, mapp
207205
select {
208206
case <-options.ctx.Done():
209207
cancel(context.DeadlineExceeded)
210-
return nil, context.DeadlineExceeded
208+
err = context.DeadlineExceeded
211209
case v := <-panicChan.channel:
212210
panic(v)
213211
case v, ok := <-output:
214-
if err := retErr.Load(); err != nil {
215-
return nil, err.(error)
212+
if e := retErr.Load(); e != nil {
213+
err = e.(error)
216214
} else if ok {
217-
return v, nil
215+
val = v
218216
} else {
219-
return nil, ErrReduceNoOutput
217+
err = ErrReduceNoOutput
220218
}
221219
}
220+
221+
return
222222
}
223223

224224
// MapReduceVoid maps all elements generated from given generate,
225225
// and reduce the output elements with given reducer.
226-
func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {
227-
_, err := MapReduce(generate, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {
226+
func MapReduceVoid[T, U any](generate GenerateFunc[T], mapper MapperFunc[T, U],
227+
reducer VoidReducerFunc[U], opts ...Option) error {
228+
_, err := MapReduce(generate, mapper, func(input <-chan U, writer Writer[any], cancel func(error)) {
228229
reducer(input, cancel)
229230
}, opts...)
230231
if errors.Is(err, ErrReduceNoOutput) {
@@ -261,8 +262,8 @@ func buildOptions(opts ...Option) *mapReduceOptions {
261262
return options
262263
}
263264

264-
func buildSource(generate GenerateFunc, panicChan *onceChan) chan interface{} {
265-
source := make(chan interface{})
265+
func buildSource[T any](generate GenerateFunc[T], panicChan *onceChan) chan T {
266+
source := make(chan T)
266267
go func() {
267268
defer func() {
268269
if r := recover(); r != nil {
@@ -278,13 +279,13 @@ func buildSource(generate GenerateFunc, panicChan *onceChan) chan interface{} {
278279
}
279280

280281
// drain drains the channel.
281-
func drain(channel <-chan interface{}) {
282+
func drain[T any](channel <-chan T) {
282283
// drain the channel
283284
for range channel {
284285
}
285286
}
286287

287-
func executeMappers(mCtx mapperContext) {
288+
func executeMappers[T, U any](mCtx mapperContext[T, U]) {
288289
var wg sync.WaitGroup
289290
defer func() {
290291
wg.Wait()
@@ -341,22 +342,21 @@ func once(fn func(error)) func(error) {
341342
}
342343
}
343344

344-
type guardedWriter struct {
345+
type guardedWriter[T any] struct {
345346
ctx context.Context
346-
channel chan<- interface{}
347+
channel chan<- T
347348
done <-chan struct{}
348349
}
349350

350-
func newGuardedWriter(ctx context.Context, channel chan<- interface{},
351-
done <-chan struct{}) guardedWriter {
352-
return guardedWriter{
351+
func newGuardedWriter[T any](ctx context.Context, channel chan<- T, done <-chan struct{}) guardedWriter[T] {
352+
return guardedWriter[T]{
353353
ctx: ctx,
354354
channel: channel,
355355
done: done,
356356
}
357357
}
358358

359-
func (gw guardedWriter) Write(v interface{}) {
359+
func (gw guardedWriter[T]) Write(v T) {
360360
select {
361361
case <-gw.ctx.Done():
362362
return
@@ -368,11 +368,11 @@ func (gw guardedWriter) Write(v interface{}) {
368368
}
369369

370370
type onceChan struct {
371-
channel chan interface{}
371+
channel chan any
372372
wrote int32
373373
}
374374

375-
func (oc *onceChan) write(val interface{}) {
375+
func (oc *onceChan) write(val any) {
376376
if atomic.AddInt32(&oc.wrote, 1) > 1 {
377377
return
378378
}

mapreduce_fuzz_test.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -29,30 +29,29 @@ func FuzzMapReduce(f *testing.F) {
2929
reducerIdx := rand.Int63n(n)
3030
squareSum := (n - 1) * n * (2*n - 1) / 6
3131

32-
fn := func() (interface{}, error) {
32+
fn := func() (int64, error) {
3333
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
3434

35-
return MapReduce(func(source chan<- interface{}) {
35+
return MapReduce(func(source chan<- int64) {
3636
for i := int64(0); i < n; i++ {
3737
source <- i
3838
if genPanic && i == genIdx {
3939
panic("foo")
4040
}
4141
}
42-
}, func(item interface{}, writer Writer, cancel func(error)) {
43-
v := item.(int64)
42+
}, func(v int64, writer Writer[int64], cancel func(error)) {
4443
if mapperPanic && v == mapperIdx {
4544
panic("bar")
4645
}
4746
writer.Write(v * v)
48-
}, func(pipe <-chan interface{}, writer Writer, cancel func(error)) {
47+
}, func(pipe <-chan int64, writer Writer[int64], cancel func(error)) {
4948
var idx int64
5049
var total int64
5150
for v := range pipe {
5251
if reducerPanic && idx == reducerIdx {
5352
panic("baz")
5453
}
55-
total += v.(int64)
54+
total += v
5655
idx++
5756
}
5857
writer.Write(total)
@@ -72,7 +71,7 @@ func FuzzMapReduce(f *testing.F) {
7271
} else {
7372
val, err := fn()
7473
assert.Nil(t, err)
75-
assert.Equal(t, squareSum, val.(int64))
74+
assert.Equal(t, squareSum, val)
7675
}
7776
})
7877
}

0 commit comments

Comments
 (0)