Skip to content

Commit 2ab3cb8

Browse files
committed
[chore] Allow to configure available sizers in the queuebatch Settings
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent ee0f0ae commit 2ab3cb8

18 files changed

+248
-87
lines changed

exporter/exporterhelper/internal/base_exporter.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,10 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sende
9292

9393
if be.queueCfg.Enabled || be.batcherCfg.Enabled {
9494
qSet := queuebatch.QueueSettings[request.Request]{
95-
Signal: signal,
96-
ExporterSettings: set,
97-
Encoding: be.queueBatchSettings.Encoding,
95+
Signal: signal,
96+
ID: set.ID,
97+
Telemetry: set.TelemetrySettings,
98+
Settings: be.queueBatchSettings,
9899
}
99100
be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
100101
if err != nil {

exporter/exporterhelper/internal/obs_queue.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ type obsQueue[T request.Request] struct {
2424
}
2525

2626
func newObsQueue[T request.Request](set queuebatch.QueueSettings[T], delegate queuebatch.Queue[T]) (queuebatch.Queue[T], error) {
27-
tb, err := metadata.NewTelemetryBuilder(set.ExporterSettings.TelemetrySettings)
27+
tb, err := metadata.NewTelemetryBuilder(set.Telemetry)
2828
if err != nil {
2929
return nil, err
3030
}
3131

32-
exporterAttr := attribute.String(ExporterKey, set.ExporterSettings.ID.String())
32+
exporterAttr := attribute.String(ExporterKey, set.ID.String())
3333
asyncAttr := metric.WithAttributeSet(attribute.NewSet(exporterAttr, attribute.String(DataTypeKey, set.Signal.String())))
3434
err = tb.RegisterExporterQueueSizeCallback(func(_ context.Context, o metric.Int64Observer) error {
3535
o.Observe(delegate.Size(), asyncAttr)

exporter/exporterhelper/internal/obs_queue_test.go

+18-14
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ import (
1313
"go.opentelemetry.io/otel/sdk/metric/metricdata"
1414
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
1515

16-
"go.opentelemetry.io/collector/component"
1716
"go.opentelemetry.io/collector/component/componenttest"
18-
"go.opentelemetry.io/collector/exporter"
1917
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadatatest"
2018
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
2119
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
@@ -51,8 +49,9 @@ func TestObsQueueLogsSizeCapacity(t *testing.T) {
5149
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
5250

5351
te, err := newObsQueue[request.Request](queuebatch.QueueSettings[request.Request]{
54-
Signal: pipeline.SignalLogs,
55-
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
52+
Signal: pipeline.SignalLogs,
53+
ID: exporterID,
54+
Telemetry: tt.NewTelemetrySettings(),
5655
}, newFakeQueue[request.Request](nil, 7, 9))
5756
require.NoError(t, err)
5857
require.NoError(t, te.Offer(context.Background(), &requesttest.FakeRequest{Items: 2}))
@@ -81,8 +80,9 @@ func TestObsQueueLogsFailure(t *testing.T) {
8180
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
8281

8382
te, err := newObsQueue[request.Request](queuebatch.QueueSettings[request.Request]{
84-
Signal: pipeline.SignalLogs,
85-
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
83+
Signal: pipeline.SignalLogs,
84+
ID: exporterID,
85+
Telemetry: tt.NewTelemetrySettings(),
8686
}, newFakeQueue[request.Request](errors.New("my error"), 7, 9))
8787
require.NoError(t, err)
8888
require.Error(t, te.Offer(context.Background(), &requesttest.FakeRequest{Items: 2}))
@@ -101,8 +101,9 @@ func TestObsQueueTracesSizeCapacity(t *testing.T) {
101101
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
102102

103103
te, err := newObsQueue[request.Request](queuebatch.QueueSettings[request.Request]{
104-
Signal: pipeline.SignalTraces,
105-
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
104+
Signal: pipeline.SignalTraces,
105+
ID: exporterID,
106+
Telemetry: tt.NewTelemetrySettings(),
106107
}, newFakeQueue[request.Request](nil, 17, 19))
107108
require.NoError(t, err)
108109
require.NoError(t, te.Offer(context.Background(), &requesttest.FakeRequest{Items: 12}))
@@ -131,8 +132,9 @@ func TestObsQueueTracesFailure(t *testing.T) {
131132
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
132133

133134
te, err := newObsQueue[request.Request](queuebatch.QueueSettings[request.Request]{
134-
Signal: pipeline.SignalTraces,
135-
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
135+
Signal: pipeline.SignalTraces,
136+
ID: exporterID,
137+
Telemetry: tt.NewTelemetrySettings(),
136138
}, newFakeQueue[request.Request](errors.New("my error"), 0, 0))
137139
require.NoError(t, err)
138140
require.Error(t, te.Offer(context.Background(), &requesttest.FakeRequest{Items: 12}))
@@ -151,8 +153,9 @@ func TestObsQueueMetrics(t *testing.T) {
151153
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
152154

153155
te, err := newObsQueue[request.Request](queuebatch.QueueSettings[request.Request]{
154-
Signal: pipeline.SignalMetrics,
155-
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
156+
Signal: pipeline.SignalMetrics,
157+
ID: exporterID,
158+
Telemetry: tt.NewTelemetrySettings(),
156159
}, newFakeQueue[request.Request](nil, 27, 29))
157160
require.NoError(t, err)
158161
require.NoError(t, te.Offer(context.Background(), &requesttest.FakeRequest{Items: 22}))
@@ -181,8 +184,9 @@ func TestObsQueueMetricsFailure(t *testing.T) {
181184
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
182185

183186
te, err := newObsQueue[request.Request](queuebatch.QueueSettings[request.Request]{
184-
Signal: pipeline.SignalMetrics,
185-
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
187+
Signal: pipeline.SignalMetrics,
188+
ID: exporterID,
189+
Telemetry: tt.NewTelemetrySettings(),
186190
}, newFakeQueue[request.Request](errors.New("my error"), 0, 0))
187191
require.NoError(t, err)
188192
require.Error(t, te.Offer(context.Background(), &requesttest.FakeRequest{Items: 22}))

exporter/exporterhelper/internal/queue_sender.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func NewQueueSender(
2929
// be modified by the downstream components like the batcher.
3030
itemsCount := req.ItemsCount()
3131
if errSend := next.Send(ctx, req); errSend != nil {
32-
qSet.ExporterSettings.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
32+
qSet.Telemetry.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
3333
zap.Error(errSend), zap.Int("dropped_items", itemsCount))
3434
return errSend
3535
}
@@ -57,12 +57,16 @@ func NewQueueBatch(
5757
if bCfg.Enabled {
5858
qCfg.NumConsumers = 1
5959
}
60-
q, err := newObsQueue(qSet, queuebatch.NewQueue(context.Background(), qSet, qCfg, b.Consume))
60+
q, err := queuebatch.NewQueue(context.Background(), qSet, qCfg, b.Consume)
61+
if err != nil {
62+
return nil, err
63+
}
64+
oq, err := newObsQueue(qSet, q)
6165
if err != nil {
6266
return nil, err
6367
}
6468

65-
return &QueueBatch{queue: q, batcher: b}, nil
69+
return &QueueBatch{queue: oq, batcher: b}, nil
6670
}
6771

6872
// Start is invoked during service startup.

exporter/exporterhelper/internal/queue_sender_test.go

+38-17
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sendertest"
2929
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/storagetest"
3030
"go.opentelemetry.io/collector/exporter/exporterqueue"
31+
"go.opentelemetry.io/collector/exporter/exportertest"
3132
)
3233

3334
type fakeEncoding struct {
@@ -47,8 +48,14 @@ func newFakeEncoding(mr request.Request) exporterqueue.Encoding[request.Request]
4748
}
4849

4950
var defaultQueueSettings = queuebatch.QueueSettings[request.Request]{
50-
Signal: defaultSignal,
51-
ExporterSettings: defaultSettings,
51+
Signal: defaultSignal,
52+
ID: component.NewID(exportertest.NopType),
53+
Telemetry: componenttest.NewNopTelemetrySettings(),
54+
Settings: queuebatch.Settings[request.Request]{
55+
Sizers: map[exporterbatcher.SizerType]queuebatch.Sizer[request.Request]{
56+
exporterbatcher.SizerTypeRequests: queuebatch.RequestsSizer[request.Request]{},
57+
},
58+
},
5259
}
5360

5461
func TestQueueBatchStopWhileWaiting(t *testing.T) {
@@ -117,7 +124,7 @@ func TestQueueBatchHappyPath(t *testing.T) {
117124
func TestNewQueueSenderFailedRequestDropped(t *testing.T) {
118125
qSet := defaultQueueSettings
119126
logger, observed := observer.New(zap.ErrorLevel)
120-
qSet.ExporterSettings.Logger = zap.New(logger)
127+
qSet.Telemetry.Logger = zap.New(logger)
121128
be, err := NewQueueSender(
122129
qSet, exporterqueue.NewDefaultConfig(), exporterbatcher.Config{}, "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") }))
123130

@@ -131,9 +138,15 @@ func TestNewQueueSenderFailedRequestDropped(t *testing.T) {
131138

132139
func TestQueueBatchPersistenceEnabled(t *testing.T) {
133140
qSet := queuebatch.QueueSettings[request.Request]{
134-
Signal: defaultSignal,
135-
ExporterSettings: defaultSettings,
136-
Encoding: newFakeEncoding(&requesttest.FakeRequest{}),
141+
Signal: defaultSignal,
142+
ID: component.NewID(exportertest.NopType),
143+
Telemetry: componenttest.NewNopTelemetrySettings(),
144+
Settings: queuebatch.Settings[request.Request]{
145+
Encoding: newFakeEncoding(&requesttest.FakeRequest{}),
146+
Sizers: map[exporterbatcher.SizerType]queuebatch.Sizer[request.Request]{
147+
exporterbatcher.SizerTypeRequests: queuebatch.RequestsSizer[request.Request]{},
148+
},
149+
},
137150
}
138151
qCfg := exporterqueue.NewDefaultConfig()
139152
storageID := component.MustNewIDWithName("file_storage", "storage")
@@ -154,9 +167,15 @@ func TestQueueBatchPersistenceEnabledStorageError(t *testing.T) {
154167
storageError := errors.New("could not get storage client")
155168

156169
qSet := queuebatch.QueueSettings[request.Request]{
157-
Signal: defaultSignal,
158-
ExporterSettings: defaultSettings,
159-
Encoding: newFakeEncoding(&requesttest.FakeRequest{}),
170+
Signal: defaultSignal,
171+
ID: component.NewID(exportertest.NopType),
172+
Telemetry: componenttest.NewNopTelemetrySettings(),
173+
Settings: queuebatch.Settings[request.Request]{
174+
Encoding: newFakeEncoding(&requesttest.FakeRequest{}),
175+
Sizers: map[exporterbatcher.SizerType]queuebatch.Sizer[request.Request]{
176+
exporterbatcher.SizerTypeRequests: queuebatch.RequestsSizer[request.Request]{},
177+
},
178+
},
160179
}
161180
qCfg := exporterqueue.NewDefaultConfig()
162181
storageID := component.MustNewIDWithName("file_storage", "storage")
@@ -186,9 +205,15 @@ func TestQueueBatchPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
186205

187206
mockReq := &requesttest.FakeRequest{Items: 2}
188207
qSet := queuebatch.QueueSettings[request.Request]{
189-
Signal: defaultSignal,
190-
ExporterSettings: defaultSettings,
191-
Encoding: newFakeEncoding(mockReq),
208+
Signal: defaultSignal,
209+
ID: component.NewID(exportertest.NopType),
210+
Telemetry: componenttest.NewNopTelemetrySettings(),
211+
Settings: queuebatch.Settings[request.Request]{
212+
Encoding: newFakeEncoding(&requesttest.FakeRequest{}),
213+
Sizers: map[exporterbatcher.SizerType]queuebatch.Sizer[request.Request]{
214+
exporterbatcher.SizerTypeRequests: queuebatch.RequestsSizer[request.Request]{},
215+
},
216+
},
192217
}
193218
be, err := NewQueueBatch(qSet, qCfg, exporterbatcher.Config{}, rs.Send)
194219
require.NoError(t, err)
@@ -339,11 +364,7 @@ func TestQueueBatch_BatchExportError(t *testing.T) {
339364
for _, tt := range tests {
340365
t.Run(tt.name, func(t *testing.T) {
341366
sink := requesttest.NewSink()
342-
qSet := queuebatch.QueueSettings[request.Request]{
343-
Signal: defaultSignal,
344-
ExporterSettings: defaultSettings,
345-
}
346-
be, err := NewQueueBatch(qSet, exporterqueue.NewDefaultConfig(), tt.batchCfg, sink.Export)
367+
be, err := NewQueueBatch(defaultQueueSettings, exporterqueue.NewDefaultConfig(), tt.batchCfg, sink.Export)
347368
require.NoError(t, err)
348369
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
349370

exporter/exporterhelper/internal/queuebatch/disabled_batcher_test.go

+11-3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/stretchr/testify/assert"
1313
"github.com/stretchr/testify/require"
1414

15+
"go.opentelemetry.io/collector/component"
1516
"go.opentelemetry.io/collector/component/componenttest"
1617
"go.opentelemetry.io/collector/exporter/exporterbatcher"
1718
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
@@ -44,14 +45,21 @@ func TestDisabledBatcher_Basic(t *testing.T) {
4445
ba, err := NewBatcher(cfg, sink.Export, tt.maxWorkers)
4546
require.NoError(t, err)
4647

47-
q := NewQueue[request.Request](
48+
q, err := NewQueue[request.Request](
4849
context.Background(),
4950
QueueSettings[request.Request]{
50-
Signal: pipeline.SignalTraces,
51-
ExporterSettings: exportertest.NewNopSettings(exportertest.NopType),
51+
Signal: pipeline.SignalTraces,
52+
ID: component.NewID(exportertest.NopType),
53+
Telemetry: componenttest.NewNopTelemetrySettings(),
54+
Settings: Settings[request.Request]{
55+
Sizers: map[exporterbatcher.SizerType]Sizer[request.Request]{
56+
exporterbatcher.SizerTypeRequests: RequestsSizer[request.Request]{},
57+
},
58+
},
5259
},
5360
exporterqueue.NewDefaultConfig(),
5461
ba.Consume)
62+
require.NoError(t, err)
5563

5664
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
5765
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))

exporter/exporterhelper/internal/queuebatch/disabled_queue.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ var donePool = sync.Pool{
1919

2020
func newDisabledQueue[T any](consumeFunc ConsumeFunc[T]) Queue[T] {
2121
return &disabledQueue[T]{
22-
sizer: &requestSizer[T]{},
22+
sizer: RequestsSizer[T]{},
2323
consumeFunc: consumeFunc,
2424
size: &atomic.Int64{},
2525
}
@@ -29,7 +29,7 @@ type disabledQueue[T any] struct {
2929
component.StartFunc
3030
component.ShutdownFunc
3131
consumeFunc ConsumeFunc[T]
32-
sizer sizer[T]
32+
sizer Sizer[T]
3333
size *atomic.Int64
3434
}
3535

exporter/exporterhelper/internal/queuebatch/memory_queue.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,15 @@ var errInvalidSize = errors.New("invalid element size")
2121

2222
// memoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
2323
type memoryQueueSettings[T any] struct {
24-
sizer sizer[T]
24+
sizer Sizer[T]
2525
capacity int64
2626
blocking bool
2727
}
2828

2929
// memoryQueue is an in-memory implementation of a Queue.
3030
type memoryQueue[T any] struct {
3131
component.StartFunc
32-
sizer sizer[T]
32+
sizer Sizer[T]
3333
cap int64
3434

3535
mu sync.Mutex

exporter/exporterhelper/internal/queuebatch/persistent_queue.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"go.uber.org/zap"
1515

1616
"go.opentelemetry.io/collector/component"
17-
"go.opentelemetry.io/collector/exporter"
1817
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
1918
"go.opentelemetry.io/collector/exporter/exporterqueue"
2019
"go.opentelemetry.io/collector/extension/xextension/storage"
@@ -46,13 +45,14 @@ var indexDonePool = sync.Pool{
4645
}
4746

4847
type persistentQueueSettings[T any] struct {
49-
sizer sizer[T]
48+
sizer Sizer[T]
5049
capacity int64
5150
blocking bool
5251
signal pipeline.Signal
5352
storageID component.ID
5453
encoding exporterqueue.Encoding[T]
55-
set exporter.Settings
54+
id component.ID
55+
telemetry component.TelemetrySettings
5656
}
5757

5858
// persistentQueue provides a persistent queue implementation backed by file storage extension
@@ -99,10 +99,10 @@ type persistentQueue[T any] struct {
9999

100100
// newPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
101101
func newPersistentQueue[T any](set persistentQueueSettings[T]) readableQueue[T] {
102-
_, isRequestSized := set.sizer.(*requestSizer[T])
102+
_, isRequestSized := set.sizer.(RequestsSizer[T])
103103
pq := &persistentQueue[T]{
104104
set: set,
105-
logger: set.set.Logger,
105+
logger: set.telemetry.Logger,
106106
isRequestSized: isRequestSized,
107107
}
108108
pq.hasMoreElements = sync.NewCond(&pq.mu)
@@ -112,7 +112,7 @@ func newPersistentQueue[T any](set persistentQueueSettings[T]) readableQueue[T]
112112

113113
// Start starts the persistentQueue with the given number of consumers.
114114
func (pq *persistentQueue[T]) Start(ctx context.Context, host component.Host) error {
115-
storageClient, err := toStorageClient(ctx, pq.set.storageID, host, pq.set.set.ID, pq.set.signal)
115+
storageClient, err := toStorageClient(ctx, pq.set.storageID, host, pq.set.id, pq.set.signal)
116116
if err != nil {
117117
return err
118118
}

0 commit comments

Comments
 (0)