[exporter][batcher] Multi-batch support - Version 1 #12735

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
5 changes: 3 additions & 2 deletions exporter/exporterhelper/internal/queue_sender.go
@@ -20,8 +20,9 @@ import (

// QueueBatchSettings is a subset of the queuebatch.Settings that are needed when used within an Exporter.
type QueueBatchSettings[T any] struct {
Encoding queuebatch.Encoding[T]
Sizers map[request.SizerType]request.Sizer[T]
Encoding queuebatch.Encoding[T]
Sizers map[request.SizerType]request.Sizer[T]
Partitioner queuebatch.Partitioner[T]
}

// NewDefaultQueueConfig returns the default config for queuebatch.Config.
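
The only contract this diff implies for queuebatch.Partitioner[T] is the GetKey(ctx, req) call used in batch_manager.go below. As a rough, hypothetical sketch of that shape (the request type, the partitioner interface, and tenantPartitioner are illustrative names, not collector APIs), a key-based partitioner could look like this:

package main

import (
    "context"
    "fmt"
)

// request stands in for the exporter's request type; only the field used for keying matters here.
type request struct{ tenant string }

// partitioner mirrors the shape implied by bm.partitioner.GetKey(ctx, req) in batch_manager.go.
type partitioner interface {
    GetKey(ctx context.Context, req request) string
}

// tenantPartitioner keys batches by a per-request tenant value (hypothetical example).
type tenantPartitioner struct{}

func (tenantPartitioner) GetKey(_ context.Context, req request) string {
    return req.tenant
}

func main() {
    var p partitioner = tenantPartitioner{}
    fmt.Println(p.GetKey(context.Background(), request{tenant: "tenant-a"})) // prints: tenant-a
}
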
103 changes: 103 additions & 0 deletions exporter/exporterhelper/internal/queuebatch/batch_manager.go
@@ -0,0 +1,103 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"

import (
    "context"
    "sync"
    "time"

    "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
)

type batch struct {
    ctx     context.Context
    req     request.Request
    done    multiDone
    created time.Time
}

type shard struct {
    sync.Mutex
    *batch
}

func newShard() *shard {
    return &shard{
        sync.Mutex{},
        nil,
    }
}

type partitionManager interface {
    // getShard returns a shard for the given request.
    getShard(ctx context.Context, req request.Request) *shard

    // forEachShard iterates over all shards and calls the given callback function.
    forEachShard(func(*shard))
}

func newPartitionManager(partitioner Partitioner[request.Request]) partitionManager {
    if partitioner == nil {
        return newSinglePartitionManager()
    }
    return newMultiPartitionManager(partitioner)
}

type singlePartitionManager struct {
    shard *shard
}

func newSinglePartitionManager() *singlePartitionManager {
    return &singlePartitionManager{
        shard: newShard(),
    }
}

func (bm *singlePartitionManager) getShard(_ context.Context, _ request.Request) *shard {
    return bm.shard
}

func (bm *singlePartitionManager) forEachShard(callback func(*shard)) {
    callback(bm.shard)
}

type multiPartitionManager struct {
    // TODO: Currently, each partition can have only one shard. We need to support multiple shards per partition.
    partitionMap map[string]*shard
    partitioner  Partitioner[request.Request]
    mu           sync.RWMutex
}

func newMultiPartitionManager(partitioner Partitioner[request.Request]) *multiPartitionManager {
    return &multiPartitionManager{
        partitionMap: make(map[string]*shard),
        partitioner:  partitioner,
    }
}

// getShard returns the shard for the given request. It creates a new shard if the partition is not found.
func (bm *multiPartitionManager) getShard(ctx context.Context, req request.Request) *shard {
    key := bm.partitioner.GetKey(ctx, req)

    bm.mu.RLock()
    if shard, ok := bm.partitionMap[key]; ok {
        bm.mu.RUnlock()
        return shard
    }
    bm.mu.RUnlock()

    bm.mu.Lock()
    defer bm.mu.Unlock()
    // Re-check after taking the write lock: a concurrent call may have created the shard already.
    if shard, ok := bm.partitionMap[key]; ok {
        return shard
    }
    shard := newShard()
    bm.partitionMap[key] = shard
    return shard
}

func (bm *multiPartitionManager) forEachShard(callback func(*shard)) {
    bm.mu.RLock()
    defer bm.mu.RUnlock()
    for _, shard := range bm.partitionMap {
        callback(shard)
    }
}
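
For reference, the locking pattern multiPartitionManager.getShard relies on (read-lock fast path, then write lock with a re-check) can be exercised in isolation. The sketch below is standalone and uses made-up names (demoShard, shardMap); it is not collector code, it only demonstrates why the re-check keeps concurrent callers on the same key sharing a single shard:

package main

import (
    "fmt"
    "sync"
)

type demoShard struct{ key string }

// shardMap mirrors the double-checked locking used by multiPartitionManager above.
type shardMap struct {
    mu     sync.RWMutex
    shards map[string]*demoShard
}

func (m *shardMap) get(key string) *demoShard {
    // Fast path: read lock only.
    m.mu.RLock()
    if s, ok := m.shards[key]; ok {
        m.mu.RUnlock()
        return s
    }
    m.mu.RUnlock()

    // Slow path: take the write lock and re-check, because another goroutine
    // may have created the shard between RUnlock and Lock.
    m.mu.Lock()
    defer m.mu.Unlock()
    if s, ok := m.shards[key]; ok {
        return s
    }
    s := &demoShard{key: key}
    m.shards[key] = s
    return s
}

func main() {
    m := &shardMap{shards: make(map[string]*demoShard)}
    var wg sync.WaitGroup
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            m.get("tenant-a")
        }()
    }
    wg.Wait()
    fmt.Println(len(m.shards)) // 1: concurrent callers on the same key share a single shard
}
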
138 changes: 67 additions & 71 deletions exporter/exporterhelper/internal/queuebatch/default_batcher.go
@@ -15,31 +15,26 @@ import (
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
)

type batch struct {
ctx context.Context
req request.Request
done multiDone
}

type batcherSettings[T any] struct {
sizerType request.SizerType
sizer request.Sizer[T]
next sender.SendFunc[T]
maxWorkers int
sizerType request.SizerType
sizer request.Sizer[T]
partitioner Partitioner[T]
next sender.SendFunc[T]
maxWorkers int
}

// defaultBatcher continuously batches incoming requests and flushes them asynchronously when the minimum size limit is met or on timeout.
type defaultBatcher struct {
cfg BatchConfig
workerPool chan struct{}
sizerType request.SizerType
sizer request.Sizer[request.Request]
consumeFunc sender.SendFunc[request.Request]
stopWG sync.WaitGroup
currentBatchMu sync.Mutex
currentBatch *batch
timer *time.Timer
shutdownCh chan struct{}
cfg BatchConfig
workerPool chan struct{}
sizerType request.SizerType
sizer request.Sizer[request.Request]
consumeFunc sender.SendFunc[request.Request]
stopWG sync.WaitGroup
ticker *time.Ticker
shutdownCh chan struct{}

partitionManager partitionManager
}

func newDefaultBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request]) *defaultBatcher {
@@ -52,30 +47,26 @@ func newDefaultBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request])
}
}
return &defaultBatcher{
cfg: bCfg,
workerPool: workerPool,
sizerType: bSet.sizerType,
sizer: bSet.sizer,
consumeFunc: bSet.next,
stopWG: sync.WaitGroup{},
shutdownCh: make(chan struct{}, 1),
}
}

func (qb *defaultBatcher) resetTimer() {
if qb.cfg.FlushTimeout > 0 {
qb.timer.Reset(qb.cfg.FlushTimeout)
cfg: bCfg,
workerPool: workerPool,
sizerType: bSet.sizerType,
sizer: bSet.sizer,
consumeFunc: bSet.next,
stopWG: sync.WaitGroup{},
shutdownCh: make(chan struct{}, 1),
partitionManager: newPartitionManager(bSet.partitioner),
}
}

func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done Done) {
qb.currentBatchMu.Lock()
shard := qb.partitionManager.getShard(ctx, req)
shard.Lock()

if qb.currentBatch == nil {
if shard.batch == nil {
reqList, mergeSplitErr := req.MergeSplit(ctx, int(qb.cfg.MaxSize), qb.sizerType, nil)
if mergeSplitErr != nil || len(reqList) == 0 {
done.OnDone(mergeSplitErr)
qb.currentBatchMu.Unlock()
shard.Unlock()
return
}

@@ -90,27 +81,27 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
if qb.sizer.Sizeof(lastReq) < qb.cfg.MinSize {
// Do not flush the last item; add it to the current batch instead.
reqList = reqList[:len(reqList)-1]
qb.currentBatch = &batch{
ctx: ctx,
req: lastReq,
done: multiDone{done},
shard.batch = &batch{
ctx: ctx,
req: lastReq,
done: multiDone{done},
created: time.Now(),
}
qb.resetTimer()
}

qb.currentBatchMu.Unlock()
shard.Unlock()
for i := 0; i < len(reqList); i++ {
qb.flush(ctx, reqList[i], done)
}

return
}

reqList, mergeSplitErr := qb.currentBatch.req.MergeSplit(ctx, int(qb.cfg.MaxSize), qb.sizerType, req)
reqList, mergeSplitErr := shard.batch.req.MergeSplit(ctx, int(qb.cfg.MaxSize), qb.sizerType, req)
// If the merge failed, signal all Done callbacks from the current batch as well as the current request, and reset the current batch.
if mergeSplitErr != nil || len(reqList) == 0 {
done.OnDone(mergeSplitErr)
qb.currentBatchMu.Unlock()
shard.Unlock()
return
}

@@ -126,15 +117,15 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done

// Logic on how to deal with the current batch:
// TODO: Deal with merging Context.
qb.currentBatch.req = reqList[0]
qb.currentBatch.done = append(qb.currentBatch.done, done)
shard.req = reqList[0]
shard.done = append(shard.done, done)
// Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and
// cannot unlock and re-lock because we are not done processing all the responses.
var firstBatch *batch
// Need to check the current batch if more than 1 result was returned, or if 1 result was returned but it is larger than MinSize.
if len(reqList) > 1 || qb.sizer.Sizeof(qb.currentBatch.req) >= qb.cfg.MinSize {
firstBatch = qb.currentBatch
qb.currentBatch = nil
if len(reqList) > 1 || qb.sizer.Sizeof(shard.batch.req) >= qb.cfg.MinSize {
firstBatch = shard.batch
shard.batch = nil
}
// At this point we have dealt with the first result, which is either in the current batch or in the `firstBatch` we will flush.
reqList = reqList[1:]
@@ -145,16 +136,16 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
if qb.sizer.Sizeof(lastReq) < qb.cfg.MinSize {
// Do not flush the last item; add it to the current batch instead.
reqList = reqList[:len(reqList)-1]
qb.currentBatch = &batch{
ctx: ctx,
req: lastReq,
done: multiDone{done},
shard.batch = &batch{
ctx: ctx,
req: lastReq,
done: multiDone{done},
created: time.Now(),
}
qb.resetTimer()
}
}

qb.currentBatchMu.Unlock()
shard.Unlock()
if firstBatch != nil {
qb.flush(firstBatch.ctx, firstBatch.req, firstBatch.done)
}
@@ -172,8 +163,8 @@ func (qb *defaultBatcher) startTimeBasedFlushingGoroutine() {
select {
case <-qb.shutdownCh:
return
case <-qb.timer.C:
qb.flushCurrentBatchIfNecessary()
case <-qb.ticker.C:
qb.flushCurrentBatchIfNecessary(false)
}
}
}()
@@ -182,27 +173,32 @@ func (qb *defaultBatcher) startTimeBasedFlushingGoroutine() {
// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *defaultBatcher) Start(_ context.Context, _ component.Host) error {
if qb.cfg.FlushTimeout > 0 {
qb.timer = time.NewTimer(qb.cfg.FlushTimeout)
qb.ticker = time.NewTicker(qb.cfg.FlushTimeout)
qb.startTimeBasedFlushingGoroutine()
}

return nil
}

// flushCurrentBatchIfNecessary flushes each shard's pending batch if it is not nil and either forceFlush is set or the batch has outlived the flush timeout.
func (qb *defaultBatcher) flushCurrentBatchIfNecessary() {
qb.currentBatchMu.Lock()
if qb.currentBatch == nil {
qb.currentBatchMu.Unlock()
return
}
batchToFlush := qb.currentBatch
qb.currentBatch = nil
qb.currentBatchMu.Unlock()
func (qb *defaultBatcher) flushCurrentBatchIfNecessary(forceFlush bool) {
qb.partitionManager.forEachShard(func(shard *shard) {
shard.Lock()
if shard.batch == nil {
shard.Unlock()
return
}
if !forceFlush && time.Since(shard.created) < qb.cfg.FlushTimeout {
shard.Unlock()
return
}
batchToFlush := shard.batch
shard.batch = nil
shard.Unlock()

// flush() blocks until successfully started a goroutine for flushing.
qb.flush(batchToFlush.ctx, batchToFlush.req, batchToFlush.done)
qb.resetTimer()
// flush() blocks until successfully started a goroutine for flushing.
qb.flush(batchToFlush.ctx, batchToFlush.req, batchToFlush.done)
})
}

// flush starts a goroutine that calls consumeFunc. It blocks until a worker is available if necessary.
@@ -224,7 +220,7 @@ func (qb *defaultBatcher) flush(ctx context.Context, req request.Request, done D
func (qb *defaultBatcher) Shutdown(_ context.Context) error {
close(qb.shutdownCh)
// Make sure to execute one last flush if necessary.
qb.flushCurrentBatchIfNecessary()
qb.flushCurrentBatchIfNecessary(true)
qb.stopWG.Wait()
return nil
}
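
Finally, the switch from a resettable timer to a single ticker means each flush sweep now checks every shard's age itself, while Shutdown passes forceFlush to drain everything. A standalone sketch of that sweep logic under those assumptions (agedBatch and sweep are illustrative names, not part of the batcher):

package main

import (
    "fmt"
    "time"
)

// agedBatch stands in for a shard's pending batch together with its creation time.
type agedBatch struct {
    payload string
    created time.Time
}

// sweep mirrors the flush check above: skip batches younger than flushTimeout
// unless force is set (as Shutdown does), then flush and clear the shard.
func sweep(shards map[string]*agedBatch, flushTimeout time.Duration, force bool, flush func(string)) {
    for key, b := range shards {
        if b == nil {
            continue
        }
        if !force && time.Since(b.created) < flushTimeout {
            continue
        }
        flush(b.payload)
        shards[key] = nil
    }
}

func main() {
    shards := map[string]*agedBatch{
        "tenant-a": {payload: "old batch", created: time.Now().Add(-2 * time.Second)},
        "tenant-b": {payload: "fresh batch", created: time.Now()},
    }
    // Periodic tick: only the batch older than the timeout is flushed.
    sweep(shards, time.Second, false, func(p string) { fmt.Println("flushed:", p) })
    // Forced sweep on shutdown: everything left is flushed.
    sweep(shards, time.Second, true, func(p string) { fmt.Println("flushed:", p) })
}
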