Skip to content

Commit 7161bcd

Browse files
committed
fix bug
1 parent 08ff2ac commit 7161bcd

File tree

1 file changed

+6
-6
lines changed

1 file changed

+6
-6
lines changed

mapreduce.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func Finish(fns ...func() error) error {
5656
return nil
5757
}
5858

59-
return MapReduceVoid[func() error](func(source chan<- func() error) {
59+
return MapReduceVoid(func(source chan<- func() error) {
6060
for _, fn := range fns {
6161
source <- fn
6262
}
@@ -101,8 +101,8 @@ func Map[T, U any](generate GenerateFunc[T], mapper MapFunc[T, U], opts ...Optio
101101
// and reduces the output elements with given reducer.
102102
func MapReduce[T, U, V any](generate GenerateFunc[T], mapper MapperFunc[T, U], reducer ReducerFunc[U, V],
103103
opts ...Option) (V, error) {
104-
source := buildSource[T](generate)
105-
return MapReduceWithSource[T, U, V](source, mapper, reducer, opts...)
104+
source := buildSource(generate)
105+
return MapReduceWithSource(source, mapper, reducer, opts...)
106106
}
107107

108108
// MapReduceWithSource maps all elements from source, and reduce the output elements with given reducer.
@@ -141,7 +141,7 @@ func MapReduceWithSource[T, U, V any](source <-chan T, mapper MapperFunc[T, U],
141141

142142
go func() {
143143
defer func() {
144-
drain[U](collector)
144+
drain(collector)
145145

146146
if r := recover(); r != nil {
147147
cancel(fmt.Errorf("%v", r))
@@ -159,7 +159,7 @@ func MapReduceWithSource[T, U, V any](source <-chan T, mapper MapperFunc[T, U],
159159
}, source, collector, done, options.workers)
160160

161161
value, ok := <-output
162-
if e := retErr.Load(); err != nil {
162+
if e := retErr.Load(); e != nil {
163163
err = e.(error)
164164
} else if ok {
165165
val = value
@@ -174,7 +174,7 @@ func MapReduceWithSource[T, U, V any](source <-chan T, mapper MapperFunc[T, U],
174174
// and reduce the output elements with given reducer.
175175
func MapReduceVoid[T, U any](generate GenerateFunc[T], mapper MapperFunc[T, U],
176176
reducer VoidReducerFunc[U], opts ...Option) error {
177-
_, err := MapReduce[T, U, interface{}](generate, mapper, func(input <-chan U,
177+
_, err := MapReduce(generate, mapper, func(input <-chan U,
178178
writer Writer[interface{}], cancel func(error)) {
179179
reducer(input, cancel)
180180
// We need to write a placeholder to let MapReduce to continue on reducer done,

0 commit comments

Comments
 (0)