
Commit 1a3b181 (parent: e5bbc40)

[chore] Start merging queue and batcher

Signed-off-by: Bogdan Drutu <[email protected]>

13 files changed (+94, -131 lines)

exporter/exporterhelper/internal/base_exporter.go (+7 -6)

@@ -48,7 +48,7 @@ type BaseExporter struct {
 	timeoutCfg TimeoutConfig
 	retryCfg   configretry.BackOffConfig

-	queueBatchSettings queuebatch.Settings[request.Request]
+	queueBatchSettings QueueBatchSettings[request.Request]
 	queueCfg           exporterqueue.Config
 	batcherCfg         exporterbatcher.Config
 }

@@ -91,11 +91,12 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sende
 	}

 	if be.queueCfg.Enabled || be.batcherCfg.Enabled {
-		qSet := queuebatch.QueueSettings[request.Request]{
+		qSet := queuebatch.Settings[request.Request]{
 			Signal:    signal,
 			ID:        set.ID,
 			Telemetry: set.TelemetrySettings,
-			Settings:  be.queueBatchSettings,
+			Encoding:  be.queueBatchSettings.Encoding,
+			Sizers:    be.queueBatchSettings.Sizers,
 		}
 		be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
 		if err != nil {

@@ -207,7 +208,7 @@ func WithQueue(cfg exporterqueue.Config) Option {
 // This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
 // Experimental: This API is at the early stage of development and may change without backward compatibility
 // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
-func WithQueueBatch(cfg exporterqueue.Config, set queuebatch.Settings[request.Request]) Option {
+func WithQueueBatch(cfg exporterqueue.Config, set QueueBatchSettings[request.Request]) Option {
 	return func(o *BaseExporter) error {
 		if !cfg.Enabled {
 			o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."

@@ -244,9 +245,9 @@ func WithBatcher(cfg exporterbatcher.Config) Option {
 	}
 }

-// WithQueueBatchSettings is used to set the queuebatch.Settings for the new request based exporter helper.
+// WithQueueBatchSettings is used to set the QueueBatchSettings for the new request based exporter helper.
 // It must be provided as the first option when creating a new exporter helper.
-func WithQueueBatchSettings(set queuebatch.Settings[request.Request]) Option {
+func WithQueueBatchSettings(set QueueBatchSettings[request.Request]) Option {
 	return func(o *BaseExporter) error {
 		o.queueBatchSettings = set
 		return nil

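Note: taken together, the hunks above define the new contract: the exporter supplies only a QueueBatchSettings (encoding plus sizers) via WithQueueBatchSettings, and NewBaseExporter copies those fields into the internal queuebatch.Settings next to the signal, component ID, and telemetry it already has. Below is a minimal sketch of that wiring from inside this internal package, not code from this commit; myEncoding and pushRequests are hypothetical placeholders, and the option combination is only one plausible use of the signatures shown in this diff.

// Sketch only: myEncoding and pushRequests are hypothetical placeholders.
be, err := NewBaseExporter(set, pipeline.SignalLogs, pushRequests,
	// Per the doc comment above, this option must come first: it supplies the
	// encoding and the sizers the queue/batch machinery will use.
	WithQueueBatchSettings(QueueBatchSettings[request.Request]{
		Encoding: myEncoding,
		Sizers: map[exporterbatcher.SizerType]queuebatch.Sizer[request.Request]{
			exporterbatcher.SizerTypeRequests: queuebatch.RequestsSizer[request.Request]{},
		},
	}),
	WithQueue(exporterqueue.NewDefaultConfig()),
	WithBatcher(exporterbatcher.NewDefaultConfig()),
)
if err != nil {
	// handle construction error
}
_ = be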
exporter/exporterhelper/internal/base_exporter_test.go (+8 -3)

@@ -60,7 +60,7 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
 	_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport,
 		WithQueueBatchSettings(newFakeQueueBatch(&requesttest.FakeRequest{Items: 1})),
 		WithRetry(configretry.NewDefaultBackOffConfig()),
-		WithQueueBatch(qCfg, queuebatch.Settings[request.Request]{}))
+		WithQueueBatch(qCfg, QueueBatchSettings[request.Request]{}))
 	require.Error(t, err)
 }

@@ -152,8 +152,13 @@ func noopExport(context.Context, request.Request) error {
 	return nil
 }

-func newFakeQueueBatch(mr request.Request) queuebatch.Settings[request.Request] {
-	return queuebatch.Settings[request.Request]{Encoding: &fakeEncoding{mr: mr}}
+func newFakeQueueBatch(mr request.Request) QueueBatchSettings[request.Request] {
+	return QueueBatchSettings[request.Request]{
+		Encoding: &fakeEncoding{mr: mr},
+		Sizers: map[exporterbatcher.SizerType]queuebatch.Sizer[request.Request]{
+			exporterbatcher.SizerTypeRequests: queuebatch.RequestsSizer[request.Request]{},
+		},
+	}
 }

 type fakeEncoding struct {

exporter/exporterhelper/internal/queue_sender.go (+7 -1)

@@ -15,8 +15,14 @@ import (
 	"go.opentelemetry.io/collector/exporter/exporterqueue"
 )

+// QueueBatchSettings is a subset of the queuebatch.Settings that are needed when used within an Exporter.
+type QueueBatchSettings[K any] struct {
+	Encoding exporterqueue.Encoding[K]
+	Sizers   map[exporterbatcher.SizerType]queuebatch.Sizer[K]
+}
+
 func NewQueueSender(
-	qSet queuebatch.QueueSettings[request.Request],
+	qSet queuebatch.Settings[request.Request],
 	qCfg exporterqueue.Config,
 	bCfg exporterbatcher.Config,
 	exportFailureMessage string,

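Note: the new QueueBatchSettings carries only the pieces an exporter author has to supply; the runtime fields (signal, component ID, telemetry) stay with the base exporter, which merges the two when building the queue sender. A hedged sketch of that mapping follows, mirroring the NewBaseExporter hunk above; the helper name asQueuebatchSettings is illustrative and does not exist in this commit.

// Illustrative helper, not part of this commit: shows how the exporter-facing
// subset expands into the internal queuebatch.Settings, as NewBaseExporter does inline.
func asQueuebatchSettings(set exporter.Settings, signal pipeline.Signal,
	qbs QueueBatchSettings[request.Request]) queuebatch.Settings[request.Request] {
	return queuebatch.Settings[request.Request]{
		Signal:    signal,
		ID:        set.ID,
		Telemetry: set.TelemetrySettings,
		Encoding:  qbs.Encoding,
		Sizers:    qbs.Sizers,
	}
}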
exporter/exporterhelper/internal/queue_sender_test.go (+3 -5)

@@ -26,14 +26,12 @@ import (
 )

 func TestNewQueueSenderFailedRequestDropped(t *testing.T) {
-	qSet := queuebatch.QueueSettings[request.Request]{
+	qSet := queuebatch.Settings[request.Request]{
 		Signal:    pipeline.SignalMetrics,
 		ID:        component.NewID(exportertest.NopType),
 		Telemetry: componenttest.NewNopTelemetrySettings(),
-		Settings: queuebatch.Settings[request.Request]{
-			Sizers: map[exporterbatcher.SizerType]queuebatch.Sizer[request.Request]{
-				exporterbatcher.SizerTypeRequests: queuebatch.RequestsSizer[request.Request]{},
-			},
+		Sizers: map[exporterbatcher.SizerType]queuebatch.Sizer[request.Request]{
+			exporterbatcher.SizerTypeRequests: queuebatch.RequestsSizer[request.Request]{},
 		},
 	}
 	logger, observed := observer.New(zap.ErrorLevel)

exporter/exporterhelper/internal/queuebatch/batcher.go (-10)

@@ -7,20 +7,10 @@ import (
 	"context"

 	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/exporter/exporterbatcher"
-	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
-	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
 )

 // Batcher is in charge of reading items from the queue and send them out asynchronously.
 type Batcher[K any] interface {
 	component.Component
 	Consume(context.Context, K, Done)
 }
-
-func NewBatcher(batchCfg exporterbatcher.Config, exportFunc sender.SendFunc[request.Request], maxWorkers int) (Batcher[request.Request], error) {
-	if !batchCfg.Enabled {
-		return newDisabledBatcher[request.Request](exportFunc), nil
-	}
-	return newDefaultBatcher(batchCfg, exportFunc, maxWorkers), nil
-}

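Note: with the exported NewBatcher constructor gone, the enabled/disabled decision moves to the call sites; the test changes below construct the concrete batcher directly. For reference, the selection the removed factory performed amounts to the following caller-side sketch inside this package (not code added by this commit).

// Sketch of the choice callers now make themselves, mirroring the removed NewBatcher body.
var ba Batcher[request.Request]
if batchCfg.Enabled {
	ba = newDefaultBatcher(batchCfg, exportFunc, maxWorkers)
} else {
	ba = newDisabledBatcher[request.Request](exportFunc)
}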
exporter/exporterhelper/internal/queuebatch/default_batcher_test.go (+6 -12)

@@ -44,8 +44,7 @@ func TestDefaultBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) {
 			}

 			sink := requesttest.NewSink()
-			ba, err := NewBatcher(cfg, sink.Export, tt.maxWorkers)
-			require.NoError(t, err)
+			ba := newDefaultBatcher(cfg, sink.Export, tt.maxWorkers)
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 			t.Cleanup(func() {
 				require.NoError(t, ba.Shutdown(context.Background()))

@@ -95,8 +94,7 @@ func TestDefaultBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
 			}

 			sink := requesttest.NewSink()
-			ba, err := NewBatcher(cfg, sink.Export, tt.maxWorkers)
-			require.NoError(t, err)
+			ba := newDefaultBatcher(cfg, sink.Export, tt.maxWorkers)
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

 			done := newFakeDone()

@@ -161,8 +159,7 @@ func TestDefaultBatcher_NoSplit_WithTimeout(t *testing.T) {
 			}

 			sink := requesttest.NewSink()
-			ba, err := NewBatcher(cfg, sink.Export, tt.maxWorkers)
-			require.NoError(t, err)
+			ba := newDefaultBatcher(cfg, sink.Export, tt.maxWorkers)
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 			t.Cleanup(func() {
 				require.NoError(t, ba.Shutdown(context.Background()))

@@ -218,8 +215,7 @@ func TestDefaultBatcher_Split_TimeoutDisabled(t *testing.T) {
 			}

 			sink := requesttest.NewSink()
-			ba, err := NewBatcher(cfg, sink.Export, tt.maxWorkers)
-			require.NoError(t, err)
+			ba := newDefaultBatcher(cfg, sink.Export, tt.maxWorkers)
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

 			done := newFakeDone()

@@ -265,8 +261,7 @@ func TestDefaultBatcher_Shutdown(t *testing.T) {
 	batchCfg.FlushTimeout = 100 * time.Second

 	sink := requesttest.NewSink()
-	ba, err := NewBatcher(batchCfg, sink.Export, 2)
-	require.NoError(t, err)
+	ba := newDefaultBatcher(batchCfg, sink.Export, 2)
 	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

 	done := newFakeDone()

@@ -292,8 +287,7 @@ func TestDefaultBatcher_MergeError(t *testing.T) {
 	batchCfg.MaxSize = 7

 	sink := requesttest.NewSink()
-	ba, err := NewBatcher(batchCfg, sink.Export, 2)
-	require.NoError(t, err)
+	ba := newDefaultBatcher(batchCfg, sink.Export, 2)

 	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
 	t.Cleanup(func() {

exporter/exporterhelper/internal/queuebatch/disabled_batcher_test.go (+8 -22)

@@ -12,17 +12,13 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"

-	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/component/componenttest"
 	"go.opentelemetry.io/collector/exporter/exporterbatcher"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
-	"go.opentelemetry.io/collector/exporter/exporterqueue"
-	"go.opentelemetry.io/collector/exporter/exportertest"
-	"go.opentelemetry.io/collector/pipeline"
 )

-func TestDisabledBatcher_Basic(t *testing.T) {
+func TestDisabledBatcher(t *testing.T) {
 	tests := []struct {
 		name       string
 		maxWorkers int

@@ -42,24 +38,14 @@ func TestDisabledBatcher_Basic(t *testing.T) {
 			cfg.Enabled = false

 			sink := requesttest.NewSink()
-			ba, err := NewBatcher(cfg, sink.Export, tt.maxWorkers)
-			require.NoError(t, err)
+			ba := newDisabledBatcher(sink.Export)

-			q, err := NewQueue[request.Request](
-				context.Background(),
-				QueueSettings[request.Request]{
-					Signal:    pipeline.SignalTraces,
-					ID:        component.NewID(exportertest.NopType),
-					Telemetry: componenttest.NewNopTelemetrySettings(),
-					Settings: Settings[request.Request]{
-						Sizers: map[exporterbatcher.SizerType]Sizer[request.Request]{
-							exporterbatcher.SizerTypeRequests: RequestsSizer[request.Request]{},
-						},
-					},
-				},
-				exporterqueue.NewDefaultConfig(),
-				ba.Consume)
-			require.NoError(t, err)
+			mq := newMemoryQueue[request.Request](memoryQueueSettings[request.Request]{
+				sizer:    RequestsSizer[request.Request]{},
+				capacity: 1000,
+				blocking: true,
+			})
+			q := newAsyncQueue(mq, tt.maxWorkers, ba.Consume)

 			require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
 			require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

exporter/exporterhelper/internal/queuebatch/obs_queue.go (+1 -1)

@@ -30,7 +30,7 @@ type obsQueue[T request.Request] struct {
 	enqueueFailedInst metric.Int64Counter
 }

-func newObsQueue[T request.Request](set QueueSettings[T], delegate Queue[T]) (Queue[T], error) {
+func newObsQueue[T request.Request](set Settings[T], delegate Queue[T]) (Queue[T], error) {
 	tb, err := metadata.NewTelemetryBuilder(set.Telemetry)
 	if err != nil {
 		return nil, err

exporter/exporterhelper/internal/queuebatch/obs_queue_test.go (+6 -6)

@@ -51,7 +51,7 @@ func TestObsQueueLogsSizeCapacity(t *testing.T) {
 	tt := componenttest.NewTelemetry()
 	t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

-	te, err := newObsQueue[request.Request](QueueSettings[request.Request]{
+	te, err := newObsQueue[request.Request](Settings[request.Request]{
 		Signal:    pipeline.SignalLogs,
 		ID:        exporterID,
 		Telemetry: tt.NewTelemetrySettings(),

@@ -82,7 +82,7 @@ func TestObsQueueLogsFailure(t *testing.T) {
 	tt := componenttest.NewTelemetry()
 	t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

-	te, err := newObsQueue[request.Request](QueueSettings[request.Request]{
+	te, err := newObsQueue[request.Request](Settings[request.Request]{
 		Signal:    pipeline.SignalLogs,
 		ID:        exporterID,
 		Telemetry: tt.NewTelemetrySettings(),

@@ -103,7 +103,7 @@ func TestObsQueueTracesSizeCapacity(t *testing.T) {
 	tt := componenttest.NewTelemetry()
 	t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

-	te, err := newObsQueue[request.Request](QueueSettings[request.Request]{
+	te, err := newObsQueue[request.Request](Settings[request.Request]{
 		Signal:    pipeline.SignalTraces,
 		ID:        exporterID,
 		Telemetry: tt.NewTelemetrySettings(),

@@ -134,7 +134,7 @@ func TestObsQueueTracesFailure(t *testing.T) {
 	tt := componenttest.NewTelemetry()
 	t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

-	te, err := newObsQueue[request.Request](QueueSettings[request.Request]{
+	te, err := newObsQueue[request.Request](Settings[request.Request]{
 		Signal:    pipeline.SignalTraces,
 		ID:        exporterID,
 		Telemetry: tt.NewTelemetrySettings(),

@@ -155,7 +155,7 @@ func TestObsQueueMetrics(t *testing.T) {
 	tt := componenttest.NewTelemetry()
 	t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

-	te, err := newObsQueue[request.Request](QueueSettings[request.Request]{
+	te, err := newObsQueue[request.Request](Settings[request.Request]{
 		Signal:    pipeline.SignalMetrics,
 		ID:        exporterID,
 		Telemetry: tt.NewTelemetrySettings(),

@@ -186,7 +186,7 @@ func TestObsQueueMetricsFailure(t *testing.T) {
 	tt := componenttest.NewTelemetry()
 	t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

-	te, err := newObsQueue[request.Request](QueueSettings[request.Request]{
+	te, err := newObsQueue[request.Request](Settings[request.Request]{
 		Signal:    pipeline.SignalMetrics,
 		ID:        exporterID,
 		Telemetry: tt.NewTelemetrySettings(),

exporter/exporterhelper/internal/queuebatch/queue.go (-47)

@@ -8,9 +8,6 @@ import (
 	"errors"

 	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/exporter/exporterbatcher"
-	"go.opentelemetry.io/collector/exporter/exporterqueue"
-	"go.opentelemetry.io/collector/pipeline"
 )

 // ErrQueueIsFull is the error returned when an item is offered to the Queue and the queue is full and setup to

@@ -55,47 +52,3 @@ type readableQueue[T any] interface {
 	// If the queue is stopped returns false, otherwise true.
 	Read(context.Context) (context.Context, T, Done, bool)
 }
-
-// QueueSettings defines settings for creating a queue.
-type QueueSettings[K any] struct {
-	Signal    pipeline.Signal
-	ID        component.ID
-	Telemetry component.TelemetrySettings
-	Settings[K]
-}
-
-// NewQueue returns a queue
-// Experimental: This API is at the early stage of development and may change without backward compatibility
-// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
-func NewQueue[T any](_ context.Context, set QueueSettings[T], cfg exporterqueue.Config, consume ConsumeFunc[T]) (Queue[T], error) {
-	if !cfg.Enabled {
-		return newDisabledQueue(consume), nil
-	}
-
-	sizer, ok := set.Sizers[exporterbatcher.SizerTypeRequests]
-	if !ok {
-		return nil, errors.New("unsupported queue_batch sizer")
-	}
-
-	if cfg.StorageID != nil {
-		q := newPersistentQueue[T](persistentQueueSettings[T]{
-			sizer:     sizer,
-			capacity:  int64(cfg.QueueSize),
-			blocking:  cfg.Blocking,
-			signal:    set.Signal,
-			storageID: *cfg.StorageID,
-			encoding:  set.Encoding,
-			id:        set.ID,
-			telemetry: set.Telemetry,
-		})
-		return newAsyncQueue(q, cfg.NumConsumers, consume), nil
-	}
-
-	q := newMemoryQueue[T](memoryQueueSettings[T]{
-		sizer:    sizer,
-		capacity: int64(cfg.QueueSize),
-		blocking: cfg.Blocking,
-	})
-
-	return newAsyncQueue(q, cfg.NumConsumers, consume), nil
-}

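Note: the removed NewQueue factory bundled three choices: a disabled queue when queueing is off, a persistent queue when a storage ID is set, and an in-memory queue otherwise, each wrapped in an async consumer pool. With it gone, callers compose those pieces directly (the disabled-batcher test above shows the memory + async pairing). Below is a sketch of the in-memory path only, using constructors that appear in this diff; it assumes a cfg of type exporterqueue.Config, a sizer taken from the settings (e.g. RequestsSizer), and a consume callback are already in scope.

// In-memory path formerly handled by NewQueue: a sized, optionally blocking
// memory queue driven by an async consumer pool.
mq := newMemoryQueue[request.Request](memoryQueueSettings[request.Request]{
	sizer:    RequestsSizer[request.Request]{}, // or whichever sizer the settings provide
	capacity: int64(cfg.QueueSize),
	blocking: cfg.Blocking,
})
q := newAsyncQueue(mq, cfg.NumConsumers, consume)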