Skip to content

Commit 54222b6

Browse files
authored
Define the final stable config for QueueBatch component (#12660)
Add this as a separate config than exporterqueue.Config until the queue and batcher are changed to support this new config. Once approved, will merge and in sub-sequent PRs will change the components to accept this new config and expose it to the users. Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
1 parent 41a9ea7 commit 54222b6

1 file changed

Lines changed: 104 additions & 0 deletions

File tree

  • exporter/exporterhelper/internal/queuebatch
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
5+
6+
import (
7+
"errors"
8+
"time"
9+
10+
"go.opentelemetry.io/collector/component"
11+
"go.opentelemetry.io/collector/exporter/exporterbatcher"
12+
)
13+
14+
// Config defines configuration for queueing and batching incoming requests.
15+
type Config struct {
16+
// Enabled indicates whether to not enqueue and batch before exporting.
17+
Enabled bool `mapstructure:"enabled"`
18+
19+
// WaitForResult determines if incoming requests are blocked until the request is processed or not.
20+
// Currently, this option is not available when persistent queue is configured using the storage configuration.
21+
WaitForResult bool `mapstructure:"wait_for_result"`
22+
23+
// Sizer determines the type of size measurement used by this component.
24+
// It accepts "requests", "items", or "bytes".
25+
Sizer exporterbatcher.SizerType `mapstructure:"sizer"`
26+
27+
// QueueSize represents the maximum data size allowed for concurrent storage and processing.
28+
QueueSize int `mapstructure:"queue_size"`
29+
30+
// BlockOnOverflow determines the behavior when the component's TotalSize limit is reached.
31+
// If true, the component will wait for space; otherwise, operations will immediately return a retryable error.
32+
BlockOnOverflow bool `mapstructure:"block_on_overflow"`
33+
34+
// StorageID if not empty, enables the persistent storage and uses the component specified
35+
// as a storage extension for the persistent queue.
36+
// TODO: This will be changed to Optional when available.
37+
StorageID *component.ID `mapstructure:"storage"`
38+
39+
// NumConsumers is the maximum number of concurrent consumers from the queue.
40+
// This applies across all different optional configurations from above (e.g. wait_for_result, blocking, persistent, etc.).
41+
// TODO: This will also control the maximum number of shards, when supported:
42+
// https://github.com/open-telemetry/opentelemetry-collector/issues/12473.
43+
NumConsumers int `mapstructure:"num_consumers"`
44+
45+
// BatchConfig it configures how the requests are consumed from the queue and batch together during consumption.
46+
// TODO: This will be changed to Optional when available.
47+
BatchConfig *BatchConfig `mapstructure:"batch"`
48+
}
49+
50+
// Validate checks if the Config is valid
51+
func (cfg *Config) Validate() error {
52+
if !cfg.Enabled {
53+
return nil
54+
}
55+
56+
if cfg.NumConsumers <= 0 {
57+
return errors.New("`num_consumers` must be positive")
58+
}
59+
60+
if cfg.QueueSize <= 0 {
61+
return errors.New("`queue_size` must be positive")
62+
}
63+
64+
if cfg.StorageID != nil && cfg.WaitForResult {
65+
return errors.New("`wait_for_result` is not supported with a persistent queue configured with `storage`")
66+
}
67+
return nil
68+
}
69+
70+
// BatchConfig defines a configuration for batching requests based on a timeout and a minimum number of items.
71+
type BatchConfig struct {
72+
// FlushTimeout sets the time after which a batch will be sent regardless of its size.
73+
FlushTimeout time.Duration `mapstructure:"flush_timeout"`
74+
75+
// MinSize defines the configuration for the minimum size of a batch.
76+
MinSize int `mapstructure:"min_size"`
77+
78+
// MaxSize defines the configuration for the maximum size of a batch.
79+
MaxSize int `mapstructure:"max_size"`
80+
}
81+
82+
func (cfg *BatchConfig) Validate() error {
83+
if cfg == nil {
84+
return nil
85+
}
86+
87+
if cfg.FlushTimeout <= 0 {
88+
return errors.New("`flush_timeout` must be positive")
89+
}
90+
91+
if cfg.MinSize < 0 {
92+
return errors.New("`min_size` must be non-negative")
93+
}
94+
95+
if cfg.MaxSize < 0 {
96+
return errors.New("`max_size` must be non-negative")
97+
}
98+
99+
if cfg.MaxSize > 0 && cfg.MaxSize < cfg.MinSize {
100+
return errors.New("`max_size` must be greater or equal to `min_size`")
101+
}
102+
103+
return nil
104+
}

0 commit comments

Comments
 (0)