
Commit 918e0fb

[chore] Change QueueBatch implementation to use the final config
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent: 761cca5

18 files changed: +507 -472 lines

exporter/exporterhelper/internal/queue_sender.go (+20 -1)

@@ -40,5 +40,24 @@ func NewQueueSender(
         return nil
     }

-    return queuebatch.NewQueueBatch(qSet, qCfg, bCfg, exportFunc)
+    return queuebatch.NewQueueBatch(qSet, newQueueBatchConfig(qCfg, bCfg), exportFunc)
+}
+
+func newQueueBatchConfig(qCfg exporterqueue.Config, bCfg exporterbatcher.Config) queuebatch.Config {
+    qbCfg := queuebatch.Config{
+        Enabled:         true,
+        WaitForResult:   !qCfg.Enabled,
+        Sizer:           exporterbatcher.SizerTypeRequests,
+        QueueSize:       qCfg.QueueSize,
+        NumConsumers:    qCfg.NumConsumers,
+        BlockOnOverflow: qCfg.Blocking,
+    }
+    if bCfg.Enabled {
+        qbCfg.Batch = &queuebatch.BatchConfig{
+            FlushTimeout: bCfg.FlushTimeout,
+            MinSize:      bCfg.MinSize,
+            MaxSize:      bCfg.MaxSize,
+        }
+    }
+    return qbCfg
 }
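Taken together, newQueueBatchConfig collapses the legacy (exporterqueue.Config, exporterbatcher.Config) pair into the single queuebatch.Config: a disabled queue becomes WaitForResult, and an enabled batcher becomes a non-nil Batch. Below is a minimal, self-contained sketch of that mapping; the structs are simplified stand-ins for the internal collector types (the Sizer field is omitted), and the values in main are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for exporterqueue.Config, exporterbatcher.Config,
// queuebatch.BatchConfig, and queuebatch.Config.
type queueConfig struct {
	Enabled      bool
	QueueSize    int
	NumConsumers int
	Blocking     bool
}

type batcherConfig struct {
	Enabled      bool
	FlushTimeout time.Duration
	MinSize      int64
	MaxSize      int64
}

type batchConfig struct {
	FlushTimeout time.Duration
	MinSize      int64
	MaxSize      int64
}

type queueBatchConfig struct {
	Enabled         bool
	WaitForResult   bool
	QueueSize       int
	NumConsumers    int
	BlockOnOverflow bool
	Batch           *batchConfig
}

// newQueueBatchConfig mirrors the helper added in this commit.
func newQueueBatchConfig(q queueConfig, b batcherConfig) queueBatchConfig {
	cfg := queueBatchConfig{
		Enabled:         true,
		WaitForResult:   !q.Enabled, // "queue disabled" now means "wait for the result inline"
		QueueSize:       q.QueueSize,
		NumConsumers:    q.NumConsumers,
		BlockOnOverflow: q.Blocking,
	}
	if b.Enabled {
		cfg.Batch = &batchConfig{FlushTimeout: b.FlushTimeout, MinSize: b.MinSize, MaxSize: b.MaxSize}
	}
	return cfg
}

func main() {
	cfg := newQueueBatchConfig(
		queueConfig{Enabled: true, QueueSize: 1000, NumConsumers: 10, Blocking: true},
		batcherConfig{Enabled: true, FlushTimeout: 200 * time.Millisecond, MinSize: 2048},
	)
	fmt.Printf("WaitForResult=%v BlockOnOverflow=%v Batch=%+v\n", cfg.WaitForResult, cfg.BlockOnOverflow, cfg.Batch)
}
```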

exporter/exporterhelper/internal/queuebatch/config.go (+16 -1)

@@ -5,12 +5,27 @@ package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhel

 import (
     "errors"
+    "runtime"
     "time"

     "go.opentelemetry.io/collector/component"
     "go.opentelemetry.io/collector/exporter/exporterbatcher"
 )

+func NewDefaultConfig() Config {
+    return Config{
+        Enabled:         true,
+        Sizer:           exporterbatcher.SizerTypeItems,
+        NumConsumers:    runtime.NumCPU(),
+        QueueSize:       100_000,
+        BlockOnOverflow: true,
+        Batch: &BatchConfig{
+            FlushTimeout: 200 * time.Millisecond,
+            MinSize:      2048,
+        },
+    }
+}
+
 // Config defines configuration for queueing and batching incoming requests.
 type Config struct {
     // Enabled indicates whether to not enqueue and batch before exporting.
@@ -44,7 +59,7 @@ type Config struct {

     // BatchConfig it configures how the requests are consumed from the queue and batch together during consumption.
     // TODO: This will be changed to Optional when available.
-    BatchConfig *BatchConfig `mapstructure:"batch"`
+    Batch *BatchConfig `mapstructure:"batch"`
 }

 // Validate checks if the Config is valid
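Note the rename from BatchConfig to Batch: batching is now toggled by whether the pointer is set, rather than by a separate enabled flag (a nil Batch selects the disabled batcher in queue_batch.go further down). A runnable sketch of that nil-means-disabled convention, using a local stand-in for the internal BatchConfig type:

```go
package main

import (
	"fmt"
	"time"
)

// BatchConfig is a stand-in for the internal queuebatch type.
type BatchConfig struct {
	FlushTimeout time.Duration
	MinSize      int64
	MaxSize      int64
}

// describe shows how presence of the pointer selects the behavior.
func describe(batch *BatchConfig) string {
	if batch == nil {
		return "disabled batcher: requests pass through unbatched"
	}
	return fmt.Sprintf("batching: flush every %v once %d items accumulate", batch.FlushTimeout, batch.MinSize)
}

func main() {
	fmt.Println(describe(nil))
	fmt.Println(describe(&BatchConfig{FlushTimeout: 200 * time.Millisecond, MinSize: 2048}))
}
```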

exporter/exporterhelper/internal/queuebatch/default_batcher.go (+6 -9)

@@ -24,7 +24,7 @@ type batch struct {

 // defaultBatcher continuously batch incoming requests and flushes asynchronously if minimum size limit is met or on timeout.
 type defaultBatcher struct {
-    batchCfg    exporterbatcher.Config
+    batchCfg    BatchConfig
     workerPool  chan struct{}
     consumeFunc sender.SendFunc[request.Request]
     stopWG      sync.WaitGroup
@@ -34,10 +34,7 @@ type defaultBatcher struct {
     shutdownCh chan struct{}
 }

-func newDefaultBatcher(batchCfg exporterbatcher.Config,
-    consumeFunc sender.SendFunc[request.Request],
-    maxWorkers int,
-) *defaultBatcher {
+func newDefaultBatcher(batchCfg BatchConfig, consumeFunc sender.SendFunc[request.Request], maxWorkers int) *defaultBatcher {
     // TODO: Determine what is the right behavior for this in combination with async queue.
     var workerPool chan struct{}
     if maxWorkers != 0 {
@@ -56,7 +53,7 @@ func newDefaultBatcher(batchCfg exporterbatcher.Config,
 }

 func (qb *defaultBatcher) resetTimer() {
-    if qb.batchCfg.FlushTimeout != 0 {
+    if qb.batchCfg.FlushTimeout > 0 {
         qb.timer.Reset(qb.batchCfg.FlushTimeout)
     }
 }
@@ -65,7 +62,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
     qb.currentBatchMu.Lock()

     if qb.currentBatch == nil {
-        reqList, mergeSplitErr := req.MergeSplit(ctx, qb.batchCfg.SizeConfig, nil)
+        reqList, mergeSplitErr := req.MergeSplit(ctx, qb.batchCfg.MaxSize, exporterbatcher.SizerTypeItems, nil)
         if mergeSplitErr != nil || len(reqList) == 0 {
             done.OnDone(mergeSplitErr)
             qb.currentBatchMu.Unlock()
@@ -99,7 +96,7 @@
         return
     }

-    reqList, mergeSplitErr := qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.SizeConfig, req)
+    reqList, mergeSplitErr := qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSize, exporterbatcher.SizerTypeItems, req)
     // If failed to merge signal all Done callbacks from current batch as well as the current request and reset the current batch.
     if mergeSplitErr != nil || len(reqList) == 0 {
         done.OnDone(mergeSplitErr)
@@ -174,7 +171,7 @@ func (qb *defaultBatcher) startTimeBasedFlushingGoroutine() {

 // Start starts the goroutine that reads from the queue and flushes asynchronously.
 func (qb *defaultBatcher) Start(_ context.Context, _ component.Host) error {
-    if qb.batchCfg.FlushTimeout != 0 {
+    if qb.batchCfg.FlushTimeout > 0 {
         qb.timer = time.NewTimer(qb.batchCfg.FlushTimeout)
         qb.startTimeBasedFlushingGoroutine()
     }
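Beyond the type change, the FlushTimeout guard tightens from `!= 0` to `> 0`: a negative timeout previously armed a timer with a negative duration, which fires immediately, whereas now any non-positive value disables time-based flushing. A minimal runnable sketch of the guarded pattern, assuming callers may pass unvalidated durations:

```go
package main

import (
	"fmt"
	"time"
)

// armTimer mirrors defaultBatcher's guard: only a strictly positive
// FlushTimeout enables time-based flushing.
func armTimer(flushTimeout time.Duration) *time.Timer {
	if flushTimeout > 0 {
		return time.NewTimer(flushTimeout)
	}
	return nil
}

func main() {
	for _, d := range []time.Duration{-time.Second, 0, 50 * time.Millisecond} {
		t := armTimer(d)
		fmt.Printf("FlushTimeout=%v timeBasedFlush=%v\n", d, t != nil)
		if t != nil {
			t.Stop()
		}
	}
}
```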

exporter/exporterhelper/internal/queuebatch/default_batcher_test.go (+24 -34)

@@ -15,7 +15,6 @@ import (
     "github.com/stretchr/testify/require"

     "go.opentelemetry.io/collector/component/componenttest"
-    "go.opentelemetry.io/collector/exporter/exporterbatcher"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
 )

@@ -35,12 +34,9 @@ func TestDefaultBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) {
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            cfg := exporterbatcher.NewDefaultConfig()
-            cfg.Enabled = true
-            cfg.FlushTimeout = 0
-            cfg.SizeConfig = exporterbatcher.SizeConfig{
-                Sizer:   exporterbatcher.SizerTypeItems,
-                MinSize: 0,
+            cfg := BatchConfig{
+                FlushTimeout: 0,
+                MinSize:      0,
             }

             sink := requesttest.NewSink()
@@ -85,12 +81,9 @@ func TestDefaultBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            cfg := exporterbatcher.NewDefaultConfig()
-            cfg.Enabled = true
-            cfg.FlushTimeout = 0
-            cfg.SizeConfig = exporterbatcher.SizeConfig{
-                Sizer:   exporterbatcher.SizerTypeItems,
-                MinSize: 10,
+            cfg := BatchConfig{
+                FlushTimeout: 0,
+                MinSize:      10,
             }

             sink := requesttest.NewSink()
@@ -150,12 +143,9 @@ func TestDefaultBatcher_NoSplit_WithTimeout(t *testing.T) {
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            cfg := exporterbatcher.NewDefaultConfig()
-            cfg.Enabled = true
-            cfg.FlushTimeout = 50 * time.Millisecond
-            cfg.SizeConfig = exporterbatcher.SizeConfig{
-                Sizer:   exporterbatcher.SizerTypeItems,
-                MinSize: 100,
+            cfg := BatchConfig{
+                FlushTimeout: 50 * time.Millisecond,
+                MinSize:      100,
             }

             sink := requesttest.NewSink()
@@ -205,13 +195,10 @@ func TestDefaultBatcher_Split_TimeoutDisabled(t *testing.T) {
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            cfg := exporterbatcher.NewDefaultConfig()
-            cfg.Enabled = true
-            cfg.FlushTimeout = 0
-            cfg.SizeConfig = exporterbatcher.SizeConfig{
-                Sizer:   exporterbatcher.SizerTypeItems,
-                MinSize: 100,
-                MaxSize: 100,
+            cfg := BatchConfig{
+                FlushTimeout: 0,
+                MinSize:      100,
+                MaxSize:      100,
             }

             sink := requesttest.NewSink()
@@ -256,12 +243,13 @@
 }

 func TestDefaultBatcher_Shutdown(t *testing.T) {
-    batchCfg := exporterbatcher.NewDefaultConfig()
-    batchCfg.MinSize = 10
-    batchCfg.FlushTimeout = 100 * time.Second
+    cfg := BatchConfig{
+        FlushTimeout: 100 * time.Second,
+        MinSize:      10,
+    }

     sink := requesttest.NewSink()
-    ba := newDefaultBatcher(batchCfg, sink.Export, 2)
+    ba := newDefaultBatcher(cfg, sink.Export, 2)
     require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

     done := newFakeDone()
@@ -282,12 +270,14 @@
 }

 func TestDefaultBatcher_MergeError(t *testing.T) {
-    batchCfg := exporterbatcher.NewDefaultConfig()
-    batchCfg.MinSize = 5
-    batchCfg.MaxSize = 7
+    cfg := BatchConfig{
+        FlushTimeout: 200 * time.Second,
+        MinSize:      5,
+        MaxSize:      7,
+    }

     sink := requesttest.NewSink()
-    ba := newDefaultBatcher(batchCfg, sink.Export, 2)
+    ba := newDefaultBatcher(cfg, sink.Export, 2)

     require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
     t.Cleanup(func() {
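The updated tests build a BatchConfig literal directly instead of mutating exporterbatcher.NewDefaultConfig(). The MinSize gating they exercise can be pictured with this toy accumulator — a simplified model of the behavior, not the real implementation:

```go
package main

import "fmt"

// toyBatcher holds requests until at least minSize have accumulated,
// then flushes them as one batch (no timer, no splitting).
type toyBatcher struct {
	minSize int
	pending []string
	flushed [][]string
}

func (b *toyBatcher) consume(reqs ...string) {
	b.pending = append(b.pending, reqs...)
	if len(b.pending) >= b.minSize {
		b.flushed = append(b.flushed, b.pending)
		b.pending = nil
	}
}

func main() {
	b := &toyBatcher{minSize: 3}
	b.consume("a")      // held: 1 < 3
	b.consume("b")      // held: 2 < 3
	b.consume("c", "d") // 4 >= 3: flushes all four as one batch
	fmt.Println(len(b.flushed), b.flushed[0]) // 1 [a b c d]
}
```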

exporter/exporterhelper/internal/queuebatch/persistent_queue.go (+1 -2)

@@ -15,7 +15,6 @@ import (

     "go.opentelemetry.io/collector/component"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
-    "go.opentelemetry.io/collector/exporter/exporterqueue"
     "go.opentelemetry.io/collector/extension/xextension/storage"
     "go.opentelemetry.io/collector/pipeline"
 )
@@ -50,7 +49,7 @@ type persistentQueueSettings[T any] struct {
     blocking  bool
     signal    pipeline.Signal
     storageID component.ID
-    encoding  exporterqueue.Encoding[T]
+    encoding  Encoding[T]
     id        component.ID
     telemetry component.TelemetrySettings
 }

exporter/exporterhelper/internal/queuebatch/queue.go (+8)

@@ -10,6 +10,14 @@ import (
     "go.opentelemetry.io/collector/component"
 )

+type Encoding[T any] interface {
+    // Marshal is a function that can marshal a request into bytes.
+    Marshal(T) ([]byte, error)
+
+    // Unmarshal is a function that can unmarshal bytes into a request.
+    Unmarshal([]byte) (T, error)
+}
+
 // ErrQueueIsFull is the error returned when an item is offered to the Queue and the queue is full and setup to
 // not block.
 // Experimental: This API is at the early stage of development and may change without backward compatibility
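With Encoding now defined in queuebatch (it previously lived in exporterqueue, per the import removals above), the persistent queue depends only on this local interface. A self-contained sketch implementing it for a toy string request; the real implementations marshal pipeline data and are not part of this diff:

```go
package main

import "fmt"

// Encoding matches the interface added in this commit.
type Encoding[T any] interface {
	Marshal(T) ([]byte, error)
	Unmarshal([]byte) (T, error)
}

// stringEncoding is a toy implementation for string "requests".
type stringEncoding struct{}

func (stringEncoding) Marshal(s string) ([]byte, error)   { return []byte(s), nil }
func (stringEncoding) Unmarshal(b []byte) (string, error) { return string(b), nil }

func main() {
	var enc Encoding[string] = stringEncoding{}
	b, _ := enc.Marshal("spans")
	s, _ := enc.Unmarshal(b)
	fmt.Println(s) // spans
}
```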

exporter/exporterhelper/internal/queuebatch/queue_batch.go (+21 -23)

@@ -11,7 +11,6 @@ import (
     "go.opentelemetry.io/collector/exporter/exporterbatcher"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
     "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
-    "go.opentelemetry.io/collector/exporter/exporterqueue"
     "go.opentelemetry.io/collector/pipeline"
 )

@@ -20,7 +19,7 @@ type Settings[K any] struct {
     Signal    pipeline.Signal
     ID        component.ID
     Telemetry component.TelemetrySettings
-    Encoding  exporterqueue.Encoding[K]
+    Encoding  Encoding[K]
     Sizers    map[exporterbatcher.SizerType]Sizer[K]
 }

@@ -31,20 +30,17 @@ type QueueBatch struct {

 func NewQueueBatch(
     qSet Settings[request.Request],
-    qCfg exporterqueue.Config,
-    bCfg exporterbatcher.Config,
+    cfg Config,
     next sender.SendFunc[request.Request],
 ) (*QueueBatch, error) {
     var b Batcher[request.Request]
-    switch bCfg.Enabled {
-    case false:
+    switch {
+    case cfg.Batch == nil:
         b = newDisabledBatcher[request.Request](next)
     default:
-        b = newDefaultBatcher(bCfg, next, qCfg.NumConsumers)
-    }
-    // TODO: https://github.com/open-telemetry/opentelemetry-collector/issues/12244
-    if bCfg.Enabled {
-        qCfg.NumConsumers = 1
+        // TODO: https://github.com/open-telemetry/opentelemetry-collector/issues/12244
+        cfg.NumConsumers = 1
+        b = newDefaultBatcher(*cfg.Batch, next, cfg.NumConsumers)
     }

     sizer, ok := qSet.Sizers[exporterbatcher.SizerTypeRequests]
@@ -54,25 +50,25 @@

     var q Queue[request.Request]
     switch {
-    case !qCfg.Enabled:
+    case cfg.WaitForResult:
         q = newDisabledQueue(b.Consume)
-    case qCfg.StorageID != nil:
+    case cfg.StorageID != nil:
         q = newAsyncQueue(newPersistentQueue[request.Request](persistentQueueSettings[request.Request]{
             sizer:     sizer,
-            capacity:  int64(qCfg.QueueSize),
-            blocking:  qCfg.Blocking,
+            capacity:  int64(cfg.QueueSize),
+            blocking:  cfg.BlockOnOverflow,
             signal:    qSet.Signal,
-            storageID: *qCfg.StorageID,
+            storageID: *cfg.StorageID,
             encoding:  qSet.Encoding,
             id:        qSet.ID,
             telemetry: qSet.Telemetry,
-        }), qCfg.NumConsumers, b.Consume)
+        }), cfg.NumConsumers, b.Consume)
     default:
         q = newAsyncQueue(newMemoryQueue[request.Request](memoryQueueSettings[request.Request]{
             sizer:    sizer,
-            capacity: int64(qCfg.QueueSize),
-            blocking: qCfg.Blocking,
-        }), qCfg.NumConsumers, b.Consume)
+            capacity: int64(cfg.QueueSize),
+            blocking: cfg.BlockOnOverflow,
+        }), cfg.NumConsumers, b.Consume)
     }

     oq, err := newObsQueue(qSet, q)
@@ -85,11 +81,13 @@

 // Start is invoked during service startup.
 func (qs *QueueBatch) Start(ctx context.Context, host component.Host) error {
-    if err := qs.queue.Start(ctx, host); err != nil {
+    if err := qs.batcher.Start(ctx, host); err != nil {
         return err
     }
-
-    return qs.batcher.Start(ctx, host)
+    if err := qs.queue.Start(ctx, host); err != nil {
+        return errors.Join(err, qs.batcher.Shutdown(ctx))
+    }
+    return nil
 }

 // Shutdown is invoked during service shutdown.
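The Start change also fixes ordering: the batcher now starts before the queue, so it is ready before consumers begin delivering, and a queue-start failure tears the batcher back down with both errors joined. A self-contained sketch of that pattern, with `component` as a stand-in for the collector's lifecycle interface:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// component is a stand-in for the collector's startable/stoppable components.
type component interface {
	Start(ctx context.Context) error
	Shutdown(ctx context.Context) error
}

type nop struct{ name string }

func (n nop) Start(context.Context) error    { fmt.Println("start", n.name); return nil }
func (n nop) Shutdown(context.Context) error { fmt.Println("shutdown", n.name); return nil }

// start mirrors the new QueueBatch.Start ordering: the batcher comes up
// before the queue, and a queue-start failure rolls the batcher back,
// joining both errors.
func start(ctx context.Context, batcher, queue component) error {
	if err := batcher.Start(ctx); err != nil {
		return err
	}
	if err := queue.Start(ctx); err != nil {
		return errors.Join(err, batcher.Shutdown(ctx))
	}
	return nil
}

func main() {
	_ = start(context.Background(), nop{"batcher"}, nop{"queue"})
}
```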
