Skip to content

Commit da4512d

Browse files
authored
Replace exporterhelper Request.Export with passing a ConsumeRequest func (#12637)
Fixes #8950 The logic behind this, is that during merging and splitting we should not care about the export func (see complicated logic removed in FakeRequest of in the pdata requests implementations). Also, this simplify what user should carry with the request. Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 6d82b55 commit da4512d

30 files changed

+768
-753
lines changed

.chloggen/rm-request-export.yaml

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Remove the Request.Export function in favor of an equivalent request consume func in the New[Traces|Metrics|Logs|Profiles]Request
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12637]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]

exporter/exporterhelper/constants.go

+8-6
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@ var (
1212
errNilConfig = errors.New("nil config")
1313
// errNilLogger is returned when a logger is nil
1414
errNilLogger = errors.New("nil logger")
15-
// errNilPushTraceData is returned when a nil PushTraces is given.
16-
errNilPushTraceData = errors.New("nil PushTraces")
17-
// errNilPushMetricsData is returned when a nil PushMetrics is given.
18-
errNilPushMetricsData = errors.New("nil PushMetrics")
19-
// errNilPushLogsData is returned when a nil PushLogs is given.
20-
errNilPushLogsData = errors.New("nil PushLogs")
15+
// errNilConsumeRequest is returned when a nil PushTraces is given.
16+
errNilConsumeRequest = errors.New("nil RequestConsumeFunc")
17+
// errNilPushTraces is returned when a nil PushTraces is given.
18+
errNilPushTraces = errors.New("nil PushTraces")
19+
// errNilPushMetrics is returned when a nil PushMetrics is given.
20+
errNilPushMetrics = errors.New("nil PushMetrics")
21+
// errNilPushLogs is returned when a nil PushLogs is given.
22+
errNilPushLogs = errors.New("nil PushLogs")
2123
// errNilTracesConverter is returned when a nil RequestFromTracesFunc is given.
2224
errNilTracesConverter = errors.New("nil RequestFromTracesFunc")
2325
// errNilMetricsConverter is returned when a nil RequestFromMetricsFunc is given.

exporter/exporterhelper/internal/base_exporter.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ type BaseExporter struct {
5050
batcherCfg exporterbatcher.Config
5151
}
5252

53-
func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...Option) (*BaseExporter, error) {
53+
func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher func(context.Context, request.Request) error, options ...Option) (*BaseExporter, error) {
5454
be := &BaseExporter{
5555
Set: set,
5656
timeoutCfg: NewDefaultTimeoutConfig(),
@@ -62,15 +62,13 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...O
6262
}
6363
}
6464

65-
//nolint: staticcheck
65+
//nolint:staticcheck
6666
if be.batcherCfg.MinSizeItems != nil || be.batcherCfg.MaxSizeItems != nil {
6767
set.Logger.Warn("Using of deprecated fields `min_size_items` and `max_size_items`")
6868
}
6969

7070
// Consumer Sender is always initialized.
71-
be.firstSender = newSender(func(ctx context.Context, req request.Request) error {
72-
return req.Export(ctx)
73-
})
71+
be.firstSender = newSender(pusher)
7472

7573
// Next setup the timeout Sender since we want the timeout to control only the export functionality.
7674
// Only initialize if not explicitly disabled.

exporter/exporterhelper/internal/base_exporter_test.go

+17-23
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,8 @@ var (
3636
}()
3737
)
3838

39-
func newNoopExportSender() Sender[request.Request] {
40-
return newSender(func(ctx context.Context, req request.Request) error {
41-
select {
42-
case <-ctx.Done():
43-
return ctx.Err() // Returns the cancellation error
44-
default:
45-
return req.Export(ctx)
46-
}
47-
})
48-
}
49-
5039
func TestBaseExporter(t *testing.T) {
51-
be, err := NewBaseExporter(defaultSettings, defaultSignal)
40+
be, err := NewBaseExporter(defaultSettings, defaultSignal, noopExport)
5241
require.NoError(t, err)
5342
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
5443
require.NoError(t, be.Shutdown(context.Background()))
@@ -57,7 +46,7 @@ func TestBaseExporter(t *testing.T) {
5746
func TestBaseExporterWithOptions(t *testing.T) {
5847
want := errors.New("my error")
5948
be, err := NewBaseExporter(
60-
defaultSettings, defaultSignal,
49+
defaultSettings, defaultSignal, noopExport,
6150
WithStart(func(context.Context, component.Host) error { return want }),
6251
WithShutdown(func(context.Context) error { return want }),
6352
WithTimeout(NewDefaultTimeoutConfig()),
@@ -68,18 +57,18 @@ func TestBaseExporterWithOptions(t *testing.T) {
6857
}
6958

7059
func TestQueueOptionsWithRequestExporter(t *testing.T) {
71-
bs, err := NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
60+
bs, err := NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal, noopExport,
7261
WithRetry(configretry.NewDefaultBackOffConfig()))
7362
require.NoError(t, err)
7463
require.Nil(t, bs.encoding)
75-
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
64+
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal, noopExport,
7665
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(exporterqueue.NewDefaultConfig()))
7766
require.Error(t, err)
7867

7968
qCfg := exporterqueue.NewDefaultConfig()
8069
storageID := component.NewID(component.MustNewType("test"))
8170
qCfg.StorageID = &storageID
82-
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
71+
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal, noopExport,
8372
WithEncoding(newFakeEncoding(&requesttest.FakeRequest{Items: 1})),
8473
WithRetry(configretry.NewDefaultBackOffConfig()),
8574
WithRequestQueue(qCfg, nil))
@@ -94,14 +83,13 @@ func TestBaseExporterLogging(t *testing.T) {
9483
rCfg.Enabled = false
9584
qCfg := exporterqueue.NewDefaultConfig()
9685
qCfg.Enabled = false
97-
bs, err := NewBaseExporter(set, defaultSignal,
86+
bs, err := NewBaseExporter(set, defaultSignal, errExport,
9887
WithRequestQueue(qCfg, newFakeEncoding(&requesttest.FakeRequest{})),
9988
WithBatcher(exporterbatcher.NewDefaultConfig()),
10089
WithRetry(rCfg))
10190
require.NoError(t, err)
10291
require.NoError(t, bs.Start(context.Background(), componenttest.NewNopHost()))
103-
sink := requesttest.NewSink()
104-
sendErr := bs.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink, ExportErr: errors.New("my error")})
92+
sendErr := bs.Send(context.Background(), &requesttest.FakeRequest{Items: 2})
10593
require.Error(t, sendErr)
10694

10795
require.Len(t, observed.FilterLevelExact(zap.ErrorLevel).All(), 2)
@@ -155,16 +143,22 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
155143
set := exportertest.NewNopSettings(exportertest.NopType)
156144
logger, observed := observer.New(zap.ErrorLevel)
157145
set.Logger = zap.New(logger)
158-
be, err := NewBaseExporter(set, pipeline.SignalLogs, tt.queueOptions...)
146+
be, err := NewBaseExporter(set, pipeline.SignalLogs, errExport, tt.queueOptions...)
159147
require.NoError(t, err)
160148
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
161-
sink := requesttest.NewSink()
162-
mockR := &requesttest.FakeRequest{Items: 2, Sink: sink, ExportErr: errors.New("some error")}
149+
mockR := &requesttest.FakeRequest{Items: 2}
163150
require.Error(t, be.Send(context.Background(), mockR))
164151
assert.Len(t, observed.All(), 1)
165152
assert.Equal(t, "Exporting failed. Rejecting data. Try enabling sending_queue to survive temporary failures.", observed.All()[0].Message)
166153
require.NoError(t, be.Shutdown(context.Background()))
167-
assert.Empty(t, 0, sink.RequestsCount())
168154
})
169155
}
170156
}
157+
158+
func errExport(context.Context, request.Request) error {
159+
return errors.New("my error")
160+
}
161+
162+
func noopExport(context.Context, request.Request) error {
163+
return nil
164+
}

0 commit comments

Comments
 (0)