Skip to content

Commit 6aad9d1

Browse files
authored
Move Queue to internal, don't expect other implementations (#12680)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 12dd09f commit 6aad9d1

26 files changed

+99
-65
lines changed

.chloggen/move-queue-to-internal.yaml

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterqueue
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Move Queue interface to internal, disallow alternative implementations
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12680]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]

exporter/exporterhelper/internal/base_exporter.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher func(
9494
}
9595

9696
if be.queueCfg.Enabled || be.batcherCfg.Enabled {
97-
qSet := exporterqueue.Settings[request.Request]{
97+
qSet := queuebatch.QueueSettings[request.Request]{
9898
Signal: signal,
9999
ExporterSettings: set,
100100
Encoding: be.queueBatchSettings.Encoding,

exporter/exporterhelper/internal/batcher/batcher.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ import (
88

99
"go.opentelemetry.io/collector/component"
1010
"go.opentelemetry.io/collector/exporter/exporterbatcher"
11+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1112
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
12-
"go.opentelemetry.io/collector/exporter/exporterqueue"
1313
)
1414

1515
// Batcher is in charge of reading items from the queue and send them out asynchronously.
1616
type Batcher interface {
1717
component.Component
18-
Consume(context.Context, request.Request, exporterqueue.Done)
18+
Consume(context.Context, request.Request, queuebatch.Done)
1919
}
2020

2121
func NewBatcher(batchCfg exporterbatcher.Config,

exporter/exporterhelper/internal/batcher/default_batcher.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ import (
1212

1313
"go.opentelemetry.io/collector/component"
1414
"go.opentelemetry.io/collector/exporter/exporterbatcher"
15+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1516
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
16-
"go.opentelemetry.io/collector/exporter/exporterqueue"
1717
)
1818

1919
type batch struct {
@@ -61,7 +61,7 @@ func (qb *defaultBatcher) resetTimer() {
6161
}
6262
}
6363

64-
func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done exporterqueue.Done) {
64+
func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done queuebatch.Done) {
6565
qb.currentBatchMu.Lock()
6666

6767
if qb.currentBatch == nil {
@@ -199,7 +199,7 @@ func (qb *defaultBatcher) flushCurrentBatchIfNecessary() {
199199
}
200200

201201
// flush starts a goroutine that calls exportFunc. It blocks until a worker is available if necessary.
202-
func (qb *defaultBatcher) flush(ctx context.Context, req request.Request, done exporterqueue.Done) {
202+
func (qb *defaultBatcher) flush(ctx context.Context, req request.Request, done queuebatch.Done) {
203203
qb.stopWG.Add(1)
204204
if qb.workerPool != nil {
205205
<-qb.workerPool
@@ -222,7 +222,7 @@ func (qb *defaultBatcher) Shutdown(_ context.Context) error {
222222
return nil
223223
}
224224

225-
type multiDone []exporterqueue.Done
225+
type multiDone []queuebatch.Done
226226

227227
func (mdc multiDone) OnDone(err error) {
228228
for _, d := range mdc {
@@ -231,13 +231,13 @@ func (mdc multiDone) OnDone(err error) {
231231
}
232232

233233
type refCountDone struct {
234-
done exporterqueue.Done
234+
done queuebatch.Done
235235
mu sync.Mutex
236236
refCount int64
237237
err error
238238
}
239239

240-
func newRefCountDone(done exporterqueue.Done, refCount int64) exporterqueue.Done {
240+
func newRefCountDone(done queuebatch.Done, refCount int64) queuebatch.Done {
241241
return &refCountDone{
242242
done: done,
243243
refCount: refCount,

exporter/exporterhelper/internal/batcher/disabled_batcher.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import (
77
"context"
88

99
"go.opentelemetry.io/collector/component"
10+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1011
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
11-
"go.opentelemetry.io/collector/exporter/exporterqueue"
1212
)
1313

1414
// disabledBatcher is a special-case of Batcher that has no size limit for sending. Any items read from the queue will
@@ -19,7 +19,7 @@ type disabledBatcher[T any] struct {
1919
exportFunc func(context.Context, T) error
2020
}
2121

22-
func (db *disabledBatcher[T]) Consume(ctx context.Context, req T, done exporterqueue.Done) {
22+
func (db *disabledBatcher[T]) Consume(ctx context.Context, req T, done queuebatch.Done) {
2323
done.OnDone(db.exportFunc(ctx, req))
2424
}
2525

exporter/exporterhelper/internal/batcher/disabled_batcher_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"go.opentelemetry.io/collector/component/componenttest"
1616
"go.opentelemetry.io/collector/exporter/exporterbatcher"
17+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1718
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1819
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
1920
"go.opentelemetry.io/collector/exporter/exporterqueue"
@@ -44,9 +45,9 @@ func TestDisabledBatcher_Basic(t *testing.T) {
4445
ba, err := NewBatcher(cfg, sink.Export, tt.maxWorkers)
4546
require.NoError(t, err)
4647

47-
q := exporterqueue.NewQueue[request.Request](
48+
q := queuebatch.NewQueue[request.Request](
4849
context.Background(),
49-
exporterqueue.Settings[request.Request]{
50+
queuebatch.QueueSettings[request.Request]{
5051
Signal: pipeline.SignalTraces,
5152
ExporterSettings: exportertest.NewNopSettings(exportertest.NopType),
5253
},

exporter/exporterhelper/internal/obs_queue.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,20 @@ import (
1010
"go.opentelemetry.io/otel/metric"
1111

1212
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
13+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1314
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
14-
"go.opentelemetry.io/collector/exporter/exporterqueue"
1515
"go.opentelemetry.io/collector/pipeline"
1616
)
1717

1818
// obsQueue is a helper to add observability to a queue.
1919
type obsQueue[T request.Request] struct {
20-
exporterqueue.Queue[T]
20+
queuebatch.Queue[T]
2121
tb *metadata.TelemetryBuilder
2222
metricAttr metric.MeasurementOption
2323
enqueueFailedInst metric.Int64Counter
2424
}
2525

26-
func newObsQueue[T request.Request](set exporterqueue.Settings[T], delegate exporterqueue.Queue[T]) (exporterqueue.Queue[T], error) {
26+
func newObsQueue[T request.Request](set queuebatch.QueueSettings[T], delegate queuebatch.Queue[T]) (queuebatch.Queue[T], error) {
2727
tb, err := metadata.NewTelemetryBuilder(set.ExporterSettings.TelemetrySettings)
2828
if err != nil {
2929
return nil, err

exporter/exporterhelper/internal/obs_queue_test.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@ import (
1717
"go.opentelemetry.io/collector/component/componenttest"
1818
"go.opentelemetry.io/collector/exporter"
1919
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadatatest"
20+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
2021
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2122
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
22-
"go.opentelemetry.io/collector/exporter/exporterqueue"
2323
"go.opentelemetry.io/collector/pipeline"
2424
)
2525

2626
type fakeQueue[T any] struct {
27-
exporterqueue.Queue[T]
27+
queuebatch.Queue[T]
2828
offerErr error
2929
size int64
3030
capacity int64
@@ -42,15 +42,15 @@ func (fq *fakeQueue[T]) Offer(context.Context, T) error {
4242
return fq.offerErr
4343
}
4444

45-
func newFakeQueue[T request.Request](offerErr error, size, capacity int64) exporterqueue.Queue[T] {
45+
func newFakeQueue[T request.Request](offerErr error, size, capacity int64) queuebatch.Queue[T] {
4646
return &fakeQueue[T]{offerErr: offerErr, size: size, capacity: capacity}
4747
}
4848

4949
func TestObsQueueLogsSizeCapacity(t *testing.T) {
5050
tt := componenttest.NewTelemetry()
5151
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
5252

53-
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
53+
te, err := newObsQueue[request.Request](queuebatch.QueueSettings[request.Request]{
5454
Signal: pipeline.SignalLogs,
5555
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
5656
}, newFakeQueue[request.Request](nil, 7, 9))
@@ -80,7 +80,7 @@ func TestObsQueueLogsFailure(t *testing.T) {
8080
tt := componenttest.NewTelemetry()
8181
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
8282

83-
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
83+
te, err := newObsQueue[request.Request](queuebatch.QueueSettings[request.Request]{
8484
Signal: pipeline.SignalLogs,
8585
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
8686
}, newFakeQueue[request.Request](errors.New("my error"), 7, 9))
@@ -100,7 +100,7 @@ func TestObsQueueTracesSizeCapacity(t *testing.T) {
100100
tt := componenttest.NewTelemetry()
101101
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
102102

103-
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
103+
te, err := newObsQueue[request.Request](queuebatch.QueueSettings[request.Request]{
104104
Signal: pipeline.SignalTraces,
105105
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
106106
}, newFakeQueue[request.Request](nil, 17, 19))
@@ -130,7 +130,7 @@ func TestObsQueueTracesFailure(t *testing.T) {
130130
tt := componenttest.NewTelemetry()
131131
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
132132

133-
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
133+
te, err := newObsQueue[request.Request](queuebatch.QueueSettings[request.Request]{
134134
Signal: pipeline.SignalTraces,
135135
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
136136
}, newFakeQueue[request.Request](errors.New("my error"), 0, 0))
@@ -150,7 +150,7 @@ func TestObsQueueMetrics(t *testing.T) {
150150
tt := componenttest.NewTelemetry()
151151
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
152152

153-
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
153+
te, err := newObsQueue[request.Request](queuebatch.QueueSettings[request.Request]{
154154
Signal: pipeline.SignalMetrics,
155155
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
156156
}, newFakeQueue[request.Request](nil, 27, 29))
@@ -180,7 +180,7 @@ func TestObsQueueMetricsFailure(t *testing.T) {
180180
tt := componenttest.NewTelemetry()
181181
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
182182

183-
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
183+
te, err := newObsQueue[request.Request](queuebatch.QueueSettings[request.Request]{
184184
Signal: pipeline.SignalMetrics,
185185
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
186186
}, newFakeQueue[request.Request](errors.New("my error"), 0, 0))

exporter/exporterhelper/internal/queue_sender.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"go.opentelemetry.io/collector/component"
1313
"go.opentelemetry.io/collector/exporter/exporterbatcher"
1414
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/batcher"
15+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1516
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1617
"go.opentelemetry.io/collector/exporter/exporterqueue"
1718
"go.opentelemetry.io/collector/featuregate"
@@ -26,12 +27,12 @@ var _ = featuregate.GlobalRegistry().MustRegister(
2627
)
2728

2829
type QueueSender struct {
29-
queue exporterqueue.Queue[request.Request]
30+
queue queuebatch.Queue[request.Request]
3031
batcher component.Component
3132
}
3233

3334
func NewQueueSender(
34-
qSet exporterqueue.Settings[request.Request],
35+
qSet queuebatch.QueueSettings[request.Request],
3536
qCfg exporterqueue.Config,
3637
bCfg exporterbatcher.Config,
3738
exportFailureMessage string,
@@ -57,7 +58,7 @@ func NewQueueSender(
5758
if bCfg.Enabled {
5859
qCfg.NumConsumers = 1
5960
}
60-
q, err := newObsQueue(qSet, exporterqueue.NewQueue(context.Background(), qSet, qCfg, b.Consume))
61+
q, err := newObsQueue(qSet, queuebatch.NewQueue(context.Background(), qSet, qCfg, b.Consume))
6162
if err != nil {
6263
return nil, err
6364
}

exporter/exporterhelper/internal/queue_sender_test.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"go.opentelemetry.io/collector/component/componenttest"
2121
"go.opentelemetry.io/collector/config/configretry"
2222
"go.opentelemetry.io/collector/exporter/exporterbatcher"
23+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
2324
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2425
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
2526
"go.opentelemetry.io/collector/exporter/exporterqueue"
@@ -42,7 +43,7 @@ func newFakeEncoding(mr request.Request) exporterqueue.Encoding[request.Request]
4243
return &fakeEncoding{mr: mr}
4344
}
4445

45-
var defaultQueueSettings = exporterqueue.Settings[request.Request]{
46+
var defaultQueueSettings = queuebatch.QueueSettings[request.Request]{
4647
Signal: defaultSignal,
4748
ExporterSettings: defaultSettings,
4849
}
@@ -126,7 +127,7 @@ func TestQueueFailedRequestDropped(t *testing.T) {
126127
}
127128

128129
func TestQueueBatcherPersistenceEnabled(t *testing.T) {
129-
qSet := exporterqueue.Settings[request.Request]{
130+
qSet := queuebatch.QueueSettings[request.Request]{
130131
Signal: defaultSignal,
131132
ExporterSettings: defaultSettings,
132133
Encoding: newFakeEncoding(&requesttest.FakeRequest{}),
@@ -150,7 +151,7 @@ func TestQueueBatcherPersistenceEnabled(t *testing.T) {
150151
func TestQueueBatcherPersistenceEnabledStorageError(t *testing.T) {
151152
storageError := errors.New("could not get storage client")
152153

153-
qSet := exporterqueue.Settings[request.Request]{
154+
qSet := queuebatch.QueueSettings[request.Request]{
154155
Signal: defaultSignal,
155156
ExporterSettings: defaultSettings,
156157
Encoding: newFakeEncoding(&requesttest.FakeRequest{}),
@@ -183,7 +184,7 @@ func TestQueueBatcherPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
183184
require.NoError(t, rs.Start(context.Background(), componenttest.NewNopHost()))
184185

185186
mockReq := &requesttest.FakeRequest{Items: 2}
186-
qSet := exporterqueue.Settings[request.Request]{
187+
qSet := queuebatch.QueueSettings[request.Request]{
187188
Signal: defaultSignal,
188189
ExporterSettings: defaultSettings,
189190
Encoding: newFakeEncoding(mockReq),
@@ -338,7 +339,7 @@ func TestQueueBatcher_BatchExportError(t *testing.T) {
338339
for _, tt := range tests {
339340
t.Run(tt.name, func(t *testing.T) {
340341
sink := requesttest.NewSink()
341-
qSet := exporterqueue.Settings[request.Request]{
342+
qSet := queuebatch.QueueSettings[request.Request]{
342343
Signal: defaultSignal,
343344
ExporterSettings: defaultSettings,
344345
}

exporter/exporterqueue/async_queue.go exporter/exporterhelper/internal/queuebatch/async_queue.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"
4+
package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
55

66
import (
77
"context"

exporter/exporterqueue/async_queue_test.go exporter/exporterhelper/internal/queuebatch/async_queue_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package exporterqueue
4+
package queuebatch
55

66
import (
77
"context"

exporter/exporterqueue/cond.go exporter/exporterhelper/internal/queuebatch/cond.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"
4+
package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
55

66
import (
77
"context"

exporter/exporterqueue/disabled_queue.go exporter/exporterhelper/internal/queuebatch/disabled_queue.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"
4+
package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
55

66
import (
77
"context"

exporter/exporterqueue/disabled_queue_test.go exporter/exporterhelper/internal/queuebatch/disabled_queue_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package exporterqueue
4+
package queuebatch
55

66
import (
77
"context"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//go:generate mdatagen metadata.yaml
5+
6+
// Package queuebatch provides helper functions for exporter's queueing and batching.
7+
package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"

exporter/exporterqueue/generated_package_test.go exporter/exporterhelper/internal/queuebatch/generated_package_test.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)