From 37eaf9eead86802c19be86d6924f33c69c737ff7 Mon Sep 17 00:00:00 2001 From: Carter Date: Sat, 7 May 2022 12:37:59 +0800 Subject: [PATCH 1/2] add FlatMap method --- stream.go | 12 +++++++++++ stream_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/stream.go b/stream.go index 9018753..6bcc465 100644 --- a/stream.go +++ b/stream.go @@ -30,6 +30,8 @@ type ( LessFunc func(a, b interface{}) bool // MapFunc defines the method to map each element to another object in a Stream. MapFunc func(item interface{}) interface{} + // FlatMapFunc defines the method to map each element to another Stream in a Stream. + FlatMapFunc func(item interface{}) Stream // Option defines the method to customize a Stream. Option func(opts *rxOptions) // ParallelFunc defines the method to handle elements parallelly. @@ -288,6 +290,16 @@ func (s Stream) Map(fn MapFunc, opts ...Option) Stream { }, opts...) } +// FlatMap returns a Stream that flattens the result of the given FlatMapFunc. +func (s Stream) FlatMap(fn FlatMapFunc, opts ...Option) Stream { + return s.Walk(func(item interface{}, pipe chan<- interface{}) { + otherStream := fn(item) + for other := range otherStream.source { + pipe <- other + } + }, opts...) +} + // Merge merges all the items into a slice and generates a new stream. func (s Stream) Merge() Stream { var items []interface{} diff --git a/stream_test.go b/stream_test.go index 91abf34..88cd6b1 100644 --- a/stream_test.go +++ b/stream_test.go @@ -292,6 +292,61 @@ func TestMap(t *testing.T) { }) } +func TestFlatMap(t *testing.T) { + runCheckedTest(t, func(t *testing.T) { + log.SetOutput(ioutil.Discard) + + tests := []struct { + name string + mapper FlatMapFunc + expect int + expectNum int + }{ + { + name: "flat map with square", + mapper: func(item interface{}) Stream { + return Just(item).Map(func(item interface{}) interface{} { + v := item.(int) + return v * v + }) + }, + expect: 30, + expectNum: 4, + }, + { + name: "flat map to double items", + mapper: func(item interface{}) Stream { + return Just(item, item).Map(func(item interface{}) interface{} { + v := item.(int) + return v * v + }) + }, + expect: 60, + expectNum: 8, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var result int + var num int + From(func(source chan<- interface{}) { + for i := 1; i < 5; i++ { + source <- i + } + }).FlatMap(test.mapper).Reduce(func(pipe <-chan interface{}) (interface{}, error) { + for item := range pipe { + result += item.(int) + num++ + } + return result, nil + }) + assert.Equal(t, test.expect, result) + assert.Equal(t, test.expectNum, num) + }) + } + }) +} + func TestMerge(t *testing.T) { runCheckedTest(t, func(t *testing.T) { Just(1, 2, 3, 4).Merge().ForEach(func(item interface{}) { From 21ee86986ad8b7fdd78827de98932682b563f035 Mon Sep 17 00:00:00 2001 From: Carter Date: Sat, 7 May 2022 12:56:26 +0800 Subject: [PATCH 2/2] add FlatMap document --- readme-cn.md | 37 ++++++++++++++++++++++++++++++++++++- readme.md | 35 ++++++++++++++++++++++++++++++++++- stream.go | 2 +- 3 files changed, 71 insertions(+), 3 deletions(-) diff --git a/readme-cn.md b/readme-cn.md index 0e722f3..801a889 100644 --- a/readme-cn.md +++ b/readme-cn.md @@ -82,6 +82,8 @@ Head(n int64) Stream Tail(n int64) Stream // 转换对象 Map(fn MapFunc, opts ...Option) Stream +// 将每个item分别转换为Stream,并将其合并为一个Stream +FlatMap(fn FlatMapFunc, opts ...Option) Stream // 合并item到slice生成新的stream Merge() Stream // 反转 @@ -137,7 +139,9 @@ KeyFunc func(item interface{}) interface{} // 过滤函数 FilterFunc func(item interface{}) bool // 对象转换函数 -MapFunc func(intem interface{}) interface{} +MapFunc func(item interface{}) interface{} +// 将每个item转换为Stream +FlatMapFunc func(item interface{}) Stream // 对象比较 LessFunc func(a, b interface{}) bool // 遍历函数 @@ -172,6 +176,8 @@ Stream interface { Last() interface{} // 转换对象 Map(fn MapFunc, opts ...Option) Stream + // 将每个item分别转换为Stream,并将其合并为一个Stream + FlatMap(fn FlatMapFunc, opts ...Option) Stream // 合并item到slice生成新的stream Merge() Stream // 反转 @@ -711,6 +717,35 @@ func TestInternalStream_Map(t *testing.T) { } ``` +#### 元素转换FlatMap + +元素转换,内部由协程完成转换操作,注意输出channel并不保证按原序输出。 + +```go +FlatMap func(fn FlatMapFunc, opts ...Option) Stream { + return s.Walk(func(item interface{}, pipe chan<- interface{}) { + otherStream := fn(item) + for other := range otherStream.source { + pipe <- other + } + }, opts...) +} +``` + +使用示例: + +```go +func TestInternalStream_FlatMap(t *testing.T) { + channel := Just(1, 2, 3, 4).FlatMap(func(item interface{}) Stream { + // 将每个item映射成2个同样的item + return Just(item, item) + }).channel() + for item := range channel { + t.Log(item) + } +} +``` + #### 合并 Merge 实现比较简单 diff --git a/readme.md b/readme.md index 3378409..11eedf4 100644 --- a/readme.md +++ b/readme.md @@ -139,7 +139,9 @@ KeyFunc func(item interface{}) interface{} // filter function FilterFunc func(item interface{}) bool // object conversion function -MapFunc func(intem interface{}) interface{} +MapFunc func(item interface{}) interface{} +// convert each item to Stream separately and combine them into one Stream +FlatMapFunc func(item interface{}) Stream // object comparison LessFunc func(a, b interface{}) bool // traversal function @@ -174,6 +176,8 @@ Stream interface { Last() interface{} // Convert the object Map(fn MapFunc, opts . . Option) Stream + // FlatMap returns a Stream that flattens the result of the given FlatMapFunc, which means it's a 1:N model. + FlatMap(fn FlatMapFunc, opts . . Option) Stream // Merge items into slice to create a new stream Merge() Stream // Reverse @@ -713,6 +717,35 @@ func TestInternalStream_Map(t *testing.T) { } ``` +#### element conversion FlatMap + +Element conversion, internally done by a concurrent process to complete the conversion operation, note that the output channel is not guaranteed to be output in the original order. + +```go +FlatMap func(fn FlatMapFunc, opts ...Option) Stream { + return s.Walk(func(item interface{}, pipe chan<- interface{}) { + otherStream := fn(item) + for other := range otherStream.source { + pipe <- other + } + }, opts...) +} +``` + +Example usage. + +```go +func TestInternalStream_FlatMap(t *testing.T) { + channel := Just(1, 2, 3, 4).FlatMap(func(item interface{}) Stream { + // Map each item to 2 identical items + return Just(item, item) + }).channel() + for item := range channel { + t.Log(item) + } +} +``` + #### Merge Merge The implementation is relatively simple, and I've thought long and hard about what scenarios would be suitable for this method. diff --git a/stream.go b/stream.go index 6bcc465..e7755f0 100644 --- a/stream.go +++ b/stream.go @@ -290,7 +290,7 @@ func (s Stream) Map(fn MapFunc, opts ...Option) Stream { }, opts...) } -// FlatMap returns a Stream that flattens the result of the given FlatMapFunc. +// FlatMap returns a Stream that flattens the result of the given FlatMapFunc, which means it's a 1:N model. func (s Stream) FlatMap(fn FlatMapFunc, opts ...Option) Stream { return s.Walk(func(item interface{}, pipe chan<- interface{}) { otherStream := fn(item)