@@ -5,25 +5,72 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe
5
5
6
6
import (
7
7
"context"
8
+ "errors"
8
9
9
10
"go.uber.org/zap"
10
11
12
+ "go.opentelemetry.io/collector/component"
11
13
"go.opentelemetry.io/collector/exporter/exporterbatcher"
12
14
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
13
15
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
14
16
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
15
- "go.opentelemetry.io/collector/exporter/exporterqueue"
16
17
)
17
18
18
19
// QueueBatchSettings is a subset of the queuebatch.Settings that are needed when used within an Exporter.
type QueueBatchSettings[K any] struct {
	// Encoding for requests of type K; presumably used to (de)serialize
	// requests when the queue is backed by persistent storage — confirm
	// against queuebatch.Encoding.
	Encoding queuebatch.Encoding[K]
	// Sizers maps each supported sizer type to the implementation used to
	// measure a request of type K when enforcing size limits.
	Sizers map[exporterbatcher.SizerType]queuebatch.Sizer[K]
}
23
24
25
+ // NewDefaultQueueConfig returns the default config for QueueConfig.
26
+ func NewDefaultQueueConfig () QueueConfig {
27
+ return QueueConfig {
28
+ Enabled : true ,
29
+ NumConsumers : 10 ,
30
+ // By default, batches are 8192 spans, for a total of up to 8 million spans in the queue
31
+ // This can be estimated at 1-4 GB worth of maximum memory usage
32
+ // This default is probably still too high, and may be adjusted further down in a future release
33
+ QueueSize : 1_000 ,
34
+ Blocking : false ,
35
+ }
36
+ }
37
+
38
// QueueConfig defines configuration for queueing requests before exporting.
// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type QueueConfig struct {
	// Enabled indicates whether to enqueue batches before exporting;
	// when false the other queue settings are ignored (see Validate).
	Enabled bool `mapstructure:"enabled"`
	// NumConsumers is the number of consumers reading from the queue.
	NumConsumers int `mapstructure:"num_consumers"`
	// QueueSize is the maximum number of requests allowed in the queue at any given time.
	QueueSize int `mapstructure:"queue_size"`
	// Blocking controls the queue behavior when full.
	// If true, enqueueing blocks until there is enough space to add the new request to the queue.
	Blocking bool `mapstructure:"blocking"`
	// StorageID, if not empty, enables persistent storage and uses the component specified
	// as a storage extension for the persistent queue.
	StorageID *component.ID `mapstructure:"storage"`
}
56
+
57
+ // Validate checks if the Config is valid
58
+ func (qCfg * QueueConfig ) Validate () error {
59
+ if ! qCfg .Enabled {
60
+ return nil
61
+ }
62
+ if qCfg .NumConsumers <= 0 {
63
+ return errors .New ("`num_consumers` must be positive" )
64
+ }
65
+ if qCfg .QueueSize <= 0 {
66
+ return errors .New ("`queue_size` must be positive" )
67
+ }
68
+ return nil
69
+ }
70
+
24
71
func NewQueueSender (
25
72
qSet queuebatch.Settings [request.Request ],
26
- qCfg exporterqueue. Config ,
73
+ qCfg QueueConfig ,
27
74
bCfg exporterbatcher.Config ,
28
75
exportFailureMessage string ,
29
76
next sender.Sender [request.Request ],
@@ -43,7 +90,7 @@ func NewQueueSender(
43
90
return queuebatch .NewQueueBatch (qSet , newQueueBatchConfig (qCfg , bCfg ), exportFunc )
44
91
}
45
92
46
- func newQueueBatchConfig (qCfg exporterqueue. Config , bCfg exporterbatcher.Config ) queuebatch.Config {
93
+ func newQueueBatchConfig (qCfg QueueConfig , bCfg exporterbatcher.Config ) queuebatch.Config {
47
94
qbCfg := queuebatch.Config {
48
95
Enabled : true ,
49
96
WaitForResult : ! qCfg .Enabled ,
0 commit comments