Skip to content

Commit e4f7de9

Browse files
committed
Multi batch support
1 parent ea0a4c6 commit e4f7de9

File tree

5 files changed

+175
-71
lines changed

5 files changed

+175
-71
lines changed

exporter/exporterhelper/internal/queue_sender.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ import (
2222

2323
// QueueBatchSettings is a subset of the queuebatch.Settings that are needed when used within an Exporter.
2424
type QueueBatchSettings[K any] struct {
25-
Encoding queuebatch.Encoding[K]
26-
Sizers map[request.SizerType]request.Sizer[K]
25+
Encoding queuebatch.Encoding[K]
26+
Sizers map[request.SizerType]request.Sizer[K]
27+
BatchKeyGetter queuebatch.BatchKeyGetter[K]
2728
}
2829

2930
// NewDefaultQueueConfig returns the default config for QueueConfig.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
5+
6+
import (
7+
"context"
8+
"sync"
9+
"time"
10+
11+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
12+
)
13+
14+
type batch struct {
15+
ctx context.Context
16+
req request.Request
17+
done multiDone
18+
created time.Time
19+
}
20+
21+
type batchEntry struct {
22+
*batch
23+
mu sync.Mutex
24+
}
25+
26+
func newBatchEntry() *batchEntry {
27+
return &batchEntry{
28+
nil,
29+
sync.Mutex{},
30+
}
31+
}
32+
33+
type batchManager interface {
34+
getBatch(ctx context.Context, req request.Request) *batchEntry
35+
forEachBatch(func(*batchEntry))
36+
}
37+
38+
func newBatchManager(keyGetter BatchKeyGetter[request.Request]) batchManager {
39+
if keyGetter == nil {
40+
return &singleBatchManager{
41+
batch: newBatchEntry(),
42+
}
43+
}
44+
return &multiBatchManager{
45+
batchMap: make(map[string]*batchEntry),
46+
keyGetter: keyGetter,
47+
}
48+
}
49+
50+
type singleBatchManager struct {
51+
batch *batchEntry
52+
}
53+
54+
func (bm *singleBatchManager) getBatch(_ context.Context, _ request.Request) *batchEntry {
55+
return bm.batch
56+
}
57+
58+
func (bm *singleBatchManager) forEachBatch(callback func(*batchEntry)) {
59+
callback(bm.batch)
60+
}
61+
62+
type multiBatchManager struct {
63+
batchMap map[string]*batchEntry
64+
keyGetter BatchKeyGetter[request.Request]
65+
mu sync.RWMutex
66+
}
67+
68+
func (bm *multiBatchManager) forEachBatch(callback func(*batchEntry)) {
69+
bm.mu.RLock()
70+
for _, batchEntry := range bm.batchMap {
71+
callback(batchEntry)
72+
}
73+
bm.mu.RUnlock()
74+
}
75+
76+
func (bm *multiBatchManager) getBatch(ctx context.Context, req request.Request) *batchEntry {
77+
key := bm.keyGetter.GetKey(req)
78+
79+
bm.mu.RLock()
80+
batchEntry, ok := bm.batchMap[key]
81+
bm.mu.RUnlock()
82+
if ok {
83+
return batchEntry
84+
}
85+
bm.mu.Lock()
86+
batchEntry = newBatchEntry()
87+
bm.batchMap[key] = batchEntry
88+
bm.mu.Unlock()
89+
return batchEntry
90+
}

exporter/exporterhelper/internal/queuebatch/default_batcher.go

+63-67
Original file line numberDiff line numberDiff line change
@@ -15,31 +15,26 @@ import (
1515
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
1616
)
1717

18-
type batch struct {
19-
ctx context.Context
20-
req request.Request
21-
done multiDone
22-
}
23-
2418
type batcherSettings[K any] struct {
2519
sizerType request.SizerType
2620
sizer request.Sizer[K]
21+
keyGetter BatchKeyGetter[K]
2722
next sender.SendFunc[K]
2823
maxWorkers int
2924
}
3025

3126
// defaultBatcher continuously batches incoming requests and flushes asynchronously if the minimum size limit is met or on timeout.
3227
type defaultBatcher struct {
33-
cfg BatchConfig
34-
workerPool chan struct{}
35-
sizerType request.SizerType
36-
sizer request.Sizer[request.Request]
37-
consumeFunc sender.SendFunc[request.Request]
38-
stopWG sync.WaitGroup
39-
currentBatchMu sync.Mutex
40-
currentBatch *batch
41-
timer *time.Timer
42-
shutdownCh chan struct{}
28+
cfg BatchConfig
29+
workerPool chan struct{}
30+
sizerType request.SizerType
31+
sizer request.Sizer[request.Request]
32+
consumeFunc sender.SendFunc[request.Request]
33+
stopWG sync.WaitGroup
34+
ticker *time.Ticker
35+
shutdownCh chan struct{}
36+
37+
batchManager batchManager
4338
}
4439

4540
func newDefaultBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request]) *defaultBatcher {
@@ -52,30 +47,26 @@ func newDefaultBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request])
5247
}
5348
}
5449
return &defaultBatcher{
55-
cfg: bCfg,
56-
workerPool: workerPool,
57-
sizerType: bSet.sizerType,
58-
sizer: bSet.sizer,
59-
consumeFunc: bSet.next,
60-
stopWG: sync.WaitGroup{},
61-
shutdownCh: make(chan struct{}, 1),
62-
}
63-
}
64-
65-
func (qb *defaultBatcher) resetTimer() {
66-
if qb.cfg.FlushTimeout > 0 {
67-
qb.timer.Reset(qb.cfg.FlushTimeout)
50+
cfg: bCfg,
51+
workerPool: workerPool,
52+
sizerType: bSet.sizerType,
53+
sizer: bSet.sizer,
54+
consumeFunc: bSet.next,
55+
stopWG: sync.WaitGroup{},
56+
shutdownCh: make(chan struct{}, 1),
57+
batchManager: newBatchManager(bSet.keyGetter),
6858
}
6959
}
7060

7161
func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done Done) {
72-
qb.currentBatchMu.Lock()
62+
batchEntry := qb.batchManager.getBatch(ctx, req)
63+
batchEntry.mu.Lock()
7364

74-
if qb.currentBatch == nil {
65+
if batchEntry.batch == nil {
7566
reqList, mergeSplitErr := req.MergeSplit(ctx, int(qb.cfg.MaxSize), qb.sizerType, nil)
7667
if mergeSplitErr != nil || len(reqList) == 0 {
7768
done.OnDone(mergeSplitErr)
78-
qb.currentBatchMu.Unlock()
69+
batchEntry.mu.Unlock()
7970
return
8071
}
8172

@@ -90,27 +81,27 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
9081
if qb.sizer.Sizeof(lastReq) < qb.cfg.MinSize {
9182
// Do not flush the last item and add it to the current batch.
9283
reqList = reqList[:len(reqList)-1]
93-
qb.currentBatch = &batch{
94-
ctx: ctx,
95-
req: lastReq,
96-
done: multiDone{done},
84+
batchEntry.batch = &batch{
85+
ctx: ctx,
86+
req: lastReq,
87+
done: multiDone{done},
88+
created: time.Now(),
9789
}
98-
qb.resetTimer()
9990
}
10091

101-
qb.currentBatchMu.Unlock()
92+
batchEntry.mu.Unlock()
10293
for i := 0; i < len(reqList); i++ {
10394
qb.flush(ctx, reqList[i], done)
10495
}
10596

10697
return
10798
}
10899

109-
reqList, mergeSplitErr := qb.currentBatch.req.MergeSplit(ctx, int(qb.cfg.MaxSize), qb.sizerType, req)
100+
reqList, mergeSplitErr := batchEntry.batch.req.MergeSplit(ctx, int(qb.cfg.MaxSize), qb.sizerType, req)
110101
// If the merge fails, report the error through the incoming request's Done callback and return; the current batch is not reset here.
111102
if mergeSplitErr != nil || len(reqList) == 0 {
112103
done.OnDone(mergeSplitErr)
113-
qb.currentBatchMu.Unlock()
104+
batchEntry.mu.Unlock()
114105
return
115106
}
116107

@@ -126,15 +117,15 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
126117

127118
// Logic on how to deal with the current batch:
128119
// TODO: Deal with merging Context.
129-
qb.currentBatch.req = reqList[0]
130-
qb.currentBatch.done = append(qb.currentBatch.done, done)
120+
batchEntry.req = reqList[0]
121+
batchEntry.done = append(batchEntry.done, done)
131122
// Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and
132123
// cannot unlock and re-lock because we are not done processing all the responses.
133124
var firstBatch *batch
134125
// Need to check the currentBatch if more than 1 result returned or if 1 result return but larger than MinSize.
135-
if len(reqList) > 1 || qb.sizer.Sizeof(qb.currentBatch.req) >= qb.cfg.MinSize {
136-
firstBatch = qb.currentBatch
137-
qb.currentBatch = nil
126+
if len(reqList) > 1 || qb.sizer.Sizeof(batchEntry.batch.req) >= qb.cfg.MinSize {
127+
firstBatch = batchEntry.batch
128+
batchEntry.batch = nil
138129
}
139130
// At this moment we have dealt with the first result, which is either in the currentBatch or in the `firstBatch` we will flush.
140131
reqList = reqList[1:]
@@ -145,16 +136,16 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
145136
if qb.sizer.Sizeof(lastReq) < qb.cfg.MinSize {
146137
// Do not flush the last item and add it to the current batch.
147138
reqList = reqList[:len(reqList)-1]
148-
qb.currentBatch = &batch{
149-
ctx: ctx,
150-
req: lastReq,
151-
done: multiDone{done},
139+
batchEntry.batch = &batch{
140+
ctx: ctx,
141+
req: lastReq,
142+
done: multiDone{done},
143+
created: time.Now(),
152144
}
153-
qb.resetTimer()
154145
}
155146
}
156147

157-
qb.currentBatchMu.Unlock()
148+
batchEntry.mu.Unlock()
158149
if firstBatch != nil {
159150
qb.flush(firstBatch.ctx, firstBatch.req, firstBatch.done)
160151
}
@@ -172,8 +163,8 @@ func (qb *defaultBatcher) startTimeBasedFlushingGoroutine() {
172163
select {
173164
case <-qb.shutdownCh:
174165
return
175-
case <-qb.timer.C:
176-
qb.flushCurrentBatchIfNecessary()
166+
case <-qb.ticker.C:
167+
qb.flushCurrentBatchIfNecessary(false)
177168
}
178169
}
179170
}()
@@ -182,27 +173,32 @@ func (qb *defaultBatcher) startTimeBasedFlushingGoroutine() {
182173
// Start starts the goroutine that reads from the queue and flushes asynchronously.
183174
func (qb *defaultBatcher) Start(_ context.Context, _ component.Host) error {
184175
if qb.cfg.FlushTimeout > 0 {
185-
qb.timer = time.NewTimer(qb.cfg.FlushTimeout)
176+
qb.ticker = time.NewTicker(qb.cfg.FlushTimeout)
186177
qb.startTimeBasedFlushingGoroutine()
187178
}
188179

189180
return nil
190181
}
191182

192183
// flushCurrentBatchIfNecessary sends out every pending (non-nil) batch; unless forceFlush is set, a batch younger than FlushTimeout is left in place.
193-
func (qb *defaultBatcher) flushCurrentBatchIfNecessary() {
194-
qb.currentBatchMu.Lock()
195-
if qb.currentBatch == nil {
196-
qb.currentBatchMu.Unlock()
197-
return
198-
}
199-
batchToFlush := qb.currentBatch
200-
qb.currentBatch = nil
201-
qb.currentBatchMu.Unlock()
184+
func (qb *defaultBatcher) flushCurrentBatchIfNecessary(forceFlush bool) {
185+
qb.batchManager.forEachBatch(func(batchEntry *batchEntry) {
186+
batchEntry.mu.Lock()
187+
if batchEntry.batch == nil {
188+
batchEntry.mu.Unlock()
189+
return
190+
}
191+
if !forceFlush && time.Since(batchEntry.created) < qb.cfg.FlushTimeout {
192+
batchEntry.mu.Unlock()
193+
return
194+
}
195+
batchToFlush := batchEntry.batch
196+
batchEntry.batch = nil
197+
batchEntry.mu.Unlock()
202198

203-
// flush() blocks until successfully started a goroutine for flushing.
204-
qb.flush(batchToFlush.ctx, batchToFlush.req, batchToFlush.done)
205-
qb.resetTimer()
199+
// flush() blocks until successfully started a goroutine for flushing.
200+
qb.flush(batchToFlush.ctx, batchToFlush.req, batchToFlush.done)
201+
})
206202
}
207203

208204
// flush starts a goroutine that calls consumeFunc. It blocks until a worker is available if necessary.
@@ -224,7 +220,7 @@ func (qb *defaultBatcher) flush(ctx context.Context, req request.Request, done D
224220
func (qb *defaultBatcher) Shutdown(_ context.Context) error {
225221
close(qb.shutdownCh)
226222
// Make sure execute one last flush if necessary.
227-
qb.flushCurrentBatchIfNecessary()
223+
qb.flushCurrentBatchIfNecessary(true)
228224
qb.stopWG.Wait()
229225
return nil
230226
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
5+
6+
import "context"
7+
8+
// BatchKeyGetter is an interface that returns the batch key of the given
// element.
type BatchKeyGetter[T any] interface {
	// GetKey returns the batch key of the given element.
	GetKey(T) string
}
12+
13+
// KeyFunc adapts a plain function that maps an element to its batch key.
type KeyFunc[T any] func(T) string

// GetKey calls f with item and returns its result; the context argument is
// ignored.
//
// NOTE(review): BatchKeyGetter declares GetKey(T) string, so with this
// two-argument signature KeyFunc does not implement BatchKeyGetter. Decide
// whether the interface should also take a context (and update its callers)
// or whether the context parameter should be dropped here.
func (f KeyFunc[T]) GetKey(_ context.Context, item T) string {
	return f(item)
}

exporter/exporterhelper/internal/queuebatch/queue_batch_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -590,8 +590,8 @@ func TestQueueBatchTimerFlush(t *testing.T) {
590590
assert.LessOrEqual(t, 1, sink.RequestsCount())
591591
assert.Equal(t, 8, sink.ItemsCount())
592592

593-
// Confirm that it is flushed after 100ms (using 60+50=110 here to be safe)
594-
time.Sleep(50 * time.Millisecond)
593+
// Confirm that it is flushed after 100ms (using 100+50=150 here to be safe)
594+
time.Sleep(100 * time.Millisecond)
595595
assert.LessOrEqual(t, 2, sink.RequestsCount())
596596
assert.Equal(t, 12, sink.ItemsCount())
597597
require.NoError(t, qb.Shutdown(context.Background()))

0 commit comments

Comments
 (0)