Commit 9a9eeb9

Multi batch support

1 parent f9197ad commit 9a9eeb9

File tree: 6 files changed, +253 -75 lines changed

Diff for: exporter/exporterhelper/internal/queue_sender.go

+3 -2

@@ -20,8 +20,9 @@ import (
 
 // QueueBatchSettings is a subset of the queuebatch.Settings that are needed when used within an Exporter.
 type QueueBatchSettings[K any] struct {
-    Encoding queuebatch.Encoding[K]
-    Sizers   map[request.SizerType]request.Sizer[K]
+    Encoding    queuebatch.Encoding[K]
+    Sizers      map[request.SizerType]request.Sizer[K]
+    Partitioner queuebatch.Partitioner[K]
 }
 
 // NewDefaultQueueConfig returns the default config for queuebatch.Config.
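
For orientation, here is a minimal sketch of how an exporter might populate the new Partitioner field, assuming queuebatch.Partitioner[K] is satisfied by a single GetKey(ctx, req) string method (as the getShard implementation in the new file below suggests). tenantPartitioner, tenantKey, and newSettingsWithPartitioner are hypothetical names introduced only for this sketch, not part of this commit.

// Sketch only: written as if it lived alongside queue_sender.go.
package internal

import (
    "context"

    "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
    "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
)

// tenantKey is a hypothetical context key carrying a tenant identifier.
type tenantKey struct{}

// tenantPartitioner is a hypothetical Partitioner that groups requests by the
// tenant ID stored on the context, falling back to one shared partition.
type tenantPartitioner struct{}

func (tenantPartitioner) GetKey(ctx context.Context, _ request.Request) string {
    if tenant, ok := ctx.Value(tenantKey{}).(string); ok {
        return tenant
    }
    return "default"
}

// newSettingsWithPartitioner shows where the new field plugs in; the encoding
// and sizers are whatever the exporter already configures today.
func newSettingsWithPartitioner(
    enc queuebatch.Encoding[request.Request],
    sizers map[request.SizerType]request.Sizer[request.Request],
) QueueBatchSettings[request.Request] {
    return QueueBatchSettings[request.Request]{
        Encoding:    enc,
        Sizers:      sizers,
        Partitioner: tenantPartitioner{},
    }
}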
Diff for: new file in exporter/exporterhelper/internal/queuebatch/

@@ -0,0 +1,108 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
+
+import (
+    "context"
+    "sync"
+    "time"
+
+    "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
+)
+
+type batch struct {
+    ctx     context.Context
+    req     request.Request
+    done    multiDone
+    created time.Time
+}
+
+type shard struct {
+    sync.Mutex
+    *batch
+}
+
+func newShard() *shard {
+    return &shard{
+        sync.Mutex{},
+        nil,
+    }
+}
+
+type partitionManager interface {
+    // getShard returns a shard for the given request.
+    getShard(ctx context.Context, req request.Request) *shard
+
+    // forEachShard iterates over all shards and calls the given callback function.
+    forEachShard(func(*shard))
+}
+
+func newPartitionManager(partitioner Partitioner[request.Request]) partitionManager {
+    if partitioner == nil {
+        return newSinglePartitionManager()
+    }
+    return newMultiPartitionManager(partitioner)
+}
+
+type singlePartitionManager struct {
+    shard *shard
+}
+
+func newSinglePartitionManager() *singlePartitionManager {
+    return &singlePartitionManager{
+        shard: newShard(),
+    }
+}
+
+func (bm *singlePartitionManager) getShard(_ context.Context, _ request.Request) *shard {
+    return bm.shard
+}
+
+func (bm *singlePartitionManager) forEachShard(callback func(*shard)) {
+    callback(bm.shard)
+}
+
+type multiPartitionManager struct {
+    // TODO: Currently each partition has only one shard. We need to support multiple shards per partition.
+    partitionMap map[string]*shard
+    partitioner  Partitioner[request.Request]
+    mu           sync.RWMutex
+}
+
+func newMultiPartitionManager(partitioner Partitioner[request.Request]) *multiPartitionManager {
+    return &multiPartitionManager{
+        partitionMap: make(map[string]*shard),
+        partitioner:  partitioner,
+    }
+}
+
+// getShard returns the shard for the given request. It creates a new shard if the partition is not found.
+func (bm *multiPartitionManager) getShard(ctx context.Context, req request.Request) *shard {
+    key := bm.partitioner.GetKey(ctx, req)
+
+    bm.mu.RLock()
+    if shard, ok := bm.partitionMap[key]; ok {
+        bm.mu.RUnlock()
+        return shard
+    }
+    bm.mu.RUnlock()
+
+    bm.mu.Lock()
+    defer bm.mu.Unlock()
+    // Re-check under the write lock so a shard created concurrently is not overwritten.
+    if shard, ok := bm.partitionMap[key]; ok {
+        return shard
+    }
+    shard := newShard()
+    bm.partitionMap[key] = shard
+    return shard
+}
+
+func (bm *multiPartitionManager) forEachShard(callback func(*shard)) {
+    bm.mu.RLock()
+    defer bm.mu.RUnlock()
+    for _, shard := range bm.partitionMap {
+        callback(shard)
+    }
+}
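
To make the routing concrete, a small usage sketch follows, assuming Partitioner[request.Request] is satisfied by a single GetKey(ctx, req) string method and written as if it sat in the queuebatch package; ctxKeyPartitioner, partitionCtxKey, and examplePartitionRouting are hypothetical names for illustration only.

// Sketch only: same package as the partition manager above.
package queuebatch

import (
    "context"
    "fmt"

    "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
)

// partitionCtxKey is a hypothetical context key carrying the partition key.
type partitionCtxKey struct{}

// ctxKeyPartitioner is a hypothetical partitioner that reads the key from the context.
type ctxKeyPartitioner struct{}

func (ctxKeyPartitioner) GetKey(ctx context.Context, _ request.Request) string {
    key, _ := ctx.Value(partitionCtxKey{}).(string)
    return key
}

func examplePartitionRouting() {
    ctxA := context.WithValue(context.Background(), partitionCtxKey{}, "tenant-a")
    ctxB := context.WithValue(context.Background(), partitionCtxKey{}, "tenant-b")

    // Without a partitioner every request shares one shard, matching the
    // previous single-batch behavior.
    single := newPartitionManager(nil)
    fmt.Println(single.getShard(ctxA, nil) == single.getShard(ctxB, nil)) // true

    // With a partitioner each key gets its own shard, so batches for
    // different partitions accumulate and flush independently.
    multi := newPartitionManager(ctxKeyPartitioner{})
    fmt.Println(multi.getShard(ctxA, nil) == multi.getShard(ctxB, nil)) // false
    fmt.Println(multi.getShard(ctxA, nil) == multi.getShard(ctxA, nil)) // true
}

Because each shard carries its own mutex and current batch, lock contention is limited to requests that share a partition key.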

Diff for: exporter/exporterhelper/internal/queuebatch/default_batcher.go

+67 -71

@@ -15,31 +15,26 @@ import (
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
 )
 
-type batch struct {
-    ctx  context.Context
-    req  request.Request
-    done multiDone
-}
-
 type batcherSettings[K any] struct {
-    sizerType  request.SizerType
-    sizer      request.Sizer[K]
-    next       sender.SendFunc[K]
-    maxWorkers int
+    sizerType   request.SizerType
+    sizer       request.Sizer[K]
+    partitioner Partitioner[K]
+    next        sender.SendFunc[K]
+    maxWorkers  int
 }
 
 // defaultBatcher continuously batch incoming requests and flushes asynchronously if minimum size limit is met or on timeout.
 type defaultBatcher struct {
-    cfg            BatchConfig
-    workerPool     chan struct{}
-    sizerType      request.SizerType
-    sizer          request.Sizer[request.Request]
-    consumeFunc    sender.SendFunc[request.Request]
-    stopWG         sync.WaitGroup
-    currentBatchMu sync.Mutex
-    currentBatch   *batch
-    timer          *time.Timer
-    shutdownCh     chan struct{}
+    cfg         BatchConfig
+    workerPool  chan struct{}
+    sizerType   request.SizerType
+    sizer       request.Sizer[request.Request]
+    consumeFunc sender.SendFunc[request.Request]
+    stopWG      sync.WaitGroup
+    ticker      *time.Ticker
+    shutdownCh  chan struct{}
+
+    partitionManager partitionManager
 }
 
 func newDefaultBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request]) *defaultBatcher {
@@ -52,30 +47,26 @@ func newDefaultBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request])
         }
     }
     return &defaultBatcher{
-        cfg:         bCfg,
-        workerPool:  workerPool,
-        sizerType:   bSet.sizerType,
-        sizer:       bSet.sizer,
-        consumeFunc: bSet.next,
-        stopWG:      sync.WaitGroup{},
-        shutdownCh:  make(chan struct{}, 1),
-    }
-}
-
-func (qb *defaultBatcher) resetTimer() {
-    if qb.cfg.FlushTimeout > 0 {
-        qb.timer.Reset(qb.cfg.FlushTimeout)
+        cfg:              bCfg,
+        workerPool:       workerPool,
+        sizerType:        bSet.sizerType,
+        sizer:            bSet.sizer,
+        consumeFunc:      bSet.next,
+        stopWG:           sync.WaitGroup{},
+        shutdownCh:       make(chan struct{}, 1),
+        partitionManager: newPartitionManager(bSet.partitioner),
     }
 }
 
 func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done Done) {
-    qb.currentBatchMu.Lock()
+    shard := qb.partitionManager.getShard(ctx, req)
+    shard.Lock()
 
-    if qb.currentBatch == nil {
+    if shard.batch == nil {
         reqList, mergeSplitErr := req.MergeSplit(ctx, int(qb.cfg.MaxSize), qb.sizerType, nil)
         if mergeSplitErr != nil || len(reqList) == 0 {
             done.OnDone(mergeSplitErr)
-            qb.currentBatchMu.Unlock()
+            shard.Unlock()
             return
         }
 
@@ -90,27 +81,27 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
         if qb.sizer.Sizeof(lastReq) < qb.cfg.MinSize {
             // Do not flush the last item and add it to the current batch.
             reqList = reqList[:len(reqList)-1]
-            qb.currentBatch = &batch{
-                ctx:  ctx,
-                req:  lastReq,
-                done: multiDone{done},
+            shard.batch = &batch{
+                ctx:     ctx,
+                req:     lastReq,
+                done:    multiDone{done},
+                created: time.Now(),
             }
-            qb.resetTimer()
         }
 
-        qb.currentBatchMu.Unlock()
+        shard.Unlock()
         for i := 0; i < len(reqList); i++ {
             qb.flush(ctx, reqList[i], done)
         }
 
         return
     }
 
-    reqList, mergeSplitErr := qb.currentBatch.req.MergeSplit(ctx, int(qb.cfg.MaxSize), qb.sizerType, req)
+    reqList, mergeSplitErr := shard.batch.req.MergeSplit(ctx, int(qb.cfg.MaxSize), qb.sizerType, req)
     // If failed to merge signal all Done callbacks from current batch as well as the current request and reset the current batch.
     if mergeSplitErr != nil || len(reqList) == 0 {
         done.OnDone(mergeSplitErr)
-        qb.currentBatchMu.Unlock()
+        shard.Unlock()
         return
     }
 
@@ -126,15 +117,15 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
 
     // Logic on how to deal with the current batch:
     // TODO: Deal with merging Context.
-    qb.currentBatch.req = reqList[0]
-    qb.currentBatch.done = append(qb.currentBatch.done, done)
+    shard.req = reqList[0]
+    shard.done = append(shard.done, done)
     // Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and
    // cannot unlock and re-lock because we are not done processing all the responses.
     var firstBatch *batch
     // Need to check the currentBatch if more than 1 result returned or if 1 result return but larger than MinSize.
-    if len(reqList) > 1 || qb.sizer.Sizeof(qb.currentBatch.req) >= qb.cfg.MinSize {
-        firstBatch = qb.currentBatch
-        qb.currentBatch = nil
+    if len(reqList) > 1 || qb.sizer.Sizeof(shard.batch.req) >= qb.cfg.MinSize {
+        firstBatch = shard.batch
+        shard.batch = nil
     }
     // At this moment we dealt with the first result which is iter in the currentBatch or in the `firstBatch` we will flush.
     reqList = reqList[1:]
@@ -145,16 +136,16 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
         if qb.sizer.Sizeof(lastReq) < qb.cfg.MinSize {
             // Do not flush the last item and add it to the current batch.
             reqList = reqList[:len(reqList)-1]
-            qb.currentBatch = &batch{
-                ctx:  ctx,
-                req:  lastReq,
-                done: multiDone{done},
+            shard.batch = &batch{
+                ctx:     ctx,
+                req:     lastReq,
+                done:    multiDone{done},
+                created: time.Now(),
             }
-            qb.resetTimer()
         }
     }
 
-    qb.currentBatchMu.Unlock()
+    shard.Unlock()
     if firstBatch != nil {
         qb.flush(firstBatch.ctx, firstBatch.req, firstBatch.done)
     }
@@ -172,8 +163,8 @@ func (qb *defaultBatcher) startTimeBasedFlushingGoroutine() {
             select {
             case <-qb.shutdownCh:
                 return
-            case <-qb.timer.C:
-                qb.flushCurrentBatchIfNecessary()
+            case <-qb.ticker.C:
+                qb.flushCurrentBatchIfNecessary(false)
             }
         }
     }()
@@ -182,27 +173,32 @@ func (qb *defaultBatcher) startTimeBasedFlushingGoroutine() {
 // Start starts the goroutine that reads from the queue and flushes asynchronously.
 func (qb *defaultBatcher) Start(_ context.Context, _ component.Host) error {
     if qb.cfg.FlushTimeout > 0 {
-        qb.timer = time.NewTimer(qb.cfg.FlushTimeout)
+        qb.ticker = time.NewTicker(qb.cfg.FlushTimeout)
         qb.startTimeBasedFlushingGoroutine()
     }
 
     return nil
 }
 
 // flushCurrentBatchIfNecessary sends out the current request batch if it is not nil
-func (qb *defaultBatcher) flushCurrentBatchIfNecessary() {
-    qb.currentBatchMu.Lock()
-    if qb.currentBatch == nil {
-        qb.currentBatchMu.Unlock()
-        return
-    }
-    batchToFlush := qb.currentBatch
-    qb.currentBatch = nil
-    qb.currentBatchMu.Unlock()
+func (qb *defaultBatcher) flushCurrentBatchIfNecessary(forceFlush bool) {
+    qb.partitionManager.forEachShard(func(shard *shard) {
+        shard.Lock()
+        if shard.batch == nil {
+            shard.Unlock()
+            return
+        }
+        if !forceFlush && time.Since(shard.created) < qb.cfg.FlushTimeout {
+            shard.Unlock()
+            return
+        }
+        batchToFlush := shard.batch
+        shard.batch = nil
+        shard.Unlock()
 
-    // flush() blocks until successfully started a goroutine for flushing.
-    qb.flush(batchToFlush.ctx, batchToFlush.req, batchToFlush.done)
-    qb.resetTimer()
+        // flush() blocks until successfully started a goroutine for flushing.
+        qb.flush(batchToFlush.ctx, batchToFlush.req, batchToFlush.done)
+    })
 }
 
 // flush starts a goroutine that calls consumeFunc. It blocks until a worker is available if necessary.
@@ -224,7 +220,7 @@ func (qb *defaultBatcher) flush(ctx context.Context, req request.Request, done D
 func (qb *defaultBatcher) Shutdown(_ context.Context) error {
     close(qb.shutdownCh)
     // Make sure execute one last flush if necessary.
-    qb.flushCurrentBatchIfNecessary()
+    qb.flushCurrentBatchIfNecessary(true)
     qb.stopWG.Wait()
     return nil
 }
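
In this change the per-batcher timer is replaced by a single ticker plus a per-batch created timestamp: on every tick each shard is flushed only if its batch has aged past FlushTimeout, and Shutdown passes forceFlush=true to drain every shard. A standalone sketch of that decision follows; the names are illustrative, not the collector's API.

package main

import (
    "fmt"
    "time"
)

// shardState mirrors the per-shard information the flush check relies on:
// whether a batch exists and when it was started.
type shardState struct {
    hasBatch bool
    created  time.Time
}

// shouldFlush reproduces the decision made inside flushCurrentBatchIfNecessary:
// always flush on shutdown (forceFlush), otherwise only once the batch has aged
// past the flush timeout. Ticks that fire earlier leave young batches alone.
func shouldFlush(s shardState, forceFlush bool, flushTimeout time.Duration, now time.Time) bool {
    if !s.hasBatch {
        return false
    }
    if forceFlush {
        return true
    }
    return now.Sub(s.created) >= flushTimeout
}

func main() {
    timeout := 200 * time.Millisecond
    now := time.Now()

    young := shardState{hasBatch: true, created: now.Add(-50 * time.Millisecond)}
    old := shardState{hasBatch: true, created: now.Add(-300 * time.Millisecond)}

    fmt.Println(shouldFlush(young, false, timeout, now)) // false: keeps accumulating
    fmt.Println(shouldFlush(old, false, timeout, now))   // true: exceeded FlushTimeout
    fmt.Println(shouldFlush(young, true, timeout, now))  // true: Shutdown forces a flush
}

One consequence of sharing a single ticker across all shards is that a batch created just after a tick can wait close to two FlushTimeout intervals before it is flushed.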
