Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/flat map #5

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion readme-cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ Head(n int64) Stream
Tail(n int64) Stream
// 转换对象
Map(fn MapFunc, opts ...Option) Stream
// 将每个item分别转换为Stream,并将其合并为一个Stream
FlatMap(fn FlatMapFunc, opts ...Option) Stream
// 合并item到slice生成新的stream
Merge() Stream
// 反转
Expand Down Expand Up @@ -137,7 +139,9 @@ KeyFunc func(item interface{}) interface{}
// 过滤函数
FilterFunc func(item interface{}) bool
// 对象转换函数
MapFunc func(intem interface{}) interface{}
MapFunc func(item interface{}) interface{}
// 将每个item转换为Stream
FlatMapFunc func(item interface{}) Stream
// 对象比较
LessFunc func(a, b interface{}) bool
// 遍历函数
Expand Down Expand Up @@ -172,6 +176,8 @@ Stream interface {
Last() interface{}
// 转换对象
Map(fn MapFunc, opts ...Option) Stream
// 将每个item分别转换为Stream,并将其合并为一个Stream
FlatMap(fn FlatMapFunc, opts ...Option) Stream
// 合并item到slice生成新的stream
Merge() Stream
// 反转
Expand Down Expand Up @@ -711,6 +717,35 @@ func TestInternalStream_Map(t *testing.T) {
}
```

#### 元素转换FlatMap

元素转换,内部由协程完成转换操作,注意输出channel并不保证按原序输出。

```go
FlatMap func(fn FlatMapFunc, opts ...Option) Stream {
return s.Walk(func(item interface{}, pipe chan<- interface{}) {
otherStream := fn(item)
for other := range otherStream.source {
pipe <- other
}
}, opts...)
}
```

使用示例:

```go
func TestInternalStream_FlatMap(t *testing.T) {
channel := Just(1, 2, 3, 4).FlatMap(func(item interface{}) Stream {
// 将每个item映射成2个同样的item
return Just(item, item)
}).channel()
for item := range channel {
t.Log(item)
}
}
```

#### 合并 Merge

实现比较简单
Expand Down
35 changes: 34 additions & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ KeyFunc func(item interface{}) interface{}
// filter function
FilterFunc func(item interface{}) bool
// object conversion function
MapFunc func(intem interface{}) interface{}
MapFunc func(item interface{}) interface{}
// convert each item to Stream separately and combine them into one Stream
FlatMapFunc func(item interface{}) Stream
// object comparison
LessFunc func(a, b interface{}) bool
// traversal function
Expand Down Expand Up @@ -174,6 +176,8 @@ Stream interface {
Last() interface{}
// Convert the object
Map(fn MapFunc, opts . . Option) Stream
// FlatMap returns a Stream that flattens the result of the given FlatMapFunc, which means it's a 1:N model.
FlatMap(fn FlatMapFunc, opts . . Option) Stream
// Merge items into slice to create a new stream
Merge() Stream
// Reverse
Expand Down Expand Up @@ -713,6 +717,35 @@ func TestInternalStream_Map(t *testing.T) {
}
```

#### element conversion FlatMap

Element conversion, internally done by a concurrent process to complete the conversion operation, note that the output channel is not guaranteed to be output in the original order.

```go
FlatMap func(fn FlatMapFunc, opts ...Option) Stream {
return s.Walk(func(item interface{}, pipe chan<- interface{}) {
otherStream := fn(item)
for other := range otherStream.source {
pipe <- other
}
}, opts...)
}
```

Example usage.

```go
func TestInternalStream_FlatMap(t *testing.T) {
channel := Just(1, 2, 3, 4).FlatMap(func(item interface{}) Stream {
// Map each item to 2 identical items
return Just(item, item)
}).channel()
for item := range channel {
t.Log(item)
}
}
```

#### Merge Merge

The implementation is relatively simple, and I've thought long and hard about what scenarios would be suitable for this method.
Expand Down
12 changes: 12 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type (
LessFunc func(a, b interface{}) bool
// MapFunc defines the method to map each element to another object in a Stream.
MapFunc func(item interface{}) interface{}
// FlatMapFunc defines the method to map each element to another Stream in a Stream.
FlatMapFunc func(item interface{}) Stream
// Option defines the method to customize a Stream.
Option func(opts *rxOptions)
// ParallelFunc defines the method to handle elements parallelly.
Expand Down Expand Up @@ -288,6 +290,16 @@ func (s Stream) Map(fn MapFunc, opts ...Option) Stream {
}, opts...)
}

// FlatMap returns a Stream that flattens the result of the given FlatMapFunc, which means it's a 1:N model.
func (s Stream) FlatMap(fn FlatMapFunc, opts ...Option) Stream {
return s.Walk(func(item interface{}, pipe chan<- interface{}) {
otherStream := fn(item)
for other := range otherStream.source {
pipe <- other
}
}, opts...)
}

// Merge merges all the items into a slice and generates a new stream.
func (s Stream) Merge() Stream {
var items []interface{}
Expand Down
55 changes: 55 additions & 0 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,61 @@ func TestMap(t *testing.T) {
})
}

func TestFlatMap(t *testing.T) {
runCheckedTest(t, func(t *testing.T) {
log.SetOutput(ioutil.Discard)

tests := []struct {
name string
mapper FlatMapFunc
expect int
expectNum int
}{
{
name: "flat map with square",
mapper: func(item interface{}) Stream {
return Just(item).Map(func(item interface{}) interface{} {
v := item.(int)
return v * v
})
},
expect: 30,
expectNum: 4,
},
{
name: "flat map to double items",
mapper: func(item interface{}) Stream {
return Just(item, item).Map(func(item interface{}) interface{} {
v := item.(int)
return v * v
})
},
expect: 60,
expectNum: 8,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var result int
var num int
From(func(source chan<- interface{}) {
for i := 1; i < 5; i++ {
source <- i
}
}).FlatMap(test.mapper).Reduce(func(pipe <-chan interface{}) (interface{}, error) {
for item := range pipe {
result += item.(int)
num++
}
return result, nil
})
assert.Equal(t, test.expect, result)
assert.Equal(t, test.expectNum, num)
})
}
})
}

func TestMerge(t *testing.T) {
runCheckedTest(t, func(t *testing.T) {
Just(1, 2, 3, 4).Merge().ForEach(func(item interface{}) {
Expand Down