Skip to content

Commit e5bbc40

Browse files
authoredMar 22, 2025··
[chore] Move queue-batch from internal to internal/queuebatch (#12698)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description <!-- Issue number if applicable --> #### Link to tracking issue Fixes # <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.--> Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
1 parent 8b79707 commit e5bbc40

File tree

7 files changed

+731
-716
lines changed

7 files changed

+731
-716
lines changed
 

‎exporter/exporterhelper/internal/base_exporter_test.go

+18-18
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"go.opentelemetry.io/collector/component"
1717
"go.opentelemetry.io/collector/component/componenttest"
1818
"go.opentelemetry.io/collector/config/configretry"
19-
"go.opentelemetry.io/collector/exporter"
2019
"go.opentelemetry.io/collector/exporter/exporterbatcher"
2120
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
2221
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
@@ -26,19 +25,8 @@ import (
2625
"go.opentelemetry.io/collector/pipeline"
2726
)
2827

29-
var (
30-
defaultType = component.MustNewType("test")
31-
defaultSignal = pipeline.SignalMetrics
32-
defaultID = component.NewID(defaultType)
33-
defaultSettings = func() exporter.Settings {
34-
set := exportertest.NewNopSettings(exportertest.NopType)
35-
set.ID = defaultID
36-
return set
37-
}()
38-
)
39-
4028
func TestBaseExporter(t *testing.T) {
41-
be, err := NewBaseExporter(defaultSettings, defaultSignal, noopExport)
29+
be, err := NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport)
4230
require.NoError(t, err)
4331
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
4432
require.NoError(t, be.Shutdown(context.Background()))
@@ -47,7 +35,7 @@ func TestBaseExporter(t *testing.T) {
4735
func TestBaseExporterWithOptions(t *testing.T) {
4836
want := errors.New("my error")
4937
be, err := NewBaseExporter(
50-
defaultSettings, defaultSignal, noopExport,
38+
exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport,
5139
WithStart(func(context.Context, component.Host) error { return want }),
5240
WithShutdown(func(context.Context) error { return want }),
5341
WithTimeout(NewDefaultTimeoutConfig()),
@@ -58,18 +46,18 @@ func TestBaseExporterWithOptions(t *testing.T) {
5846
}
5947

6048
func TestQueueOptionsWithRequestExporter(t *testing.T) {
61-
bs, err := NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal, noopExport,
49+
bs, err := NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport,
6250
WithRetry(configretry.NewDefaultBackOffConfig()))
6351
require.NoError(t, err)
6452
require.Nil(t, bs.queueBatchSettings.Encoding)
65-
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal, noopExport,
53+
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport,
6654
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(exporterqueue.NewDefaultConfig()))
6755
require.Error(t, err)
6856

6957
qCfg := exporterqueue.NewDefaultConfig()
7058
storageID := component.NewID(component.MustNewType("test"))
7159
qCfg.StorageID = &storageID
72-
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal, noopExport,
60+
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport,
7361
WithQueueBatchSettings(newFakeQueueBatch(&requesttest.FakeRequest{Items: 1})),
7462
WithRetry(configretry.NewDefaultBackOffConfig()),
7563
WithQueueBatch(qCfg, queuebatch.Settings[request.Request]{}))
@@ -84,7 +72,7 @@ func TestBaseExporterLogging(t *testing.T) {
8472
rCfg.Enabled = false
8573
qCfg := exporterqueue.NewDefaultConfig()
8674
qCfg.Enabled = false
87-
bs, err := NewBaseExporter(set, defaultSignal, errExport,
75+
bs, err := NewBaseExporter(set, pipeline.SignalMetrics, errExport,
8876
WithQueueBatch(qCfg, newFakeQueueBatch(&requesttest.FakeRequest{})),
8977
WithBatcher(exporterbatcher.NewDefaultConfig()),
9078
WithRetry(rCfg))
@@ -167,3 +155,15 @@ func noopExport(context.Context, request.Request) error {
167155
func newFakeQueueBatch(mr request.Request) queuebatch.Settings[request.Request] {
168156
return queuebatch.Settings[request.Request]{Encoding: &fakeEncoding{mr: mr}}
169157
}
158+
159+
type fakeEncoding struct {
160+
mr request.Request
161+
}
162+
163+
func (f fakeEncoding) Marshal(request.Request) ([]byte, error) {
164+
return []byte("mockRequest"), nil
165+
}
166+
167+
func (f fakeEncoding) Unmarshal([]byte) (request.Request, error) {
168+
return f.mr, nil
169+
}

‎exporter/exporterhelper/internal/queue_sender.go

+1-54
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,9 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe
55

66
import (
77
"context"
8-
"errors"
98

109
"go.uber.org/zap"
1110

12-
"go.opentelemetry.io/collector/component"
1311
"go.opentelemetry.io/collector/exporter/exporterbatcher"
1412
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1513
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
@@ -35,57 +33,6 @@ func NewQueueSender(
3533
}
3634
return nil
3735
}
38-
return NewQueueBatch(qSet, qCfg, bCfg, exportFunc)
39-
}
40-
41-
type QueueBatch struct {
42-
queue queuebatch.Queue[request.Request]
43-
batcher queuebatch.Batcher[request.Request]
44-
}
45-
46-
func NewQueueBatch(
47-
qSet queuebatch.QueueSettings[request.Request],
48-
qCfg exporterqueue.Config,
49-
bCfg exporterbatcher.Config,
50-
next sender.SendFunc[request.Request],
51-
) (*QueueBatch, error) {
52-
b, err := queuebatch.NewBatcher(bCfg, next, qCfg.NumConsumers)
53-
if err != nil {
54-
return nil, err
55-
}
56-
// TODO: https://github.com/open-telemetry/opentelemetry-collector/issues/12244
57-
if bCfg.Enabled {
58-
qCfg.NumConsumers = 1
59-
}
60-
q, err := queuebatch.NewQueue(context.Background(), qSet, qCfg, b.Consume)
61-
if err != nil {
62-
return nil, err
63-
}
64-
oq, err := newObsQueue(qSet, q)
65-
if err != nil {
66-
return nil, err
67-
}
68-
69-
return &QueueBatch{queue: oq, batcher: b}, nil
70-
}
71-
72-
// Start is invoked during service startup.
73-
func (qs *QueueBatch) Start(ctx context.Context, host component.Host) error {
74-
if err := qs.queue.Start(ctx, host); err != nil {
75-
return err
76-
}
77-
78-
return qs.batcher.Start(ctx, host)
79-
}
80-
81-
// Shutdown is invoked during service shutdown.
82-
func (qs *QueueBatch) Shutdown(ctx context.Context) error {
83-
// Stop the queue and batcher, this will drain the queue and will call the retry (which is stopped) that will only
84-
// try once every request.
85-
return errors.Join(qs.queue.Shutdown(ctx), qs.batcher.Shutdown(ctx))
86-
}
8736

88-
// Send implements the requestSender interface. It puts the request in the queue.
89-
func (qs *QueueBatch) Send(ctx context.Context, req request.Request) error {
90-
return qs.queue.Offer(ctx, req)
37+
return queuebatch.NewQueueBatch(qSet, qCfg, bCfg, exportFunc)
9138
}

0 commit comments

Comments
 (0)