Skip to content

Commit 2bc783c

Browse files
authored
Deprecated everything in exporterqueue, alias from exporterhelper (#12706)
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 622ce90 commit 2bc783c

14 files changed

+156
-122
lines changed
+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: deprecation
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterqueue
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Deprecated Config, NewDefaultConfig, Encoding, ErrQueueFull. Use alias from exporterhelper.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12706]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]

exporter/exporterhelper/common.go

-17
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"go.opentelemetry.io/collector/component"
88
"go.opentelemetry.io/collector/config/configretry"
99
"go.opentelemetry.io/collector/consumer"
10-
"go.opentelemetry.io/collector/exporter/exporterbatcher"
1110
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
1211
)
1312

@@ -38,25 +37,9 @@ func WithRetry(config configretry.BackOffConfig) Option {
3837
return internal.WithRetry(config)
3938
}
4039

41-
// WithQueue overrides the default QueueConfig for an exporter.
42-
// The default QueueConfig is to disable queueing.
43-
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
44-
func WithQueue(config QueueConfig) Option {
45-
return internal.WithQueue(config)
46-
}
47-
4840
// WithCapabilities overrides the default Capabilities() function for a Consumer.
4941
// The default is non-mutable data.
5042
// TODO: Verify if we can change the default to be mutable as we do for processors.
5143
func WithCapabilities(capabilities consumer.Capabilities) Option {
5244
return internal.WithCapabilities(capabilities)
5345
}
54-
55-
// WithBatcher enables batching for an exporter based on custom request types.
56-
// For now, it can be used only with the New[Traces|Metrics|Logs]RequestExporter exporter helpers and
57-
// WithRequestBatchFuncs provided.
58-
// This API is at the early stage of development and may change without backward compatibility
59-
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
60-
func WithBatcher(cfg exporterbatcher.Config) Option {
61-
return internal.WithBatcher(cfg)
62-
}

exporter/exporterhelper/internal/base_exporter.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1919
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2020
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
21-
"go.opentelemetry.io/collector/exporter/exporterqueue"
2221
"go.opentelemetry.io/collector/pipeline"
2322
)
2423

@@ -49,7 +48,7 @@ type BaseExporter struct {
4948
retryCfg configretry.BackOffConfig
5049

5150
queueBatchSettings QueueBatchSettings[request.Request]
52-
queueCfg exporterqueue.Config
51+
queueCfg QueueConfig
5352
batcherCfg exporterbatcher.Config
5453
}
5554

@@ -195,7 +194,7 @@ func WithRetry(config configretry.BackOffConfig) Option {
195194
// WithQueue overrides the default QueueConfig for an exporter.
196195
// The default QueueConfig is to disable queueing.
197196
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
198-
func WithQueue(cfg exporterqueue.Config) Option {
197+
func WithQueue(cfg QueueConfig) Option {
199198
return func(o *BaseExporter) error {
200199
if o.queueBatchSettings.Encoding == nil {
201200
return errors.New("WithQueue option is not available for the new request exporters, use WithQueueBatch instead")
@@ -208,7 +207,7 @@ func WithQueue(cfg exporterqueue.Config) Option {
208207
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
209208
// Experimental: This API is at the early stage of development and may change without backward compatibility
210209
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
211-
func WithQueueBatch(cfg exporterqueue.Config, set QueueBatchSettings[request.Request]) Option {
210+
func WithQueueBatch(cfg QueueConfig, set QueueBatchSettings[request.Request]) Option {
212211
return func(o *BaseExporter) error {
213212
if !cfg.Enabled {
214213
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."

exporter/exporterhelper/internal/base_exporter_test.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
2121
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2222
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
23-
"go.opentelemetry.io/collector/exporter/exporterqueue"
2423
"go.opentelemetry.io/collector/exporter/exportertest"
2524
"go.opentelemetry.io/collector/pipeline"
2625
)
@@ -51,10 +50,10 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
5150
require.NoError(t, err)
5251
require.Nil(t, bs.queueBatchSettings.Encoding)
5352
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport,
54-
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(exporterqueue.NewDefaultConfig()))
53+
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig()))
5554
require.Error(t, err)
5655

57-
qCfg := exporterqueue.NewDefaultConfig()
56+
qCfg := NewDefaultQueueConfig()
5857
storageID := component.NewID(component.MustNewType("test"))
5958
qCfg.StorageID = &storageID
6059
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport,
@@ -70,7 +69,7 @@ func TestBaseExporterLogging(t *testing.T) {
7069
set.Logger = zap.New(logger)
7170
rCfg := configretry.NewDefaultBackOffConfig()
7271
rCfg.Enabled = false
73-
qCfg := exporterqueue.NewDefaultConfig()
72+
qCfg := NewDefaultQueueConfig()
7473
qCfg.Enabled = false
7574
bs, err := NewBaseExporter(set, pipeline.SignalMetrics, errExport,
7675
WithQueueBatchSettings(newFakeQueueBatch()),
@@ -100,7 +99,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
10099
queueOptions: []Option{
101100
WithQueueBatchSettings(newFakeQueueBatch()),
102101
func() Option {
103-
qs := exporterqueue.NewDefaultConfig()
102+
qs := NewDefaultQueueConfig()
104103
qs.Enabled = false
105104
return WithQueue(qs)
106105
}(),
@@ -115,7 +114,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
115114
name: "WithRequestQueue",
116115
queueOptions: []Option{
117116
func() Option {
118-
qs := exporterqueue.NewDefaultConfig()
117+
qs := NewDefaultQueueConfig()
119118
qs.Enabled = false
120119
return WithQueueBatch(qs, newFakeQueueBatch())
121120
}(),

exporter/exporterhelper/internal/queue_sender.go

+52-4
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,73 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe
55

66
import (
77
"context"
8+
"errors"
89

910
"go.uber.org/zap"
1011

12+
"go.opentelemetry.io/collector/component"
1113
"go.opentelemetry.io/collector/exporter/exporterbatcher"
1214
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1315
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1416
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
15-
"go.opentelemetry.io/collector/exporter/exporterqueue"
1617
)
1718

1819
// QueueBatchSettings is a subset of the queuebatch.Settings that are needed when used within an Exporter.
1920
type QueueBatchSettings[K any] struct {
20-
Encoding exporterqueue.Encoding[K]
21+
Encoding queuebatch.Encoding[K]
2122
Sizers map[exporterbatcher.SizerType]queuebatch.Sizer[K]
2223
}
2324

25+
// NewDefaultQueueConfig returns the default config for QueueConfig.
26+
// By default, the queue stores 1000 items of telemetry and is non-blocking when full.
27+
func NewDefaultQueueConfig() QueueConfig {
28+
return QueueConfig{
29+
Enabled: true,
30+
NumConsumers: 10,
31+
// By default, batches are 8192 spans, for a total of up to 8 million spans in the queue
32+
// This can be estimated at 1-4 GB worth of maximum memory usage
33+
// This default is probably still too high, and may be adjusted further down in a future release
34+
QueueSize: 1_000,
35+
Blocking: false,
36+
}
37+
}
38+
39+
// QueueConfig defines configuration for queueing requests before exporting.
40+
// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
41+
// Experimental: This API is at the early stage of development and may change without backward compatibility
42+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
43+
type QueueConfig struct {
44+
// Enabled indicates whether to not enqueue batches before exporting.
45+
Enabled bool `mapstructure:"enabled"`
46+
// NumConsumers is the number of consumers from the queue.
47+
NumConsumers int `mapstructure:"num_consumers"`
48+
// QueueSize is the maximum number of requests allowed in queue at any given time.
49+
QueueSize int `mapstructure:"queue_size"`
50+
// Blocking controls the queue behavior when full.
51+
// If true it blocks until enough space to add the new request to the queue.
52+
Blocking bool `mapstructure:"blocking"`
53+
// StorageID if not empty, enables the persistent storage and uses the component specified
54+
// as a storage extension for the persistent queue
55+
StorageID *component.ID `mapstructure:"storage"`
56+
}
57+
58+
// Validate checks if the Config is valid
59+
func (qCfg *QueueConfig) Validate() error {
60+
if !qCfg.Enabled {
61+
return nil
62+
}
63+
if qCfg.NumConsumers <= 0 {
64+
return errors.New("`num_consumers` must be positive")
65+
}
66+
if qCfg.QueueSize <= 0 {
67+
return errors.New("`queue_size` must be positive")
68+
}
69+
return nil
70+
}
71+
2472
func NewQueueSender(
2573
qSet queuebatch.Settings[request.Request],
26-
qCfg exporterqueue.Config,
74+
qCfg QueueConfig,
2775
bCfg exporterbatcher.Config,
2876
exportFailureMessage string,
2977
next sender.Sender[request.Request],
@@ -43,7 +91,7 @@ func NewQueueSender(
4391
return queuebatch.NewQueueBatch(qSet, newQueueBatchConfig(qCfg, bCfg), exportFunc)
4492
}
4593

46-
func newQueueBatchConfig(qCfg exporterqueue.Config, bCfg exporterbatcher.Config) queuebatch.Config {
94+
func newQueueBatchConfig(qCfg QueueConfig, bCfg exporterbatcher.Config) queuebatch.Config {
4795
qbCfg := queuebatch.Config{
4896
Enabled: true,
4997
WaitForResult: !qCfg.Enabled,

exporter/exporterhelper/internal/queue_sender_test.go

+17-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2121
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
2222
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
23-
"go.opentelemetry.io/collector/exporter/exporterqueue"
2423
"go.opentelemetry.io/collector/exporter/exportertest"
2524
"go.opentelemetry.io/collector/pipeline"
2625
)
@@ -37,7 +36,7 @@ func TestNewQueueSenderFailedRequestDropped(t *testing.T) {
3736
logger, observed := observer.New(zap.ErrorLevel)
3837
qSet.Telemetry.Logger = zap.New(logger)
3938
be, err := NewQueueSender(
40-
qSet, exporterqueue.NewDefaultConfig(), exporterbatcher.Config{}, "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") }))
39+
qSet, NewDefaultQueueConfig(), exporterbatcher.Config{}, "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") }))
4140

4241
require.NoError(t, err)
4342
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -46,3 +45,19 @@ func TestNewQueueSenderFailedRequestDropped(t *testing.T) {
4645
assert.Len(t, observed.All(), 1)
4746
assert.Equal(t, "Exporting failed. Dropping data.", observed.All()[0].Message)
4847
}
48+
49+
func TestQueueConfig_Validate(t *testing.T) {
50+
qCfg := NewDefaultQueueConfig()
51+
require.NoError(t, qCfg.Validate())
52+
53+
qCfg.NumConsumers = 0
54+
require.EqualError(t, qCfg.Validate(), "`num_consumers` must be positive")
55+
56+
qCfg = NewDefaultQueueConfig()
57+
qCfg.QueueSize = 0
58+
require.EqualError(t, qCfg.Validate(), "`queue_size` must be positive")
59+
60+
// Confirm Validate doesn't return error with invalid config when feature is disabled
61+
qCfg.Enabled = false
62+
assert.NoError(t, qCfg.Validate())
63+
}

exporter/exporterhelper/logs.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"go.opentelemetry.io/collector/exporter/exporterbatcher"
1717
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
1818
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
19-
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2019
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
2120
"go.opentelemetry.io/collector/pdata/plog"
2221
"go.opentelemetry.io/collector/pipeline"
@@ -37,7 +36,7 @@ func NewLogsQueueBatchSettings() QueueBatchSettings {
3736
exporterbatcher.SizerTypeRequests: NewRequestsSizer(),
3837
exporterbatcher.SizerTypeItems: queuebatch.NewItemsSizer(),
3938
exporterbatcher.SizerTypeBytes: queuebatch.BaseSizer{
40-
SizeofFunc: func(req request.Request) int64 {
39+
SizeofFunc: func(req Request) int64 {
4140
return int64(logsMarshaler.LogsSize(req.(*logsRequest).ld))
4241
},
4342
},

exporter/exporterhelper/metrics.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"go.opentelemetry.io/collector/exporter/exporterbatcher"
1717
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
1818
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
19-
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2019
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
2120
"go.opentelemetry.io/collector/pdata/pmetric"
2221
"go.opentelemetry.io/collector/pipeline"
@@ -37,7 +36,7 @@ func NewMetricsQueueBatchSettings() QueueBatchSettings {
3736
exporterbatcher.SizerTypeRequests: NewRequestsSizer(),
3837
exporterbatcher.SizerTypeItems: queuebatch.NewItemsSizer(),
3938
exporterbatcher.SizerTypeBytes: queuebatch.BaseSizer{
40-
SizeofFunc: func(req request.Request) int64 {
39+
SizeofFunc: func(req Request) int64 {
4140
return int64(metricsMarshaler.MetricsSize(req.(*metricsRequest).md))
4241
},
4342
},

exporter/exporterhelper/queue_batch.go

+35-15
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,49 @@
44
package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"
55

66
import (
7+
"go.opentelemetry.io/collector/exporter/exporterbatcher"
78
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
8-
"go.opentelemetry.io/collector/exporter/exporterqueue"
9+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
910
)
1011

1112
// QueueConfig defines configuration for queueing batches before sending to the consumerSender.
12-
type QueueConfig = exporterqueue.Config
13+
type QueueConfig = internal.QueueConfig
1314

1415
// Deprecated: [v0.123.0] use WithQueueBatch.
15-
func WithRequestQueue(cfg exporterqueue.Config, encoding exporterqueue.Encoding[Request]) Option {
16+
func WithRequestQueue(cfg QueueConfig, encoding QueueBatchEncoding[Request]) Option {
1617
return WithQueueBatch(cfg, QueueBatchSettings{Encoding: encoding})
1718
}
1819

20+
// WithQueue overrides the default QueueConfig for an exporter.
21+
// The default QueueConfig is to disable queueing.
22+
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
23+
func WithQueue(config QueueConfig) Option {
24+
return internal.WithQueue(config)
25+
}
26+
27+
// WithBatcher enables batching for an exporter based on custom request types.
28+
// For now, it can be used only with the New[Traces|Metrics|Logs]RequestExporter exporter helpers and
29+
// WithRequestBatchFuncs provided.
30+
// This API is at the early stage of development and may change without backward compatibility
31+
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
32+
func WithBatcher(cfg exporterbatcher.Config) Option {
33+
return internal.WithBatcher(cfg)
34+
}
35+
1936
// QueueBatchConfig defines configuration for queueing and batching for the exporter.
20-
type QueueBatchConfig = exporterqueue.Config
37+
type QueueBatchConfig = internal.QueueConfig
38+
39+
// QueueBatchEncoding defines the encoding to be used if persistent queue is configured.
40+
// Duplicate definition with queuebatch.Encoding since aliasing generics is not supported by default.
41+
type QueueBatchEncoding[T any] interface {
42+
// Marshal is a function that can marshal a request into bytes.
43+
Marshal(T) ([]byte, error)
44+
45+
// Unmarshal is a function that can unmarshal bytes into a request.
46+
Unmarshal([]byte) (T, error)
47+
}
48+
49+
var ErrQueueIsFull = queuebatch.ErrQueueIsFull
2150

2251
// QueueBatchSettings are settings for the QueueBatch component.
2352
// They include things line Encoding to be used with persistent queue, or the available Sizers, etc.
@@ -32,14 +61,5 @@ func WithQueueBatch(cfg QueueBatchConfig, set QueueBatchSettings) Option {
3261
}
3362

3463
// NewDefaultQueueConfig returns the default config for QueueConfig.
35-
func NewDefaultQueueConfig() QueueConfig {
36-
return exporterqueue.Config{
37-
Enabled: true,
38-
NumConsumers: 10,
39-
// By default, batches are 8192 spans, for a total of up to 8 million spans in the queue
40-
// This can be estimated at 1-4 GB worth of maximum memory usage
41-
// This default is probably still too high, and may be adjusted further down in a future release
42-
QueueSize: 1_000,
43-
Blocking: false,
44-
}
45-
}
64+
// By default, the queue stores 1000 items of telemetry and is non-blocking when full.
65+
var NewDefaultQueueConfig = internal.NewDefaultQueueConfig

exporter/exporterhelper/request.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ type RequestConverterFunc[K any] func(context.Context, K) (Request, error)
3131

3232
// RequestConsumeFunc processes the request. After the function returns, the request is no longer accessible,
3333
// and accessing it is considered undefined behavior.
34-
type RequestConsumeFunc = sender.SendFunc[request.Request]
34+
type RequestConsumeFunc = sender.SendFunc[Request]
3535

3636
// RequestSizer is an interface that returns the size of the given request.
37-
type RequestSizer = queuebatch.Sizer[request.Request]
37+
type RequestSizer = queuebatch.Sizer[Request]
3838

3939
// NewRequestsSizer returns a RequestSizer that counts the requests by the number of requests, always returning 1.
4040
func NewRequestsSizer() RequestSizer {
41-
return queuebatch.RequestsSizer[request.Request]{}
41+
return queuebatch.RequestsSizer[Request]{}
4242
}

0 commit comments

Comments
 (0)