Skip to content

Commit 922a798

Browse files
authored
Add support for wait_for_result, remove disabled_queue (#12742)
In this PR, we merge the functionality from "disabled_queue" within the memory queue. For the moment this functionality is not available for the persistent queue. In the next PR, will merge the `internal.QueueConfig` with `queuebatch.Config`. Signed-off-by: Bogdan Drutu <[email protected]>
1 parent cf49766 commit 922a798

14 files changed

+531
-397
lines changed

.chloggen/add_wait_for_response.yaml

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add support for wait_for_result, remove disabled_queue
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12742]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: This has a side effect for users of the experimental BatchConfig with the queue disabled, since not this is |
19+
uses only NumCPU() consumers.
20+
21+
# Optional: The change log or logs in which this entry should be included.
22+
# e.g. '[user]' or '[user, api]'
23+
# Include 'user' if the change is relevant to end users.
24+
# Include 'api' if there is a change to a library API.
25+
# Default: '[user]'
26+
change_logs: [user]

exporter/exporterhelper/internal/queue_sender.go

+48-16
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"context"
88
"errors"
99
"fmt"
10+
"math"
11+
"runtime"
1012
"time"
1113

1214
"go.uber.org/zap"
@@ -47,6 +49,10 @@ type QueueConfig struct {
4749
// Enabled indicates whether to not enqueue batches before exporting.
4850
Enabled bool `mapstructure:"enabled"`
4951

52+
// WaitForResult determines if incoming requests are blocked until the request is processed or not.
53+
// Currently, this option is not available when persistent queue is configured using the storage configuration.
54+
WaitForResult bool `mapstructure:"wait_for_result"`
55+
5056
// Sizer determines the type of size measurement used by this component.
5157
// It accepts "requests", "items", or "bytes".
5258
Sizer request.SizerType `mapstructure:"sizer"`
@@ -99,11 +105,14 @@ func (qCfg *QueueConfig) Validate() error {
99105
return errors.New("`queue_size` must be positive")
100106
}
101107

108+
if qCfg.StorageID != nil && qCfg.WaitForResult {
109+
return errors.New("`wait_for_result` is not supported with a persistent queue configured with `storage`")
110+
}
111+
102112
// Only support request sizer for persistent queue at this moment.
103113
if qCfg.StorageID != nil && qCfg.Sizer != request.SizerTypeRequests {
104-
return errors.New("persistent queue only supports `requests` sizer")
114+
return errors.New("persistent queue configured with `storage` only supports `requests` sizer")
105115
}
106-
107116
return nil
108117
}
109118

@@ -133,20 +142,43 @@ func NewQueueSender(
133142
}
134143

135144
func newQueueBatchConfig(qCfg QueueConfig, bCfg BatcherConfig) queuebatch.Config {
136-
qbCfg := queuebatch.Config{
137-
Enabled: true,
138-
WaitForResult: !qCfg.Enabled,
139-
Sizer: qCfg.Sizer,
140-
QueueSize: qCfg.QueueSize,
141-
NumConsumers: qCfg.NumConsumers,
142-
BlockOnOverflow: qCfg.BlockOnOverflow,
143-
StorageID: qCfg.StorageID,
144-
}
145-
if bCfg.Enabled {
146-
qbCfg.Batch = &queuebatch.BatchConfig{
147-
FlushTimeout: bCfg.FlushTimeout,
148-
MinSize: bCfg.MinSize,
149-
MaxSize: bCfg.MaxSize,
145+
var qbCfg queuebatch.Config
146+
// User configured queueing, copy all config.
147+
if qCfg.Enabled {
148+
qbCfg = queuebatch.Config{
149+
Enabled: true,
150+
WaitForResult: qCfg.WaitForResult,
151+
Sizer: qCfg.Sizer,
152+
QueueSize: qCfg.QueueSize,
153+
NumConsumers: qCfg.NumConsumers,
154+
BlockOnOverflow: qCfg.BlockOnOverflow,
155+
StorageID: qCfg.StorageID,
156+
// TODO: Copy batching configuration as well when available.
157+
}
158+
// TODO: Remove this when WithBatcher is removed.
159+
if bCfg.Enabled {
160+
qbCfg.Batch = &queuebatch.BatchConfig{
161+
FlushTimeout: bCfg.FlushTimeout,
162+
MinSize: bCfg.MinSize,
163+
MaxSize: bCfg.MaxSize,
164+
}
165+
}
166+
} else {
167+
// This can happen only if the deprecated way to configure batching is used with a "disabled" queue.
168+
// TODO: Remove this when WithBatcher is removed.
169+
qbCfg = queuebatch.Config{
170+
Enabled: true,
171+
WaitForResult: true,
172+
Sizer: request.SizerTypeRequests,
173+
QueueSize: math.MaxInt,
174+
NumConsumers: runtime.NumCPU(),
175+
BlockOnOverflow: true,
176+
StorageID: nil,
177+
Batch: &queuebatch.BatchConfig{
178+
FlushTimeout: bCfg.FlushTimeout,
179+
MinSize: bCfg.MinSize,
180+
MaxSize: bCfg.MaxSize,
181+
},
150182
}
151183
}
152184
return qbCfg

exporter/exporterhelper/internal/queuebatch/async_queue_test.go

+113-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestAsyncMemoryQueue(t *testing.T) {
3434
func TestAsyncMemoryQueueBlocking(t *testing.T) {
3535
consumed := &atomic.Int64{}
3636
ac := newAsyncQueue(
37-
newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 100, blocking: true}),
37+
newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 100, blockOnOverflow: true}),
3838
4, func(_ context.Context, _ int64, done Done) {
3939
consumed.Add(1)
4040
done.OnDone(nil)
@@ -58,7 +58,7 @@ func TestAsyncMemoryQueueBlocking(t *testing.T) {
5858
func TestAsyncMemoryQueueBlockingCancelled(t *testing.T) {
5959
stop := make(chan struct{})
6060
ac := newAsyncQueue(
61-
newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 10, blocking: true}),
61+
newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 10, blockOnOverflow: true}),
6262
1, func(_ context.Context, _ int64, done Done) {
6363
<-stop
6464
done.OnDone(nil)
@@ -102,3 +102,114 @@ func BenchmarkAsyncMemoryQueue(b *testing.B) {
102102
require.NoError(b, ac.Shutdown(context.Background()))
103103
assert.EqualValues(b, b.N, consumed.Load())
104104
}
105+
106+
func TestMemoryQueueMultiThread(t *testing.T) {
107+
buf := newBuffer()
108+
wg := sync.WaitGroup{}
109+
q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 1000, blockOnOverflow: true})
110+
buf.start()
111+
ac := newAsyncQueue(q, 3, buf.consume)
112+
require.NoError(t, ac.Start(context.Background(), componenttest.NewNopHost()))
113+
114+
for i := 0; i < 10; i++ {
115+
wg.Add(1)
116+
go func() {
117+
defer wg.Done()
118+
for j := 0; j < 10_000; j++ {
119+
assert.NoError(t, ac.Offer(context.Background(), int64(1)))
120+
}
121+
}()
122+
}
123+
124+
wg.Wait()
125+
require.NoError(t, ac.Shutdown(context.Background()))
126+
buf.shutdown()
127+
assert.Equal(t, int64(10*10_000), buf.consumed())
128+
}
129+
130+
func TestMemoryQueueWaitForResponseMultiThread(t *testing.T) {
131+
buf := newBuffer()
132+
wg := sync.WaitGroup{}
133+
q := newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 1000, waitForResult: true, blockOnOverflow: true})
134+
buf.start()
135+
ac := newAsyncQueue(q, 3, buf.consume)
136+
require.NoError(t, ac.Start(context.Background(), componenttest.NewNopHost()))
137+
138+
for i := 0; i < 10; i++ {
139+
wg.Add(1)
140+
go func() {
141+
defer wg.Done()
142+
for j := 0; j < 10_000; j++ {
143+
assert.NoError(t, ac.Offer(context.Background(), int64(1)))
144+
}
145+
}()
146+
}
147+
148+
wg.Wait()
149+
require.NoError(t, ac.Shutdown(context.Background()))
150+
buf.shutdown()
151+
assert.Equal(t, int64(10*10_000), buf.consumed())
152+
}
153+
154+
const flushNum = 5
155+
156+
type buffer struct {
157+
ch chan Done
158+
nr *atomic.Int64
159+
wg sync.WaitGroup
160+
dones []Done
161+
}
162+
163+
func newBuffer() *buffer {
164+
buf := &buffer{
165+
ch: make(chan Done, 10),
166+
nr: &atomic.Int64{},
167+
dones: make([]Done, 0, flushNum),
168+
}
169+
return buf
170+
}
171+
172+
func (buf *buffer) consume(_ context.Context, _ int64, done Done) {
173+
buf.ch <- done
174+
}
175+
176+
func (buf *buffer) start() {
177+
buf.wg.Add(1)
178+
go func() {
179+
defer buf.wg.Done()
180+
for {
181+
select {
182+
case done, ok := <-buf.ch:
183+
if !ok {
184+
return
185+
}
186+
buf.dones = append(buf.dones, done)
187+
if len(buf.dones) == flushNum {
188+
buf.flush()
189+
}
190+
case <-time.After(10 * time.Millisecond):
191+
buf.flush()
192+
}
193+
}
194+
}()
195+
}
196+
197+
func (buf *buffer) shutdown() {
198+
close(buf.ch)
199+
buf.wg.Wait()
200+
}
201+
202+
func (buf *buffer) flush() {
203+
if len(buf.dones) == 0 {
204+
return
205+
}
206+
buf.nr.Add(int64(len(buf.dones)))
207+
for _, done := range buf.dones {
208+
done.OnDone(nil)
209+
}
210+
buf.dones = buf.dones[:0]
211+
}
212+
213+
func (buf *buffer) consumed() int64 {
214+
return buf.nr.Load()
215+
}

exporter/exporterhelper/internal/queuebatch/config.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type Config struct {
3737
StorageID *component.ID `mapstructure:"storage"`
3838

3939
// NumConsumers is the maximum number of concurrent consumers from the queue.
40-
// This applies across all different optional configurations from above (e.g. wait_for_result, blocking, persistent, etc.).
40+
// This applies across all different optional configurations from above (e.g. wait_for_result, blockOnOverflow, persistent, etc.).
4141
// TODO: This will also control the maximum number of shards, when supported:
4242
// https://github.com/open-telemetry/opentelemetry-collector/issues/12473.
4343
NumConsumers int `mapstructure:"num_consumers"`
@@ -64,6 +64,12 @@ func (cfg *Config) Validate() error {
6464
if cfg.StorageID != nil && cfg.WaitForResult {
6565
return errors.New("`wait_for_result` is not supported with a persistent queue configured with `storage`")
6666
}
67+
68+
// Only support request sizer for persistent queue at this moment.
69+
if cfg.StorageID != nil && cfg.Sizer != request.SizerTypeRequests {
70+
return errors.New("persistent queue configured with `storage` only supports `requests` sizer")
71+
}
72+
6773
return nil
6874
}
6975

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package queuebatch
5+
6+
import (
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
13+
"go.opentelemetry.io/collector/component"
14+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
15+
)
16+
17+
func TestConfig_Validate(t *testing.T) {
18+
cfg := newTestConfig()
19+
require.NoError(t, cfg.Validate())
20+
21+
cfg.NumConsumers = 0
22+
require.EqualError(t, cfg.Validate(), "`num_consumers` must be positive")
23+
24+
cfg = newTestConfig()
25+
cfg.QueueSize = 0
26+
require.EqualError(t, cfg.Validate(), "`queue_size` must be positive")
27+
28+
cfg = newTestConfig()
29+
cfg.QueueSize = 0
30+
require.EqualError(t, cfg.Validate(), "`queue_size` must be positive")
31+
32+
storageID := component.MustNewID("test")
33+
cfg = newTestConfig()
34+
cfg.WaitForResult = true
35+
cfg.StorageID = &storageID
36+
require.EqualError(t, cfg.Validate(), "`wait_for_result` is not supported with a persistent queue configured with `storage`")
37+
38+
cfg = newTestConfig()
39+
cfg.Sizer = request.SizerTypeBytes
40+
cfg.StorageID = &storageID
41+
require.EqualError(t, cfg.Validate(), "persistent queue configured with `storage` only supports `requests` sizer")
42+
43+
// Confirm Validate doesn't return error with invalid config when feature is disabled
44+
cfg.Enabled = false
45+
assert.NoError(t, cfg.Validate())
46+
}
47+
48+
func TestBatchConfig_Validate(t *testing.T) {
49+
cfg := newTestBatchConfig()
50+
require.NoError(t, cfg.Validate())
51+
52+
cfg = newTestBatchConfig()
53+
cfg.FlushTimeout = 0
54+
require.EqualError(t, cfg.Validate(), "`flush_timeout` must be positive")
55+
56+
cfg = newTestBatchConfig()
57+
cfg.MinSize = -1
58+
require.EqualError(t, cfg.Validate(), "`min_size` must be non-negative")
59+
60+
cfg = newTestBatchConfig()
61+
cfg.MaxSize = -1
62+
require.EqualError(t, cfg.Validate(), "`max_size` must be non-negative")
63+
64+
cfg = newTestBatchConfig()
65+
cfg.MinSize = 2048
66+
cfg.MaxSize = 1024
67+
require.EqualError(t, cfg.Validate(), "`max_size` must be greater or equal to `min_size`")
68+
}
69+
70+
func newTestBatchConfig() BatchConfig {
71+
return BatchConfig{
72+
FlushTimeout: 200 * time.Millisecond,
73+
MinSize: 2048,
74+
MaxSize: 0,
75+
}
76+
}

exporter/exporterhelper/internal/queuebatch/disabled_batcher_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ func TestDisabledBatcher(t *testing.T) {
3737
ba := newDisabledBatcher(sink.Export)
3838

3939
mq := newMemoryQueue[request.Request](memoryQueueSettings[request.Request]{
40-
sizer: request.RequestsSizer[request.Request]{},
41-
capacity: 1000,
42-
blocking: true,
40+
sizer: request.RequestsSizer[request.Request]{},
41+
capacity: 1000,
42+
blockOnOverflow: true,
4343
})
4444
q := newAsyncQueue(mq, tt.maxWorkers, ba.Consume)
4545

0 commit comments

Comments
 (0)