|
| 1 | +//go:build go1.18 |
| 2 | +// +build go1.18 |
| 3 | + |
| 4 | +package mapreduce |
| 5 | + |
| 6 | +import ( |
| 7 | + "fmt" |
| 8 | + "math/rand" |
| 9 | + "runtime" |
| 10 | + "strings" |
| 11 | + "testing" |
| 12 | + "time" |
| 13 | + |
| 14 | + "github.com/stretchr/testify/assert" |
| 15 | + "go.uber.org/goleak" |
| 16 | +) |
| 17 | + |
| 18 | +func FuzzMapReduce(f *testing.F) { |
| 19 | + rand.Seed(time.Now().UnixNano()) |
| 20 | + |
| 21 | + f.Add(int64(10), runtime.NumCPU()) |
| 22 | + f.Fuzz(func(t *testing.T, n int64, workers int) { |
| 23 | + n = n%5000 + 5000 |
| 24 | + genPanic := rand.Intn(100) == 0 |
| 25 | + mapperPanic := rand.Intn(100) == 0 |
| 26 | + reducerPanic := rand.Intn(100) == 0 |
| 27 | + genIdx := rand.Int63n(n) |
| 28 | + mapperIdx := rand.Int63n(n) |
| 29 | + reducerIdx := rand.Int63n(n) |
| 30 | + squareSum := (n - 1) * n * (2*n - 1) / 6 |
| 31 | + |
| 32 | + fn := func() (interface{}, error) { |
| 33 | + defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) |
| 34 | + |
| 35 | + return MapReduce(func(source chan<- interface{}) { |
| 36 | + for i := int64(0); i < n; i++ { |
| 37 | + source <- i |
| 38 | + if genPanic && i == genIdx { |
| 39 | + panic("foo") |
| 40 | + } |
| 41 | + } |
| 42 | + }, func(item interface{}, writer Writer, cancel func(error)) { |
| 43 | + v := item.(int64) |
| 44 | + if mapperPanic && v == mapperIdx { |
| 45 | + panic("bar") |
| 46 | + } |
| 47 | + writer.Write(v * v) |
| 48 | + }, func(pipe <-chan interface{}, writer Writer, cancel func(error)) { |
| 49 | + var idx int64 |
| 50 | + var total int64 |
| 51 | + for v := range pipe { |
| 52 | + if reducerPanic && idx == reducerIdx { |
| 53 | + panic("baz") |
| 54 | + } |
| 55 | + total += v.(int64) |
| 56 | + idx++ |
| 57 | + } |
| 58 | + writer.Write(total) |
| 59 | + }, WithWorkers(workers%50+runtime.NumCPU())) |
| 60 | + } |
| 61 | + |
| 62 | + if genPanic || mapperPanic || reducerPanic { |
| 63 | + var buf strings.Builder |
| 64 | + buf.WriteString(fmt.Sprintf("n: %d", n)) |
| 65 | + buf.WriteString(fmt.Sprintf(", genPanic: %t", genPanic)) |
| 66 | + buf.WriteString(fmt.Sprintf(", mapperPanic: %t", mapperPanic)) |
| 67 | + buf.WriteString(fmt.Sprintf(", reducerPanic: %t", reducerPanic)) |
| 68 | + buf.WriteString(fmt.Sprintf(", genIdx: %d", genIdx)) |
| 69 | + buf.WriteString(fmt.Sprintf(", mapperIdx: %d", mapperIdx)) |
| 70 | + buf.WriteString(fmt.Sprintf(", reducerIdx: %d", reducerIdx)) |
| 71 | + assert.Panicsf(t, func() { fn() }, buf.String()) |
| 72 | + } else { |
| 73 | + val, err := fn() |
| 74 | + assert.Nil(t, err) |
| 75 | + assert.Equal(t, squareSum, val.(int64)) |
| 76 | + } |
| 77 | + }) |
| 78 | +} |
0 commit comments