@@ -3,7 +3,6 @@ package mapreduce
3
3
import (
4
4
"context"
5
5
"errors"
6
- "fmt"
7
6
"sync"
8
7
"sync/atomic"
9
8
)
@@ -21,12 +20,12 @@ var (
21
20
)
22
21
23
22
type (
23
+ // ForEachFunc is used to do element processing, but no output.
24
+ ForEachFunc func (item interface {})
24
25
// GenerateFunc is used to let callers send elements into source.
25
26
GenerateFunc func (source chan <- interface {})
26
27
// MapFunc is used to do element processing and write the output to writer.
27
28
MapFunc func (item interface {}, writer Writer )
28
- // VoidMapFunc is used to do element processing, but no output.
29
- VoidMapFunc func (item interface {})
30
29
// MapperFunc is used to do element processing and write the output to writer,
31
30
// use cancel func to cancel the processing.
32
31
MapperFunc func (item interface {}, writer Writer , cancel func (error ))
@@ -39,6 +38,16 @@ type (
39
38
// Option defines the method to customize the mapreduce.
40
39
Option func (opts * mapReduceOptions )
41
40
41
+ mapperContext struct {
42
+ ctx context.Context
43
+ mapper MapFunc
44
+ source <- chan interface {}
45
+ panicChan * onceChan
46
+ collector chan <- interface {}
47
+ doneChan <- chan struct {}
48
+ workers int
49
+ }
50
+
42
51
mapReduceOptions struct {
43
52
ctx context.Context
44
53
workers int
@@ -76,7 +85,7 @@ func FinishVoid(fns ...func()) {
76
85
return
77
86
}
78
87
79
- MapVoid (func (source chan <- interface {}) {
88
+ ForEach (func (source chan <- interface {}) {
80
89
for _ , fn := range fns {
81
90
source <- fn
82
91
}
@@ -86,38 +95,72 @@ func FinishVoid(fns ...func()) {
86
95
}, WithWorkers (len (fns )))
87
96
}
88
97
89
- // Map maps all elements generated from given generate func, and returns an output channel .
90
- func Map (generate GenerateFunc , mapper MapFunc , opts ... Option ) chan interface {} {
98
+ // ForEach maps all elements from given generate but no output.
99
+ func ForEach (generate GenerateFunc , mapper ForEachFunc , opts ... Option ) {
91
100
options := buildOptions (opts ... )
92
- source := buildSource (generate )
101
+ panicChan := make (chan interface {})
102
+ source := buildSource (generate , panicChan )
93
103
collector := make (chan interface {}, options .workers )
94
104
done := make (chan struct {})
95
105
96
- go executeMappers (options .ctx , mapper , source , collector , done , options .workers )
106
+ go executeMappers (mapperContext {
107
+ ctx : options .ctx ,
108
+ mapper : func (item interface {}, writer Writer ) {
109
+ mapper (item )
110
+ },
111
+ source : source ,
112
+ panicChan : & onceChan {
113
+ channel : panicChan ,
114
+ },
115
+ collector : collector ,
116
+ doneChan : done ,
117
+ workers : options .workers ,
118
+ })
97
119
98
- return collector
120
+ for {
121
+ select {
122
+ case v := <- panicChan :
123
+ panic (v )
124
+ case _ , ok := <- collector :
125
+ if ! ok {
126
+ return
127
+ }
128
+ }
129
+ }
99
130
}
100
131
101
132
// MapReduce maps all elements generated from given generate func,
102
133
// and reduces the output elements with given reducer.
103
134
func MapReduce (generate GenerateFunc , mapper MapperFunc , reducer ReducerFunc ,
104
135
opts ... Option ) (interface {}, error ) {
105
- source := buildSource (generate )
106
- return MapReduceWithSource (source , mapper , reducer , opts ... )
136
+ panicChan := make (chan interface {})
137
+ source := buildSource (generate , panicChan )
138
+ return mapReduceWithPanicChan (source , panicChan , mapper , reducer , opts ... )
107
139
}
108
140
109
- // MapReduceWithSource maps all elements from source, and reduce the output elements with given reducer.
110
- func MapReduceWithSource (source <- chan interface {}, mapper MapperFunc , reducer ReducerFunc ,
141
+ // MapReduceChan maps all elements from source, and reduce the output elements with given reducer.
142
+ func MapReduceChan (source <- chan interface {}, mapper MapperFunc , reducer ReducerFunc ,
111
143
opts ... Option ) (interface {}, error ) {
144
+ panicChan := make (chan interface {})
145
+ return mapReduceWithPanicChan (source , panicChan , mapper , reducer , opts ... )
146
+ }
147
+
148
+ // mapReduceWithPanicChan maps all elements from source and reduces the output elements with given reducer; recovered panics are passed through panicChan and re-raised by the caller's goroutine.
149
+ func mapReduceWithPanicChan (source <- chan interface {}, panicChan chan interface {}, mapper MapperFunc ,
150
+ reducer ReducerFunc , opts ... Option ) (interface {}, error ) {
112
151
options := buildOptions (opts ... )
152
+ // output is used to write the final result
113
153
output := make (chan interface {})
114
154
defer func () {
155
+ // reducer can only write once, if more, panic
115
156
for range output {
116
157
panic ("more than one element written in reducer" )
117
158
}
118
159
}()
119
160
161
+ // collector is used to collect data from mapper, and consume in reducer
120
162
collector := make (chan interface {}, options .workers )
163
+ // if done is closed, all mappers and reducer should stop processing
121
164
done := make (chan struct {})
122
165
writer := newGuardedWriter (options .ctx , output , done )
123
166
var closeOnce sync.Once
@@ -142,30 +185,47 @@ func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer R
142
185
143
186
go func () {
144
187
defer func () {
145
- drain (collector )
146
-
147
188
if r := recover (); r != nil {
148
- cancel ( fmt . Errorf ( "%v" , r ))
189
+ panicChan <- r
149
190
} else {
150
191
finish ()
151
192
}
193
+
194
+ drain (collector )
152
195
}()
153
196
154
- // callers need to make sure reducer not panic
155
197
reducer (collector , writer , cancel )
156
198
}()
157
199
158
- go executeMappers (options .ctx , func (item interface {}, w Writer ) {
159
- mapper (item , w , cancel )
160
- }, source , collector , done , options .workers )
161
-
162
- value , ok := <- output
163
- if err := retErr .Load (); err != nil {
164
- return nil , err .(error )
165
- } else if ok {
166
- return value , nil
167
- } else {
168
- return nil , ErrReduceNoOutput
200
+ go executeMappers (mapperContext {
201
+ ctx : options .ctx ,
202
+ mapper : func (item interface {}, w Writer ) {
203
+ mapper (item , w , cancel )
204
+ },
205
+ source : source ,
206
+ panicChan : & onceChan {
207
+ channel : panicChan ,
208
+ },
209
+ collector : collector ,
210
+ doneChan : done ,
211
+ workers : options .workers ,
212
+ })
213
+
214
+ select {
215
+ case <- options .ctx .Done ():
216
+ cancel (context .DeadlineExceeded )
217
+ return nil , context .DeadlineExceeded
218
+ case v := <- panicChan :
219
+ finish ()
220
+ panic (v )
221
+ case v , ok := <- output :
222
+ if err := retErr .Load (); err != nil {
223
+ return nil , err .(error )
224
+ } else if ok {
225
+ return v , nil
226
+ } else {
227
+ return nil , ErrReduceNoOutput
228
+ }
169
229
}
170
230
}
171
231
@@ -175,20 +235,12 @@ func MapReduceVoid(generate GenerateFunc, mapper MapperFunc, reducer VoidReducer
175
235
_ , err := MapReduce (generate , mapper , func (input <- chan interface {}, writer Writer , cancel func (error )) {
176
236
reducer (input , cancel )
177
237
// We need to write a placeholder to let MapReduce to continue on reducer done,
178
- // otherwise, all goroutines are waiting.
179
- // The placeholder will be discarded by MapReduce.
238
+ // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.
180
239
writer .Write (struct {}{})
181
240
}, opts ... )
182
241
return err
183
242
}
184
243
185
- // MapVoid maps all elements from given generate but no output.
186
- func MapVoid (generate GenerateFunc , mapper VoidMapFunc , opts ... Option ) {
187
- drain (Map (generate , func (item interface {}, writer Writer ) {
188
- mapper (item )
189
- }, opts ... ))
190
- }
191
-
192
244
// WithContext customizes a mapreduce processing accepts a given ctx.
193
245
func WithContext (ctx context.Context ) Option {
194
246
return func (opts * mapReduceOptions ) {
@@ -216,10 +268,16 @@ func buildOptions(opts ...Option) *mapReduceOptions {
216
268
return options
217
269
}
218
270
219
- func buildSource (generate GenerateFunc ) chan interface {} {
271
+ func buildSource (generate GenerateFunc , panicChan chan interface {} ) chan interface {} {
220
272
source := make (chan interface {})
221
273
go func () {
222
- defer close (source )
274
+ defer func () {
275
+ if r := recover (); r != nil {
276
+ panicChan <- r
277
+ }
278
+ close (source )
279
+ }()
280
+
223
281
generate (source )
224
282
}()
225
283
@@ -233,24 +291,31 @@ func drain(channel <-chan interface{}) {
233
291
}
234
292
}
235
293
236
- func executeMappers (ctx context.Context , mapper MapFunc , input <- chan interface {},
237
- collector chan <- interface {}, done <- chan struct {}, workers int ) {
294
+ func executeMappers (mCtx mapperContext ) {
238
295
var wg sync.WaitGroup
239
296
defer func () {
240
297
wg .Wait ()
241
- close (collector )
298
+ close (mCtx .collector )
299
+ drain (mCtx .source )
242
300
}()
243
301
244
- pool := make (chan struct {}, workers )
245
- writer := newGuardedWriter (ctx , collector , done )
302
+ pool := make (chan struct {}, mCtx .workers )
303
+ innerPanicChan := make (chan interface {})
304
+ onceInnerChan := & onceChan {
305
+ channel : innerPanicChan ,
306
+ }
307
+ writer := newGuardedWriter (mCtx .ctx , mCtx .collector , mCtx .doneChan )
246
308
for {
247
309
select {
248
- case <- ctx .Done ():
310
+ case <- mCtx . ctx .Done ():
249
311
return
250
- case <- done :
312
+ case <- mCtx .doneChan :
313
+ return
314
+ case v := <- innerPanicChan :
315
+ mCtx .panicChan .write (v )
251
316
return
252
317
case pool <- struct {}{}:
253
- item , ok := <- input
318
+ item , ok := <- mCtx . source
254
319
if ! ok {
255
320
<- pool
256
321
return
@@ -259,12 +324,14 @@ func executeMappers(ctx context.Context, mapper MapFunc, input <-chan interface{
259
324
wg .Add (1 )
260
325
go func () {
261
326
defer func () {
327
+ if r := recover (); r != nil {
328
+ onceInnerChan .write (r )
329
+ }
262
330
wg .Done ()
263
331
<- pool
264
332
}()
265
333
266
- // callers need to make sure mapper won't panic
267
- mapper (item , writer )
334
+ mCtx .mapper (item , writer )
268
335
}()
269
336
}
270
337
}
@@ -311,3 +378,16 @@ func (gw guardedWriter) Write(v interface{}) {
311
378
gw .channel <- v
312
379
}
313
380
}
381
+
382
// onceChan wraps a send-only channel so that at most one value is ever
// forwarded to it, no matter how many times (or from how many goroutines)
// write is called.
type onceChan struct {
	channel chan<- interface{}
	// wrote is 0 until the first write claims the channel and non-zero
	// afterwards; it is only accessed through sync/atomic.
	wrote int32
}

// write forwards val to the wrapped channel on the first call only; every
// later call returns immediately without sending. The send itself blocks until
// the channel can accept val.
func (oc *onceChan) write(val interface{}) {
	// Claim the slot with a compare-and-swap rather than an ever-growing
	// counter: an incremented int32 eventually wraps to a negative value,
	// which would let a second value through.
	if !atomic.CompareAndSwapInt32(&oc.wrote, 0, 1) {
		return
	}

	oc.channel <- val
}
0 commit comments