Skip to content

Commit 12dd09f

Browse files
authored
Deprecate exportehelper WithRequestQueue in favor of WithQueueBatch (#12679)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 69908f4 commit 12dd09f

File tree

11 files changed

+124
-61
lines changed

11 files changed

+124
-61
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: deprecation
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Deprecate exportehelper WithRequestQueue in favor of WithQueueBatch
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12679]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]

exporter/exporterhelper/common.go

-9
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"go.opentelemetry.io/collector/consumer"
1010
"go.opentelemetry.io/collector/exporter/exporterbatcher"
1111
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
12-
"go.opentelemetry.io/collector/exporter/exporterqueue"
1312
)
1413

1514
// Option apply changes to BaseExporter.
@@ -46,14 +45,6 @@ func WithQueue(config QueueConfig) Option {
4645
return internal.WithQueue(config)
4746
}
4847

49-
// WithRequestQueue enables queueing for an exporter.
50-
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
51-
// Experimental: This API is at the early stage of development and may change without backward compatibility
52-
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
53-
func WithRequestQueue(cfg exporterqueue.Config, encoding exporterqueue.Encoding[Request]) Option {
54-
return internal.WithRequestQueue(cfg, encoding)
55-
}
56-
5748
// WithCapabilities overrides the default Capabilities() function for a Consumer.
5849
// The default is non-mutable data.
5950
// TODO: Verify if we can change the default to be mutable as we do for processors.

exporter/exporterhelper/internal/base_exporter.go

+18-17
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"go.opentelemetry.io/collector/consumer"
1616
"go.opentelemetry.io/collector/exporter"
1717
"go.opentelemetry.io/collector/exporter/exporterbatcher"
18+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1819
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1920
"go.opentelemetry.io/collector/exporter/exporterqueue" // BaseExporter contains common fields between different exporter types.
2021
"go.opentelemetry.io/collector/pipeline"
@@ -27,8 +28,6 @@ type BaseExporter struct {
2728
component.StartFunc
2829
component.ShutdownFunc
2930

30-
encoding exporterqueue.Encoding[request.Request]
31-
3231
Set exporter.Settings
3332

3433
// Message for the user to be added with an export failure message.
@@ -46,8 +45,10 @@ type BaseExporter struct {
4645

4746
timeoutCfg TimeoutConfig
4847
retryCfg configretry.BackOffConfig
49-
queueCfg exporterqueue.Config
50-
batcherCfg exporterbatcher.Config
48+
49+
queueBatchSettings queuebatch.Settings[request.Request]
50+
queueCfg exporterqueue.Config
51+
batcherCfg exporterbatcher.Config
5152
}
5253

5354
func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher func(context.Context, request.Request) error, options ...Option) (*BaseExporter, error) {
@@ -96,7 +97,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher func(
9697
qSet := exporterqueue.Settings[request.Request]{
9798
Signal: signal,
9899
ExporterSettings: set,
99-
Encoding: be.encoding,
100+
Encoding: be.queueBatchSettings.Encoding,
100101
}
101102
be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
102103
if err != nil {
@@ -197,27 +198,27 @@ func WithRetry(config configretry.BackOffConfig) Option {
197198
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
198199
func WithQueue(cfg exporterqueue.Config) Option {
199200
return func(o *BaseExporter) error {
200-
if o.encoding == nil {
201-
return errors.New("WithQueue option is not available for the new request exporters, use WithRequestQueue instead")
201+
if o.queueBatchSettings.Encoding == nil {
202+
return errors.New("WithQueue option is not available for the new request exporters, use WithQueueBatch instead")
202203
}
203-
return WithRequestQueue(cfg, o.encoding)(o)
204+
return WithQueueBatch(cfg, o.queueBatchSettings)(o)
204205
}
205206
}
206207

207-
// WithRequestQueue enables queueing for an exporter.
208+
// WithQueueBatch enables queueing for an exporter.
208209
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
209210
// Experimental: This API is at the early stage of development and may change without backward compatibility
210211
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
211-
func WithRequestQueue(cfg exporterqueue.Config, encoding exporterqueue.Encoding[request.Request]) Option {
212+
func WithQueueBatch(cfg exporterqueue.Config, set queuebatch.Settings[request.Request]) Option {
212213
return func(o *BaseExporter) error {
213-
if cfg.Enabled && cfg.StorageID != nil && encoding == nil {
214-
return errors.New("`encoding` must not be nil when persistent queue is enabled")
215-
}
216-
o.encoding = encoding
217214
if !cfg.Enabled {
218215
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
219216
return nil
220217
}
218+
if cfg.StorageID != nil && set.Encoding == nil {
219+
return errors.New("`QueueBatchSettings.Encoding` must not be nil when persistent queue is enabled")
220+
}
221+
o.queueBatchSettings = set
221222
o.queueCfg = cfg
222223
return nil
223224
}
@@ -245,11 +246,11 @@ func WithBatcher(cfg exporterbatcher.Config) Option {
245246
}
246247
}
247248

248-
// WithEncoding is used to set the request encoding for the new exporter helper.
249+
// WithQueueBatchSettings is used to set the queuebatch.Settings for the new request based exporter helper.
249250
// It must be provided as the first option when creating a new exporter helper.
250-
func WithEncoding(encoding exporterqueue.Encoding[request.Request]) Option {
251+
func WithQueueBatchSettings(set queuebatch.Settings[request.Request]) Option {
251252
return func(o *BaseExporter) error {
252-
o.encoding = encoding
253+
o.queueBatchSettings = set
253254
return nil
254255
}
255256
}

exporter/exporterhelper/internal/base_exporter_test.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"go.opentelemetry.io/collector/config/configretry"
1919
"go.opentelemetry.io/collector/exporter"
2020
"go.opentelemetry.io/collector/exporter/exporterbatcher"
21+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
2122
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2223
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
2324
"go.opentelemetry.io/collector/exporter/exporterqueue"
@@ -60,7 +61,7 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
6061
bs, err := NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal, noopExport,
6162
WithRetry(configretry.NewDefaultBackOffConfig()))
6263
require.NoError(t, err)
63-
require.Nil(t, bs.encoding)
64+
require.Nil(t, bs.queueBatchSettings.Encoding)
6465
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal, noopExport,
6566
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(exporterqueue.NewDefaultConfig()))
6667
require.Error(t, err)
@@ -69,9 +70,9 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
6970
storageID := component.NewID(component.MustNewType("test"))
7071
qCfg.StorageID = &storageID
7172
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal, noopExport,
72-
WithEncoding(newFakeEncoding(&requesttest.FakeRequest{Items: 1})),
73+
WithQueueBatchSettings(newFakeQueueBatch(&requesttest.FakeRequest{Items: 1})),
7374
WithRetry(configretry.NewDefaultBackOffConfig()),
74-
WithRequestQueue(qCfg, nil))
75+
WithQueueBatch(qCfg, queuebatch.Settings[request.Request]{}))
7576
require.Error(t, err)
7677
}
7778

@@ -84,7 +85,7 @@ func TestBaseExporterLogging(t *testing.T) {
8485
qCfg := exporterqueue.NewDefaultConfig()
8586
qCfg.Enabled = false
8687
bs, err := NewBaseExporter(set, defaultSignal, errExport,
87-
WithRequestQueue(qCfg, newFakeEncoding(&requesttest.FakeRequest{})),
88+
WithQueueBatch(qCfg, newFakeQueueBatch(&requesttest.FakeRequest{})),
8889
WithBatcher(exporterbatcher.NewDefaultConfig()),
8990
WithRetry(rCfg))
9091
require.NoError(t, err)
@@ -108,7 +109,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
108109
{
109110
name: "WithQueue",
110111
queueOptions: []Option{
111-
WithEncoding(newFakeEncoding(&requesttest.FakeRequest{Items: 1})),
112+
WithQueueBatchSettings(newFakeQueueBatch(&requesttest.FakeRequest{Items: 1})),
112113
func() Option {
113114
qs := exporterqueue.NewDefaultConfig()
114115
qs.Enabled = false
@@ -127,7 +128,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
127128
func() Option {
128129
qs := exporterqueue.NewDefaultConfig()
129130
qs.Enabled = false
130-
return WithRequestQueue(qs, newFakeEncoding(&requesttest.FakeRequest{Items: 1}))
131+
return WithQueueBatch(qs, newFakeQueueBatch(&requesttest.FakeRequest{Items: 1}))
131132
}(),
132133
func() Option {
133134
bs := exporterbatcher.NewDefaultConfig()
@@ -162,3 +163,7 @@ func errExport(context.Context, request.Request) error {
162163
func noopExport(context.Context, request.Request) error {
163164
return nil
164165
}
166+
167+
func newFakeQueueBatch(mr request.Request) queuebatch.Settings[request.Request] {
168+
return queuebatch.Settings[request.Request]{Encoding: &fakeEncoding{mr: mr}}
169+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
5+
6+
import (
7+
"go.opentelemetry.io/collector/exporter/exporterqueue"
8+
)
9+
10+
type Settings[K any] struct {
11+
Encoding exporterqueue.Encoding[K]
12+
}

exporter/exporterhelper/logs.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"go.opentelemetry.io/collector/consumer/consumererror"
1515
"go.opentelemetry.io/collector/exporter"
1616
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
17+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1718
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
1819
"go.opentelemetry.io/collector/pdata/plog"
1920
"go.opentelemetry.io/collector/pipeline"
@@ -92,7 +93,8 @@ func NewLogs(
9293
if pusher == nil {
9394
return nil, errNilPushLogs
9495
}
95-
return NewLogsRequest(ctx, set, requestFromLogs(), requestConsumeFromLogs(pusher), append([]Option{internal.WithEncoding(logsEncoding{})}, options...)...)
96+
return NewLogsRequest(ctx, set, requestFromLogs(), requestConsumeFromLogs(pusher),
97+
append([]Option{internal.WithQueueBatchSettings(queuebatch.Settings[Request]{Encoding: logsEncoding{}})}, options...)...)
9698
}
9799

98100
// Deprecated: [v0.122.0] use RequestConverterFunc[plog.Logs].

exporter/exporterhelper/metrics.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"go.opentelemetry.io/collector/consumer/consumererror"
1515
"go.opentelemetry.io/collector/exporter"
1616
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
17+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1718
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
1819
"go.opentelemetry.io/collector/pdata/pmetric"
1920
"go.opentelemetry.io/collector/pipeline"
@@ -92,7 +93,8 @@ func NewMetrics(
9293
if pusher == nil {
9394
return nil, errNilPushMetrics
9495
}
95-
return NewMetricsRequest(ctx, set, requestFromMetrics(), requestConsumeFromMetrics(pusher), append([]Option{internal.WithEncoding(metricsEncoding{})}, options...)...)
96+
return NewMetricsRequest(ctx, set, requestFromMetrics(), requestConsumeFromMetrics(pusher),
97+
append([]Option{internal.WithQueueBatchSettings(queuebatch.Settings[Request]{Encoding: metricsEncoding{}})}, options...)...)
9698
}
9799

98100
// Deprecated: [v0.122.0] use RequestConverterFunc[pmetric.Metrics].
+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"
5+
6+
import (
7+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
8+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
9+
"go.opentelemetry.io/collector/exporter/exporterqueue"
10+
)
11+
12+
// QueueConfig defines configuration for queueing batches before sending to the consumerSender.
13+
type QueueConfig = exporterqueue.Config
14+
15+
// Deprecated: [v0.123.0] use WithQueueBatch.
16+
func WithRequestQueue(cfg exporterqueue.Config, encoding exporterqueue.Encoding[Request]) Option {
17+
return WithQueueBatch(cfg, QueueBatchSettings{Encoding: encoding})
18+
}
19+
20+
// QueueBatchConfig defines configuration for queueing and batching for the exporter.
21+
type QueueBatchConfig = exporterqueue.Config
22+
23+
// QueueBatchSettings are settings for the QueueBatch component.
24+
// They include things line Encoding to be used with persistent queue, or the available Sizers, etc.
25+
type QueueBatchSettings = queuebatch.Settings[Request]
26+
27+
// WithQueueBatch enables queueing and batching for an exporter.
28+
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
29+
// Experimental: This API is at the early stage of development and may change without backward compatibility
30+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
31+
func WithQueueBatch(cfg QueueBatchConfig, set QueueBatchSettings) Option {
32+
return internal.WithQueueBatch(cfg, set)
33+
}
34+
35+
// NewDefaultQueueConfig returns the default config for QueueConfig.
36+
func NewDefaultQueueConfig() QueueConfig {
37+
return exporterqueue.Config{
38+
Enabled: true,
39+
NumConsumers: 10,
40+
// By default, batches are 8192 spans, for a total of up to 8 million spans in the queue
41+
// This can be estimated at 1-4 GB worth of maximum memory usage
42+
// This default is probably still too high, and may be adjusted further down in a future release
43+
QueueSize: 1_000,
44+
Blocking: false,
45+
}
46+
}

exporter/exporterhelper/queue_sender.go

-24
This file was deleted.

exporter/exporterhelper/traces.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"go.opentelemetry.io/collector/consumer/consumererror"
1515
"go.opentelemetry.io/collector/exporter"
1616
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
17+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1718
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
1819
"go.opentelemetry.io/collector/pdata/ptrace"
1920
"go.opentelemetry.io/collector/pipeline"
@@ -92,7 +93,8 @@ func NewTraces(
9293
if pusher == nil {
9394
return nil, errNilPushTraces
9495
}
95-
return NewTracesRequest(ctx, set, requestFromTraces(), requestConsumeFromTraces(pusher), append([]Option{internal.WithEncoding(tracesEncoding{})}, options...)...)
96+
return NewTracesRequest(ctx, set, requestFromTraces(), requestConsumeFromTraces(pusher),
97+
append([]Option{internal.WithQueueBatchSettings(queuebatch.Settings[Request]{Encoding: tracesEncoding{}})}, options...)...)
9698
}
9799

98100
// Deprecated: [v0.122.0] use RequestConverterFunc[ptrace.Traces].

exporter/exporterhelper/xexporterhelper/profiles.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"go.opentelemetry.io/collector/exporter"
1717
"go.opentelemetry.io/collector/exporter/exporterhelper"
1818
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
19+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1920
"go.opentelemetry.io/collector/exporter/xexporter"
2021
"go.opentelemetry.io/collector/pdata/pprofile"
2122
"go.opentelemetry.io/collector/pipeline/xpipeline"
@@ -87,8 +88,8 @@ func NewProfilesExporter(
8788
if pusher == nil {
8889
return nil, errNilPushProfileData
8990
}
90-
opts := []exporterhelper.Option{internal.WithEncoding(profilesEncoding{})}
91-
return NewProfilesRequestExporter(ctx, set, requestFromProfiles(), requestConsumeFromProfiles(pusher), append(opts, options...)...)
91+
return NewProfilesRequestExporter(ctx, set, requestFromProfiles(), requestConsumeFromProfiles(pusher),
92+
append([]exporterhelper.Option{internal.WithQueueBatchSettings(queuebatch.Settings[exporterhelper.Request]{Encoding: profilesEncoding{}})}, options...)...)
9293
}
9394

9495
// Deprecated: [v0.122.0] use exporterhelper.RequestConverterFunc[pprofile.Profiles].

0 commit comments

Comments
 (0)