@@ -5,10 +5,12 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe
5
5
6
6
import (
7
7
"context"
8
+ "errors"
9
+ "fmt"
10
+ "time"
8
11
9
12
"go.uber.org/zap"
10
13
11
- "go.opentelemetry.io/collector/exporter/exporterbatcher"
12
14
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
13
15
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
14
16
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
@@ -17,14 +19,14 @@ import (
17
19
18
20
// QueueBatchSettings is a subset of the queuebatch.Settings that are needed when used within an Exporter.
19
21
type QueueBatchSettings [K any ] struct {
20
- Encoding exporterqueue .Encoding [K ]
21
- Sizers map [exporterbatcher .SizerType ]queuebatch .Sizer [K ]
22
+ Encoding queuebatch .Encoding [K ]
23
+ Sizers map [request .SizerType ]request .Sizer [K ]
22
24
}
23
25
24
26
func NewQueueSender (
25
27
qSet queuebatch.Settings [request.Request ],
26
28
qCfg exporterqueue.Config ,
27
- bCfg exporterbatcher. Config ,
29
+ bCfg BatcherConfig ,
28
30
exportFailureMessage string ,
29
31
next sender.Sender [request.Request ],
30
32
) (sender.Sender [request.Request ], error ) {
@@ -43,11 +45,11 @@ func NewQueueSender(
43
45
return queuebatch .NewQueueBatch (qSet , newQueueBatchConfig (qCfg , bCfg ), exportFunc )
44
46
}
45
47
46
- func newQueueBatchConfig (qCfg exporterqueue.Config , bCfg exporterbatcher. Config ) queuebatch.Config {
48
+ func newQueueBatchConfig (qCfg exporterqueue.Config , bCfg BatcherConfig ) queuebatch.Config {
47
49
qbCfg := queuebatch.Config {
48
50
Enabled : true ,
49
51
WaitForResult : ! qCfg .Enabled ,
50
- Sizer : exporterbatcher .SizerTypeRequests ,
52
+ Sizer : request .SizerTypeRequests ,
51
53
QueueSize : qCfg .QueueSize ,
52
54
NumConsumers : qCfg .NumConsumers ,
53
55
BlockOnOverflow : qCfg .Blocking ,
@@ -65,3 +67,62 @@ func newQueueBatchConfig(qCfg exporterqueue.Config, bCfg exporterbatcher.Config)
65
67
}
66
68
return qbCfg
67
69
}
70
+
71
+ // BatcherConfig defines a configuration for batching requests based on a timeout and a minimum number of items.
72
+ // Experimental: This API is at the early stage of development and may change without backward compatibility
73
+ // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
74
+ type BatcherConfig struct {
75
+ // Enabled indicates whether to not enqueue batches before sending to the consumerSender.
76
+ Enabled bool `mapstructure:"enabled"`
77
+
78
+ // FlushTimeout sets the time after which a batch will be sent regardless of its size.
79
+ FlushTimeout time.Duration `mapstructure:"flush_timeout"`
80
+
81
+ // SizeConfig sets the size limits for a batch.
82
+ SizeConfig `mapstructure:",squash"`
83
+ }
84
+
85
+ // SizeConfig sets the size limits for a batch.
86
+ type SizeConfig struct {
87
+ Sizer request.SizerType `mapstructure:"sizer"`
88
+
89
+ // MinSize defines the configuration for the minimum size of a batch.
90
+ MinSize int `mapstructure:"min_size"`
91
+ // MaxSize defines the configuration for the maximum size of a batch.
92
+ MaxSize int `mapstructure:"max_size"`
93
+ }
94
+
95
+ func (c * BatcherConfig ) Validate () error {
96
+ if c .FlushTimeout <= 0 {
97
+ return errors .New ("`flush_timeout` must be greater than zero" )
98
+ }
99
+
100
+ return nil
101
+ }
102
+
103
+ func (c SizeConfig ) Validate () error {
104
+ if c .Sizer != request .SizerTypeItems {
105
+ return fmt .Errorf ("unsupported sizer type: %q" , c .Sizer )
106
+ }
107
+ if c .MinSize < 0 {
108
+ return errors .New ("`min_size` must be greater than or equal to zero" )
109
+ }
110
+ if c .MaxSize < 0 {
111
+ return errors .New ("`max_size` must be greater than or equal to zero" )
112
+ }
113
+ if c .MaxSize != 0 && c .MaxSize < c .MinSize {
114
+ return errors .New ("`max_size` must be greater than or equal to mix_size" )
115
+ }
116
+ return nil
117
+ }
118
+
119
+ func NewDefaultBatcherConfig () BatcherConfig {
120
+ return BatcherConfig {
121
+ Enabled : true ,
122
+ FlushTimeout : 200 * time .Millisecond ,
123
+ SizeConfig : SizeConfig {
124
+ Sizer : request .SizerTypeItems ,
125
+ MinSize : 8192 ,
126
+ },
127
+ }
128
+ }
0 commit comments