Skip to content

Commit 1ebb9b2

Browse files
authored
[chore] [exporterhelper] Revert moving StorageID to configpoptional (#13376)
Partially reverts #13345 `configoptional` cannot unmarshal a scalar preset value yet. See https://github.com/open-telemetry/opentelemetry-collector-contrib/actions/runs/16199280314/job/45733970146?pr=41234
1 parent 78fbbf6 commit 1ebb9b2

File tree

14 files changed

+25
-28
lines changed

14 files changed

+25
-28
lines changed

.chloggen/fix-todo-optional.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ change_type: breaking
77
component: exporterhelper
88

99
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10-
note: Use configoptional for optional fields in exporterhelper
10+
note: Use configoptional for sending_queue::batch field
1111

1212
# One or more tracking issues or pull requests related to the change
1313
issues: [13345]

exporter/exporterhelper/internal/base_exporter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func WithQueueBatch(cfg queuebatch.Config, set QueueBatchSettings[request.Reques
213213
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
214214
return nil
215215
}
216-
if cfg.StorageID.HasValue() && set.Encoding == nil {
216+
if cfg.StorageID != nil && set.Encoding == nil {
217217
return errors.New("`QueueBatchSettings.Encoding` must not be nil when persistent queue is enabled")
218218
}
219219
o.queueBatchSettings = set

exporter/exporterhelper/internal/base_exporter_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515

1616
"go.opentelemetry.io/collector/component"
1717
"go.opentelemetry.io/collector/component/componenttest"
18-
"go.opentelemetry.io/collector/config/configoptional"
1918
"go.opentelemetry.io/collector/config/configretry"
2019
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2120
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
@@ -53,7 +52,8 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
5352
require.Error(t, err)
5453

5554
qCfg := NewDefaultQueueConfig()
56-
qCfg.StorageID = configoptional.Some(component.MustNewID("test"))
55+
storageID := component.NewID(component.MustNewType("test"))
56+
qCfg.StorageID = &storageID
5757
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport,
5858
WithQueueBatchSettings(newFakeQueueBatch()),
5959
WithRetry(configretry.NewDefaultBackOffConfig()),

exporter/exporterhelper/internal/queue/persistent_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func newPersistentQueue[T any](set Settings[T]) readableQueue[T] {
102102
activeSizer: set.activeSizer(),
103103
itemsSizer: set.ItemsSizer,
104104
bytesSizer: set.BytesSizer,
105-
storageID: *set.StorageID.Get(),
105+
storageID: *set.StorageID,
106106
id: set.ID,
107107
signal: set.Signal,
108108
blockOnOverflow: set.BlockOnOverflow,

exporter/exporterhelper/internal/queue/persistent_queue_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020

2121
"go.opentelemetry.io/collector/component"
2222
"go.opentelemetry.io/collector/component/componenttest"
23-
"go.opentelemetry.io/collector/config/configoptional"
2423
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
2524
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/hosttest"
2625
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
@@ -232,7 +231,8 @@ func newSettings(sizerType request.SizerType, capacity int64) Settings[int64] {
232231

233232
func newSettingsWithStorage(sizerType request.SizerType, capacity int64) Settings[int64] {
234233
set := newSettings(sizerType, capacity)
235-
set.StorageID = configoptional.Some(component.ID{})
234+
storageID := component.ID{}
235+
set.StorageID = &storageID
236236
return set
237237
}
238238

@@ -513,7 +513,8 @@ func TestInvalidStorageExtensionType(t *testing.T) {
513513
}
514514

515515
func TestPersistentQueue_StopAfterBadStart(t *testing.T) {
516-
pq := newPersistentQueue[int64](Settings[int64]{StorageID: configoptional.Some(component.ID{})})
516+
storageID := component.ID{}
517+
pq := newPersistentQueue[int64](Settings[int64]{StorageID: &storageID})
517518
// verify that stopping a un-start/started w/error queue does not panic
518519
assert.NoError(t, pq.Shutdown(context.Background()))
519520
}

exporter/exporterhelper/internal/queue/queue.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"errors"
99

1010
"go.opentelemetry.io/collector/component"
11-
"go.opentelemetry.io/collector/config/configoptional"
1211
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1312
"go.opentelemetry.io/collector/pipeline"
1413
)
@@ -64,7 +63,7 @@ type Settings[T any] struct {
6463
WaitForResult bool
6564
BlockOnOverflow bool
6665
Signal pipeline.Signal
67-
StorageID configoptional.Optional[component.ID]
66+
StorageID *component.ID
6867
Encoding Encoding[T]
6968
ID component.ID
7069
Telemetry component.TelemetrySettings
@@ -97,7 +96,7 @@ func NewQueue[T request.Request](set Settings[T], next ConsumeFunc[T]) (Queue[T]
9796

9897
func newBaseQueue[T any](set Settings[T]) (readableQueue[T], error) {
9998
// Configure memory queue or persistent based on the config.
100-
if !set.StorageID.HasValue() {
99+
if set.StorageID == nil {
101100
return newMemoryQueue[T](set), nil
102101
}
103102
if set.ItemsSizer == nil {

exporter/exporterhelper/internal/queuebatch/config.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,10 @@ type Config struct {
3333
// If true, the component will wait for space; otherwise, operations will immediately return a retryable error.
3434
BlockOnOverflow bool `mapstructure:"block_on_overflow"`
3535

36-
// StorageID, if not empty, enables the persistent storage and uses the component specified
36+
// StorageID if not empty, enables the persistent storage and uses the component specified
3737
// as a storage extension for the persistent queue.
38-
StorageID configoptional.Optional[component.ID] `mapstructure:"storage"`
38+
// TODO: This will be changed to Optional when available.
39+
StorageID *component.ID `mapstructure:"storage"`
3940

4041
// NumConsumers is the maximum number of concurrent consumers from the queue.
4142
// This applies across all different optional configurations from above (e.g. wait_for_result, blockOnOverflow, persistent, etc.).
@@ -74,7 +75,7 @@ func (cfg *Config) Validate() error {
7475
}
7576

7677
// Only support request sizer for persistent queue at this moment.
77-
if cfg.StorageID.HasValue() && cfg.WaitForResult {
78+
if cfg.StorageID != nil && cfg.WaitForResult {
7879
return errors.New("`wait_for_result` is not supported with a persistent queue configured with `storage`")
7980
}
8081

exporter/exporterhelper/internal/queuebatch/config_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/stretchr/testify/require"
1212

1313
"go.opentelemetry.io/collector/component"
14-
"go.opentelemetry.io/collector/config/configoptional"
1514
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1615
)
1716

@@ -30,9 +29,10 @@ func TestConfig_Validate(t *testing.T) {
3029
cfg.QueueSize = 0
3130
require.EqualError(t, cfg.Validate(), "`queue_size` must be positive")
3231

32+
storageID := component.MustNewID("test")
3333
cfg = newTestConfig()
3434
cfg.WaitForResult = true
35-
cfg.StorageID = configoptional.Some(component.MustNewID("test"))
35+
cfg.StorageID = &storageID
3636
require.EqualError(t, cfg.Validate(), "`wait_for_result` is not supported with a persistent queue configured with `storage`")
3737

3838
cfg = newTestConfig()

exporter/exporterhelper/internal/queuebatch/queue_batch_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func TestQueueBatchDifferentSizers(t *testing.T) {
151151
func TestQueueBatchPersistenceEnabled(t *testing.T) {
152152
cfg := newTestConfig()
153153
storageID := component.MustNewIDWithName("file_storage", "storage")
154-
cfg.StorageID = configoptional.Some(storageID)
154+
cfg.StorageID = &storageID
155155
qb, err := NewQueueBatch(newFakeRequestSettings(), cfg, sendertest.NewNopSenderFunc[request.Request]())
156156
require.NoError(t, err)
157157

@@ -168,7 +168,7 @@ func TestQueueBatchPersistenceEnabledStorageError(t *testing.T) {
168168
storageError := errors.New("could not get storage client")
169169
cfg := newTestConfig()
170170
storageID := component.MustNewIDWithName("file_storage", "storage")
171-
cfg.StorageID = configoptional.Some(storageID)
171+
cfg.StorageID = &storageID
172172
qb, err := NewQueueBatch(newFakeRequestSettings(), cfg, sendertest.NewNopSenderFunc[request.Request]())
173173
require.NoError(t, err)
174174

@@ -184,7 +184,7 @@ func TestQueueBatchPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
184184
cfg := newTestConfig()
185185
cfg.NumConsumers = 1
186186
storageID := component.MustNewIDWithName("file_storage", "storage")
187-
cfg.StorageID = configoptional.Some(storageID)
187+
cfg.StorageID = &storageID
188188

189189
mockReq := &requesttest.FakeRequest{Items: 2}
190190
qSet := newFakeRequestSettings()

exporter/exporterhelper/logs_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222

2323
"go.opentelemetry.io/collector/component"
2424
"go.opentelemetry.io/collector/component/componenttest"
25-
"go.opentelemetry.io/collector/config/configoptional"
2625
"go.opentelemetry.io/collector/consumer"
2726
"go.opentelemetry.io/collector/consumer/consumererror"
2827
"go.opentelemetry.io/collector/consumer/consumertest"
@@ -174,7 +173,7 @@ func TestLogs_WithPersistentQueue(t *testing.T) {
174173
fgOrigWriteState := queue.PersistRequestContextOnWrite
175174
qCfg := NewDefaultQueueConfig()
176175
storageID := component.MustNewIDWithName("file_storage", "storage")
177-
qCfg.StorageID = configoptional.Some(storageID)
176+
qCfg.StorageID = &storageID
178177
set := exportertest.NewNopSettings(exportertest.NopType)
179178
set.ID = component.MustNewIDWithName("test_logs", "with_persistent_queue")
180179
host := hosttest.NewHost(map[component.ID]component.Component{

0 commit comments

Comments
 (0)