From e9699a8c0cfa946676f7ce0151f0c05ae3fe661a Mon Sep 17 00:00:00 2001 From: kevin Date: Thu, 6 Jan 2022 17:32:32 +0800 Subject: [PATCH 1/2] feat: version with generics --- go.mod | 2 +- ring.go | 16 +++--- ring_test.go | 10 ++-- stream.go | 134 +++++++++++++++++++++++++-------------------------- 4 files changed, 81 insertions(+), 81 deletions(-) diff --git a/go.mod b/go.mod index f8ff3b7..55ba13a 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/kevwan/stream -go 1.17 +go 1.18 require github.com/stretchr/testify v1.7.0 diff --git a/ring.go b/ring.go index 820dccf..2b19c31 100644 --- a/ring.go +++ b/ring.go @@ -3,25 +3,25 @@ package stream import "sync" // A Ring can be used as fixed size ring. -type Ring struct { - elements []interface{} +type Ring[T any] struct { + elements []T index int lock sync.Mutex } // NewRing returns a Ring object with the given size n. -func NewRing(n int) *Ring { +func NewRing[T any](n int) *Ring[T] { if n < 1 { panic("n should be greater than 0") } - return &Ring{ - elements: make([]interface{}, n), + return &Ring[T]{ + elements: make([]T, n), } } // Add adds v into r. -func (r *Ring) Add(v interface{}) { +func (r *Ring[T]) Add(v T) { r.lock.Lock() defer r.lock.Unlock() @@ -30,7 +30,7 @@ func (r *Ring) Add(v interface{}) { } // Take takes all items from r. -func (r *Ring) Take() []interface{} { +func (r *Ring[T]) Take() []T { r.lock.Lock() defer r.lock.Unlock() @@ -43,7 +43,7 @@ func (r *Ring) Take() []interface{} { size = r.index } - elements := make([]interface{}, size) + elements := make([]T, size) for i := 0; i < size; i++ { elements[i] = r.elements[(start+i)%len(r.elements)] } diff --git a/ring_test.go b/ring_test.go index 7e86f66..4c6d7be 100644 --- a/ring_test.go +++ b/ring_test.go @@ -9,12 +9,12 @@ import ( func TestNewRing(t *testing.T) { assert.Panics(t, func() { - NewRing(0) + NewRing[int](0) }) } func TestRingLess(t *testing.T) { - ring := NewRing(5) + ring := NewRing[int](5) for i := 0; i < 3; i++ { ring.Add(i) } @@ -23,7 +23,7 @@ func TestRingLess(t *testing.T) { } func TestRingMore(t *testing.T) { - ring := NewRing(5) + ring := NewRing[int](5) for i := 0; i < 11; i++ { ring.Add(i) } @@ -32,7 +32,7 @@ func TestRingMore(t *testing.T) { } func TestRingAdd(t *testing.T) { - ring := NewRing(5051) + ring := NewRing[int](5051) wg := sync.WaitGroup{} for i := 1; i <= 100; i++ { wg.Add(1) @@ -48,7 +48,7 @@ func TestRingAdd(t *testing.T) { } func BenchmarkRingAdd(b *testing.B) { - ring := NewRing(500) + ring := NewRing[int](500) b.RunParallel(func(pb *testing.PB) { for pb.Next() { for i := 0; i < b.N; i++ { diff --git a/stream.go b/stream.go index 9018753..060e7ed 100644 --- a/stream.go +++ b/stream.go @@ -17,42 +17,42 @@ type ( } // FilterFunc defines the method to filter a Stream. - FilterFunc func(item interface{}) bool + FilterFunc[T any] func(item T) bool // ForAllFunc defines the method to handle all elements in a Stream. - ForAllFunc func(pipe <-chan interface{}) + ForAllFunc[T any] func(pipe <-chan T) // ForEachFunc defines the method to handle each element in a Stream. - ForEachFunc func(item interface{}) + ForEachFunc[T any] func(item T) // GenerateFunc defines the method to send elements into a Stream. - GenerateFunc func(source chan<- interface{}) + GenerateFunc[T any] func(source chan<- T) // KeyFunc defines the method to generate keys for the elements in a Stream. - KeyFunc func(item interface{}) interface{} + KeyFunc[T, U any] func(item T) U // LessFunc defines the method to compare the elements in a Stream. - LessFunc func(a, b interface{}) bool + LessFunc[T any] func(a, b T) bool // MapFunc defines the method to map each element to another object in a Stream. - MapFunc func(item interface{}) interface{} + MapFunc[T, U any] func(item T) U // Option defines the method to customize a Stream. Option func(opts *rxOptions) // ParallelFunc defines the method to handle elements parallelly. - ParallelFunc func(item interface{}) + ParallelFunc[T any] func(item T) // ReduceFunc defines the method to reduce all the elements in a Stream. - ReduceFunc func(pipe <-chan interface{}) (interface{}, error) + ReduceFunc[T, U any] func(pipe <-chan T) (U, error) // WalkFunc defines the method to walk through all the elements in a Stream. - WalkFunc func(item interface{}, pipe chan<- interface{}) + WalkFunc[T, U any] func(item T, pipe chan<- U) // A Stream is a stream that can be used to do stream processing. - Stream struct { - source <-chan interface{} + Stream[T any] struct { + source <-chan T } ) // Concat returns a concatenated Stream. -func Concat(s Stream, others ...Stream) Stream { +func Concat[T any](s Stream[T], others ...Stream[T]) Stream[T] { return s.Concat(others...) } // From constructs a Stream from the given GenerateFunc. -func From(generate GenerateFunc) Stream { - source := make(chan interface{}) +func From[T any](generate GenerateFunc[T]) Stream[T] { + source := make(chan T) go func() { defer close(source) @@ -63,8 +63,8 @@ func From(generate GenerateFunc) Stream { } // Just converts the given arbitrary items to a Stream. -func Just(items ...interface{}) Stream { - source := make(chan interface{}, len(items)) +func Just[T any](items ...T) Stream[T] { + source := make(chan T, len(items)) for _, item := range items { source <- item } @@ -74,8 +74,8 @@ func Just(items ...interface{}) Stream { } // Range converts the given channel to a Stream. -func Range(source <-chan interface{}) Stream { - return Stream{ +func Range[T any](source <-chan T) Stream[T] { + return Stream[T]{ source: source, } } @@ -83,7 +83,7 @@ func Range(source <-chan interface{}) Stream { // AllMach returns whether all elements of this stream match the provided predicate. // May not evaluate the predicate on all elements if not necessary for determining the result. // If the stream is empty then true is returned and the predicate is not evaluated. -func (s Stream) AllMach(predicate func(item interface{}) bool) bool { +func (s Stream[T]) AllMach(predicate func(item T) bool) bool { for item := range s.source { if !predicate(item) { // make sure the former goroutine not block, and current func returns fast. @@ -98,7 +98,7 @@ func (s Stream) AllMach(predicate func(item interface{}) bool) bool { // AnyMach returns whether any elements of this stream match the provided predicate. // May not evaluate the predicate on all elements if not necessary for determining the result. // If the stream is empty then false is returned and the predicate is not evaluated. -func (s Stream) AnyMach(predicate func(item interface{}) bool) bool { +func (s Stream[T]) AnyMach(predicate func(item T) bool) bool { for item := range s.source { if predicate(item) { // make sure the former goroutine not block, and current func returns fast. @@ -112,12 +112,12 @@ func (s Stream) AnyMach(predicate func(item interface{}) bool) bool { // Buffer buffers the items into a queue with size n. // It can balance the producer and the consumer if their processing throughput don't match. -func (s Stream) Buffer(n int) Stream { +func (s Stream[T]) Buffer(n int) Stream[T] { if n < 0 { n = 0 } - source := make(chan interface{}, n) + source := make(chan T, n) go func() { for item := range s.source { source <- item @@ -129,8 +129,8 @@ func (s Stream) Buffer(n int) Stream { } // Concat returns a Stream that concatenated other streams -func (s Stream) Concat(others ...Stream) Stream { - source := make(chan interface{}) +func (s Stream[T]) Concat(others ...Stream[T]) Stream[T] { + source := make(chan T) go func() { group := NewRoutineGroup() @@ -157,7 +157,7 @@ func (s Stream) Concat(others ...Stream) Stream { } // Count counts the number of elements in the result. -func (s Stream) Count() (count int) { +func (s Stream[T]) Count() (count int) { for range s.source { count++ } @@ -165,13 +165,13 @@ func (s Stream) Count() (count int) { } // Distinct removes the duplicated items base on the given KeyFunc. -func (s Stream) Distinct(fn KeyFunc) Stream { - source := make(chan interface{}) +func (s Stream[T, U]) Distinct(fn KeyFunc[U]) Stream[T] { + source := make(chan T) go func() { defer close(source) - keys := make(map[interface{}]struct{}) + keys := make(map[U]struct{}) for item := range s.source { key := fn(item) if _, ok := keys[key]; !ok { @@ -185,46 +185,46 @@ func (s Stream) Distinct(fn KeyFunc) Stream { } // Done waits all upstreaming operations to be done. -func (s Stream) Done() { +func (s Stream[T]) Done() { drain(s.source) } // Filter filters the items by the given FilterFunc. -func (s Stream) Filter(fn FilterFunc, opts ...Option) Stream { - return s.Walk(func(item interface{}, pipe chan<- interface{}) { +func (s Stream[T]) Filter(fn FilterFunc[T], opts ...Option) Stream[T] { + return s.Walk(func(item T, pipe chan<- T) { if fn(item) { pipe <- item } }, opts...) } -// First returns the first item, nil if no items. -func (s Stream) First() interface{} { - for item := range s.source { +// First returns the first item, zero value if no items. +func (s Stream[T]) First() (item T) { + for item = range s.source { // make sure the former goroutine not block, and current func returns fast. go drain(s.source) return item } - return nil + return } // ForAll handles the streaming elements from the source and no later streams. -func (s Stream) ForAll(fn ForAllFunc) { +func (s Stream[T]) ForAll(fn ForAllFunc[T]) { fn(s.source) // avoid goroutine leak on fn not consuming all items. go drain(s.source) } // ForEach seals the Stream with the ForEachFunc on each item, no successive operations. -func (s Stream) ForEach(fn ForEachFunc) { +func (s Stream[T]) ForEach(fn ForEachFunc[T]) { for item := range s.source { fn(item) } } // Group groups the elements into different groups based on their keys. -func (s Stream) Group(fn KeyFunc) Stream { +func (s Stream[T, U]) Group(fn KeyFunc[U]) Stream[T] { groups := make(map[interface{}][]interface{}) for item := range s.source { key := fn(item) @@ -243,12 +243,12 @@ func (s Stream) Group(fn KeyFunc) Stream { } // Head returns the first n elements in p. -func (s Stream) Head(n int64) Stream { +func (s Stream[T]) Head(n int64) Stream[T] { if n < 1 { panic("n must be greater than 0") } - source := make(chan interface{}) + source := make(chan T) go func() { for item := range s.source { @@ -275,27 +275,27 @@ func (s Stream) Head(n int64) Stream { } // Last returns the last item, or nil if no items. -func (s Stream) Last() (item interface{}) { +func (s Stream[T]) Last() (item T) { for item = range s.source { } return } // Map converts each item to another corresponding item, which means it's a 1:1 model. -func (s Stream) Map(fn MapFunc, opts ...Option) Stream { - return s.Walk(func(item interface{}, pipe chan<- interface{}) { +func (s Stream[T, U]) Map(fn MapFunc[U], opts ...Option) Stream[U] { + return s.Walk(func(item T, pipe chan<- U) { pipe <- fn(item) }, opts...) } // Merge merges all the items into a slice and generates a new stream. -func (s Stream) Merge() Stream { - var items []interface{} +func (s Stream[T]) Merge() Stream[[]T] { + var items []T for item := range s.source { items = append(items, item) } - source := make(chan interface{}, 1) + source := make(chan []T, 1) source <- items close(source) @@ -305,7 +305,7 @@ func (s Stream) Merge() Stream { // NoneMatch returns whether all elements of this stream don't match the provided predicate. // May not evaluate the predicate on all elements if not necessary for determining the result. // If the stream is empty then true is returned and the predicate is not evaluated. -func (s Stream) NoneMatch(predicate func(item interface{}) bool) bool { +func (s Stream[T]) NoneMatch(predicate func(item T) bool) bool { for item := range s.source { if predicate(item) { // make sure the former goroutine not block, and current func returns fast. @@ -318,20 +318,20 @@ func (s Stream) NoneMatch(predicate func(item interface{}) bool) bool { } // Parallel applies the given ParallelFunc to each item concurrently with given number of workers. -func (s Stream) Parallel(fn ParallelFunc, opts ...Option) { - s.Walk(func(item interface{}, pipe chan<- interface{}) { +func (s Stream[T]) Parallel(fn ParallelFunc[T], opts ...Option) { + s.Walk(func(item T, pipe chan<- interface{}) { fn(item) }, opts...).Done() } // Reduce is a utility method to let the caller deal with the underlying channel. -func (s Stream) Reduce(fn ReduceFunc) (interface{}, error) { +func (s Stream[T]) Reduce(fn ReduceFunc) (interface{}, error) { return fn(s.source) } // Reverse reverses the elements in the stream. -func (s Stream) Reverse() Stream { - var items []interface{} +func (s Stream[T]) Reverse() Stream[T] { + var items []T for item := range s.source { items = append(items, item) } @@ -345,7 +345,7 @@ func (s Stream) Reverse() Stream { } // Skip returns a Stream that skips size elements. -func (s Stream) Skip(n int64) Stream { +func (s Stream[T]) Skip(n int64) Stream[T] { if n < 0 { panic("n must not be negative") } @@ -353,7 +353,7 @@ func (s Stream) Skip(n int64) Stream { return s } - source := make(chan interface{}) + source := make(chan T) go func() { for item := range s.source { @@ -371,8 +371,8 @@ func (s Stream) Skip(n int64) Stream { } // Sort sorts the items from the underlying source. -func (s Stream) Sort(less LessFunc) Stream { - var items []interface{} +func (s Stream[T]) Sort(less LessFunc[T]) Stream[T] { + var items []T for item := range s.source { items = append(items, item) } @@ -385,14 +385,14 @@ func (s Stream) Sort(less LessFunc) Stream { // Split splits the elements into chunk with size up to n, // might be less than n on tailing elements. -func (s Stream) Split(n int) Stream { +func (s Stream[T]) Split(n int) Stream[[]T] { if n < 1 { panic("n should be greater than 0") } - source := make(chan interface{}) + source := make(chan []T) go func() { - var chunk []interface{} + var chunk []T for item := range s.source { chunk = append(chunk, item) if len(chunk) == n { @@ -410,15 +410,15 @@ func (s Stream) Split(n int) Stream { } // Tail returns the last n elements in p. -func (s Stream) Tail(n int64) Stream { +func (s Stream[T]) Tail(n int64) Stream[T] { if n < 1 { panic("n should be greater than 0") } - source := make(chan interface{}) + source := make(chan T) go func() { - ring := NewRing(int(n)) + ring := NewRing[T](int(n)) for item := range s.source { ring.Add(item) } @@ -432,7 +432,7 @@ func (s Stream) Tail(n int64) Stream { } // Walk lets the callers handle each item, the caller may write zero, one or more items base on the given item. -func (s Stream) Walk(fn WalkFunc, opts ...Option) Stream { +func (s Stream[T]) Walk(fn WalkFunc, opts ...Option) Stream { option := buildOptions(opts...) if option.unlimitedWorkers { return s.walkUnlimited(fn, option) @@ -441,7 +441,7 @@ func (s Stream) Walk(fn WalkFunc, opts ...Option) Stream { return s.walkLimited(fn, option) } -func (s Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream { +func (s Stream[T]) walkLimited(fn WalkFunc, option *rxOptions) Stream { pipe := make(chan interface{}, option.workers) go func() { @@ -471,7 +471,7 @@ func (s Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream { return Range(pipe) } -func (s Stream) walkUnlimited(fn WalkFunc, option *rxOptions) Stream { +func (s Stream[T]) walkUnlimited(fn WalkFunc, option *rxOptions) Stream { pipe := make(chan interface{}, option.workers) go func() { @@ -523,7 +523,7 @@ func buildOptions(opts ...Option) *rxOptions { } // drain drains the given channel. -func drain(channel <-chan interface{}) { +func drain[T any](channel <-chan T) { for range channel { } } From c59f1307c5ee527bd0520724656ec426685f587f Mon Sep 17 00:00:00 2001 From: kevin Date: Thu, 6 Jan 2022 18:34:50 +0800 Subject: [PATCH 2/2] fix: some methods with type not working --- stream.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/stream.go b/stream.go index 060e7ed..06a728c 100644 --- a/stream.go +++ b/stream.go @@ -25,7 +25,7 @@ type ( // GenerateFunc defines the method to send elements into a Stream. GenerateFunc[T any] func(source chan<- T) // KeyFunc defines the method to generate keys for the elements in a Stream. - KeyFunc[T, U any] func(item T) U + KeyFunc[T any] func(item T) interface{} // LessFunc defines the method to compare the elements in a Stream. LessFunc[T any] func(a, b T) bool // MapFunc defines the method to map each element to another object in a Stream. @@ -165,13 +165,13 @@ func (s Stream[T]) Count() (count int) { } // Distinct removes the duplicated items base on the given KeyFunc. -func (s Stream[T, U]) Distinct(fn KeyFunc[U]) Stream[T] { +func (s Stream[T]) Distinct(fn KeyFunc[T]) Stream[T] { source := make(chan T) go func() { defer close(source) - keys := make(map[U]struct{}) + keys := make(map[interface{}]struct{}) for item := range s.source { key := fn(item) if _, ok := keys[key]; !ok { @@ -224,14 +224,14 @@ func (s Stream[T]) ForEach(fn ForEachFunc[T]) { } // Group groups the elements into different groups based on their keys. -func (s Stream[T, U]) Group(fn KeyFunc[U]) Stream[T] { - groups := make(map[interface{}][]interface{}) +func (s Stream[T]) Group(fn KeyFunc[T]) Stream[[]T] { + groups := make(map[interface{}][]T) for item := range s.source { key := fn(item) groups[key] = append(groups[key], item) } - source := make(chan interface{}) + source := make(chan []T) go func() { for _, group := range groups { source <- group @@ -282,7 +282,7 @@ func (s Stream[T]) Last() (item T) { } // Map converts each item to another corresponding item, which means it's a 1:1 model. -func (s Stream[T, U]) Map(fn MapFunc[U], opts ...Option) Stream[U] { +func (s Stream[T]) Map(fn MapFunc[T, U any], opts ...Option) Stream[U] { return s.Walk(func(item T, pipe chan<- U) { pipe <- fn(item) }, opts...)