Skip to content

Commit e726f00

Browse files
committed
[chore] Start merging queue and batcher
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent e5bbc40 commit e726f00

13 files changed

+101
-143
lines changed

exporter/exporterhelper/internal/base_exporter.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ type BaseExporter struct {
4848
timeoutCfg TimeoutConfig
4949
retryCfg configretry.BackOffConfig
5050

51-
queueBatchSettings queuebatch.Settings[request.Request]
51+
queueBatchSettings QueueBatchSettings[request.Request]
5252
queueCfg exporterqueue.Config
5353
batcherCfg exporterbatcher.Config
5454
}
@@ -91,11 +91,12 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sende
9191
}
9292

9393
if be.queueCfg.Enabled || be.batcherCfg.Enabled {
94-
qSet := queuebatch.QueueSettings[request.Request]{
94+
qSet := queuebatch.Settings[request.Request]{
9595
Signal: signal,
9696
ID: set.ID,
9797
Telemetry: set.TelemetrySettings,
98-
Settings: be.queueBatchSettings,
98+
Encoding: be.queueBatchSettings.Encoding,
99+
Sizers: be.queueBatchSettings.Sizers,
99100
}
100101
be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
101102
if err != nil {
@@ -207,7 +208,7 @@ func WithQueue(cfg exporterqueue.Config) Option {
207208
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
208209
// Experimental: This API is at the early stage of development and may change without backward compatibility
209210
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
210-
func WithQueueBatch(cfg exporterqueue.Config, set queuebatch.Settings[request.Request]) Option {
211+
func WithQueueBatch(cfg exporterqueue.Config, set QueueBatchSettings[request.Request]) Option {
211212
return func(o *BaseExporter) error {
212213
if !cfg.Enabled {
213214
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
@@ -244,9 +245,9 @@ func WithBatcher(cfg exporterbatcher.Config) Option {
244245
}
245246
}
246247

247-
// WithQueueBatchSettings is used to set the queuebatch.Settings for the new request based exporter helper.
248+
// WithQueueBatchSettings is used to set the QueueBatchSettings for the new request based exporter helper.
248249
// It must be provided as the first option when creating a new exporter helper.
249-
func WithQueueBatchSettings(set queuebatch.Settings[request.Request]) Option {
250+
func WithQueueBatchSettings(set QueueBatchSettings[request.Request]) Option {
250251
return func(o *BaseExporter) error {
251252
o.queueBatchSettings = set
252253
return nil

exporter/exporterhelper/internal/base_exporter_test.go

+15-11
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
5858
storageID := component.NewID(component.MustNewType("test"))
5959
qCfg.StorageID = &storageID
6060
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport,
61-
WithQueueBatchSettings(newFakeQueueBatch(&requesttest.FakeRequest{Items: 1})),
61+
WithQueueBatchSettings(newFakeQueueBatch()),
6262
WithRetry(configretry.NewDefaultBackOffConfig()),
63-
WithQueueBatch(qCfg, queuebatch.Settings[request.Request]{}))
63+
WithQueueBatch(qCfg, QueueBatchSettings[request.Request]{}))
6464
require.Error(t, err)
6565
}
6666

@@ -73,7 +73,8 @@ func TestBaseExporterLogging(t *testing.T) {
7373
qCfg := exporterqueue.NewDefaultConfig()
7474
qCfg.Enabled = false
7575
bs, err := NewBaseExporter(set, pipeline.SignalMetrics, errExport,
76-
WithQueueBatch(qCfg, newFakeQueueBatch(&requesttest.FakeRequest{})),
76+
WithQueueBatchSettings(newFakeQueueBatch()),
77+
WithQueue(qCfg),
7778
WithBatcher(exporterbatcher.NewDefaultConfig()),
7879
WithRetry(rCfg))
7980
require.NoError(t, err)
@@ -97,7 +98,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
9798
{
9899
name: "WithQueue",
99100
queueOptions: []Option{
100-
WithQueueBatchSettings(newFakeQueueBatch(&requesttest.FakeRequest{Items: 1})),
101+
WithQueueBatchSettings(newFakeQueueBatch()),
101102
func() Option {
102103
qs := exporterqueue.NewDefaultConfig()
103104
qs.Enabled = false
@@ -116,7 +117,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
116117
func() Option {
117118
qs := exporterqueue.NewDefaultConfig()
118119
qs.Enabled = false
119-
return WithQueueBatch(qs, newFakeQueueBatch(&requesttest.FakeRequest{Items: 1}))
120+
return WithQueueBatch(qs, newFakeQueueBatch())
120121
}(),
121122
func() Option {
122123
bs := exporterbatcher.NewDefaultConfig()
@@ -152,18 +153,21 @@ func noopExport(context.Context, request.Request) error {
152153
return nil
153154
}
154155

155-
func newFakeQueueBatch(mr request.Request) queuebatch.Settings[request.Request] {
156-
return queuebatch.Settings[request.Request]{Encoding: &fakeEncoding{mr: mr}}
156+
func newFakeQueueBatch() QueueBatchSettings[request.Request] {
157+
return QueueBatchSettings[request.Request]{
158+
Encoding: fakeEncoding{},
159+
Sizers: map[exporterbatcher.SizerType]queuebatch.Sizer[request.Request]{
160+
exporterbatcher.SizerTypeRequests: queuebatch.RequestsSizer[request.Request]{},
161+
},
162+
}
157163
}
158164

159-
type fakeEncoding struct {
160-
mr request.Request
161-
}
165+
type fakeEncoding struct{}
162166

163167
func (f fakeEncoding) Marshal(request.Request) ([]byte, error) {
164168
return []byte("mockRequest"), nil
165169
}
166170

167171
func (f fakeEncoding) Unmarshal([]byte) (request.Request, error) {
168-
return f.mr, nil
172+
return &requesttest.FakeRequest{}, nil
169173
}

exporter/exporterhelper/internal/queue_sender.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,14 @@ import (
1515
"go.opentelemetry.io/collector/exporter/exporterqueue"
1616
)
1717

18+
// QueueBatchSettings is a subset of the queuebatch.Settings that are needed when used within an Exporter.
19+
type QueueBatchSettings[K any] struct {
20+
Encoding exporterqueue.Encoding[K]
21+
Sizers map[exporterbatcher.SizerType]queuebatch.Sizer[K]
22+
}
23+
1824
func NewQueueSender(
19-
qSet queuebatch.QueueSettings[request.Request],
25+
qSet queuebatch.Settings[request.Request],
2026
qCfg exporterqueue.Config,
2127
bCfg exporterbatcher.Config,
2228
exportFailureMessage string,

exporter/exporterhelper/internal/queue_sender_test.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,12 @@ import (
2626
)
2727

2828
func TestNewQueueSenderFailedRequestDropped(t *testing.T) {
29-
qSet := queuebatch.QueueSettings[request.Request]{
29+
qSet := queuebatch.Settings[request.Request]{
3030
Signal: pipeline.SignalMetrics,
3131
ID: component.NewID(exportertest.NopType),
3232
Telemetry: componenttest.NewNopTelemetrySettings(),
33-
Settings: queuebatch.Settings[request.Request]{
34-
Sizers: map[exporterbatcher.SizerType]queuebatch.Sizer[request.Request]{
35-
exporterbatcher.SizerTypeRequests: queuebatch.RequestsSizer[request.Request]{},
36-
},
33+
Sizers: map[exporterbatcher.SizerType]queuebatch.Sizer[request.Request]{
34+
exporterbatcher.SizerTypeRequests: queuebatch.RequestsSizer[request.Request]{},
3735
},
3836
}
3937
logger, observed := observer.New(zap.ErrorLevel)

exporter/exporterhelper/internal/queuebatch/batcher.go

-10
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,10 @@ import (
77
"context"
88

99
"go.opentelemetry.io/collector/component"
10-
"go.opentelemetry.io/collector/exporter/exporterbatcher"
11-
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
12-
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
1310
)
1411

1512
// Batcher is in charge of reading items from the queue and send them out asynchronously.
1613
type Batcher[K any] interface {
1714
component.Component
1815
Consume(context.Context, K, Done)
1916
}
20-
21-
func NewBatcher(batchCfg exporterbatcher.Config, exportFunc sender.SendFunc[request.Request], maxWorkers int) (Batcher[request.Request], error) {
22-
if !batchCfg.Enabled {
23-
return newDisabledBatcher[request.Request](exportFunc), nil
24-
}
25-
return newDefaultBatcher(batchCfg, exportFunc, maxWorkers), nil
26-
}

exporter/exporterhelper/internal/queuebatch/default_batcher_test.go

+6-12
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,7 @@ func TestDefaultBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) {
4444
}
4545

4646
sink := requesttest.NewSink()
47-
ba, err := NewBatcher(cfg, sink.Export, tt.maxWorkers)
48-
require.NoError(t, err)
47+
ba := newDefaultBatcher(cfg, sink.Export, tt.maxWorkers)
4948
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
5049
t.Cleanup(func() {
5150
require.NoError(t, ba.Shutdown(context.Background()))
@@ -95,8 +94,7 @@ func TestDefaultBatcher_NoSplit_TimeoutDisabled(t *testing.T) {
9594
}
9695

9796
sink := requesttest.NewSink()
98-
ba, err := NewBatcher(cfg, sink.Export, tt.maxWorkers)
99-
require.NoError(t, err)
97+
ba := newDefaultBatcher(cfg, sink.Export, tt.maxWorkers)
10098
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
10199

102100
done := newFakeDone()
@@ -161,8 +159,7 @@ func TestDefaultBatcher_NoSplit_WithTimeout(t *testing.T) {
161159
}
162160

163161
sink := requesttest.NewSink()
164-
ba, err := NewBatcher(cfg, sink.Export, tt.maxWorkers)
165-
require.NoError(t, err)
162+
ba := newDefaultBatcher(cfg, sink.Export, tt.maxWorkers)
166163
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
167164
t.Cleanup(func() {
168165
require.NoError(t, ba.Shutdown(context.Background()))
@@ -218,8 +215,7 @@ func TestDefaultBatcher_Split_TimeoutDisabled(t *testing.T) {
218215
}
219216

220217
sink := requesttest.NewSink()
221-
ba, err := NewBatcher(cfg, sink.Export, tt.maxWorkers)
222-
require.NoError(t, err)
218+
ba := newDefaultBatcher(cfg, sink.Export, tt.maxWorkers)
223219
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
224220

225221
done := newFakeDone()
@@ -265,8 +261,7 @@ func TestDefaultBatcher_Shutdown(t *testing.T) {
265261
batchCfg.FlushTimeout = 100 * time.Second
266262

267263
sink := requesttest.NewSink()
268-
ba, err := NewBatcher(batchCfg, sink.Export, 2)
269-
require.NoError(t, err)
264+
ba := newDefaultBatcher(batchCfg, sink.Export, 2)
270265
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
271266

272267
done := newFakeDone()
@@ -292,8 +287,7 @@ func TestDefaultBatcher_MergeError(t *testing.T) {
292287
batchCfg.MaxSize = 7
293288

294289
sink := requesttest.NewSink()
295-
ba, err := NewBatcher(batchCfg, sink.Export, 2)
296-
require.NoError(t, err)
290+
ba := newDefaultBatcher(batchCfg, sink.Export, 2)
297291

298292
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
299293
t.Cleanup(func() {

exporter/exporterhelper/internal/queuebatch/disabled_batcher_test.go

+8-26
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,12 @@ import (
1212
"github.com/stretchr/testify/assert"
1313
"github.com/stretchr/testify/require"
1414

15-
"go.opentelemetry.io/collector/component"
1615
"go.opentelemetry.io/collector/component/componenttest"
17-
"go.opentelemetry.io/collector/exporter/exporterbatcher"
1816
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1917
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
20-
"go.opentelemetry.io/collector/exporter/exporterqueue"
21-
"go.opentelemetry.io/collector/exporter/exportertest"
22-
"go.opentelemetry.io/collector/pipeline"
2318
)
2419

25-
func TestDisabledBatcher_Basic(t *testing.T) {
20+
func TestDisabledBatcher(t *testing.T) {
2621
tests := []struct {
2722
name string
2823
maxWorkers int
@@ -38,28 +33,15 @@ func TestDisabledBatcher_Basic(t *testing.T) {
3833
}
3934
for _, tt := range tests {
4035
t.Run(tt.name, func(t *testing.T) {
41-
cfg := exporterbatcher.NewDefaultConfig()
42-
cfg.Enabled = false
43-
4436
sink := requesttest.NewSink()
45-
ba, err := NewBatcher(cfg, sink.Export, tt.maxWorkers)
46-
require.NoError(t, err)
37+
ba := newDisabledBatcher(sink.Export)
4738

48-
q, err := NewQueue[request.Request](
49-
context.Background(),
50-
QueueSettings[request.Request]{
51-
Signal: pipeline.SignalTraces,
52-
ID: component.NewID(exportertest.NopType),
53-
Telemetry: componenttest.NewNopTelemetrySettings(),
54-
Settings: Settings[request.Request]{
55-
Sizers: map[exporterbatcher.SizerType]Sizer[request.Request]{
56-
exporterbatcher.SizerTypeRequests: RequestsSizer[request.Request]{},
57-
},
58-
},
59-
},
60-
exporterqueue.NewDefaultConfig(),
61-
ba.Consume)
62-
require.NoError(t, err)
39+
mq := newMemoryQueue[request.Request](memoryQueueSettings[request.Request]{
40+
sizer: RequestsSizer[request.Request]{},
41+
capacity: 1000,
42+
blocking: true,
43+
})
44+
q := newAsyncQueue(mq, tt.maxWorkers, ba.Consume)
6345

6446
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
6547
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

exporter/exporterhelper/internal/queuebatch/obs_queue.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type obsQueue[T request.Request] struct {
3030
enqueueFailedInst metric.Int64Counter
3131
}
3232

33-
func newObsQueue[T request.Request](set QueueSettings[T], delegate Queue[T]) (Queue[T], error) {
33+
func newObsQueue[T request.Request](set Settings[T], delegate Queue[T]) (Queue[T], error) {
3434
tb, err := metadata.NewTelemetryBuilder(set.Telemetry)
3535
if err != nil {
3636
return nil, err

exporter/exporterhelper/internal/queuebatch/obs_queue_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func TestObsQueueLogsSizeCapacity(t *testing.T) {
5151
tt := componenttest.NewTelemetry()
5252
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
5353

54-
te, err := newObsQueue[request.Request](QueueSettings[request.Request]{
54+
te, err := newObsQueue[request.Request](Settings[request.Request]{
5555
Signal: pipeline.SignalLogs,
5656
ID: exporterID,
5757
Telemetry: tt.NewTelemetrySettings(),
@@ -82,7 +82,7 @@ func TestObsQueueLogsFailure(t *testing.T) {
8282
tt := componenttest.NewTelemetry()
8383
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
8484

85-
te, err := newObsQueue[request.Request](QueueSettings[request.Request]{
85+
te, err := newObsQueue[request.Request](Settings[request.Request]{
8686
Signal: pipeline.SignalLogs,
8787
ID: exporterID,
8888
Telemetry: tt.NewTelemetrySettings(),
@@ -103,7 +103,7 @@ func TestObsQueueTracesSizeCapacity(t *testing.T) {
103103
tt := componenttest.NewTelemetry()
104104
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
105105

106-
te, err := newObsQueue[request.Request](QueueSettings[request.Request]{
106+
te, err := newObsQueue[request.Request](Settings[request.Request]{
107107
Signal: pipeline.SignalTraces,
108108
ID: exporterID,
109109
Telemetry: tt.NewTelemetrySettings(),
@@ -134,7 +134,7 @@ func TestObsQueueTracesFailure(t *testing.T) {
134134
tt := componenttest.NewTelemetry()
135135
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
136136

137-
te, err := newObsQueue[request.Request](QueueSettings[request.Request]{
137+
te, err := newObsQueue[request.Request](Settings[request.Request]{
138138
Signal: pipeline.SignalTraces,
139139
ID: exporterID,
140140
Telemetry: tt.NewTelemetrySettings(),
@@ -155,7 +155,7 @@ func TestObsQueueMetrics(t *testing.T) {
155155
tt := componenttest.NewTelemetry()
156156
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
157157

158-
te, err := newObsQueue[request.Request](QueueSettings[request.Request]{
158+
te, err := newObsQueue[request.Request](Settings[request.Request]{
159159
Signal: pipeline.SignalMetrics,
160160
ID: exporterID,
161161
Telemetry: tt.NewTelemetrySettings(),
@@ -186,7 +186,7 @@ func TestObsQueueMetricsFailure(t *testing.T) {
186186
tt := componenttest.NewTelemetry()
187187
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
188188

189-
te, err := newObsQueue[request.Request](QueueSettings[request.Request]{
189+
te, err := newObsQueue[request.Request](Settings[request.Request]{
190190
Signal: pipeline.SignalMetrics,
191191
ID: exporterID,
192192
Telemetry: tt.NewTelemetrySettings(),

0 commit comments

Comments
 (0)