Skip to content

Commit ee0f0ae

Browse files
authoredMar 21, 2025··
[chore] Move batcher to the new QueueBatcher package (#12693)
Depends on #12689 Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
1 parent 0843f2d commit ee0f0ae

File tree

7 files changed

+16
-35
lines changed

7 files changed

+16
-35
lines changed
 

‎exporter/exporterhelper/internal/batcher/package_test.go

-14
This file was deleted.

‎exporter/exporterhelper/internal/queue_sender.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111

1212
"go.opentelemetry.io/collector/component"
1313
"go.opentelemetry.io/collector/exporter/exporterbatcher"
14-
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/batcher"
1514
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1615
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1716
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
@@ -41,7 +40,7 @@ func NewQueueSender(
4140

4241
type QueueBatch struct {
4342
queue queuebatch.Queue[request.Request]
44-
batcher batcher.Batcher[request.Request]
43+
batcher queuebatch.Batcher[request.Request]
4544
}
4645

4746
func NewQueueBatch(
@@ -50,7 +49,7 @@ func NewQueueBatch(
5049
bCfg exporterbatcher.Config,
5150
next sender.SendFunc[request.Request],
5251
) (*QueueBatch, error) {
53-
b, err := batcher.NewBatcher(bCfg, next, qCfg.NumConsumers)
52+
b, err := queuebatch.NewBatcher(bCfg, next, qCfg.NumConsumers)
5453
if err != nil {
5554
return nil, err
5655
}

‎exporter/exporterhelper/internal/batcher/batcher.go ‎exporter/exporterhelper/internal/queuebatch/batcher.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,21 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package batcher // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/batcher"
4+
package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
55

66
import (
77
"context"
88

99
"go.opentelemetry.io/collector/component"
1010
"go.opentelemetry.io/collector/exporter/exporterbatcher"
11-
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1211
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1312
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
1413
)
1514

1615
// Batcher is in charge of reading items from the queue and send them out asynchronously.
1716
type Batcher[K any] interface {
1817
component.Component
19-
Consume(context.Context, K, queuebatch.Done)
18+
Consume(context.Context, K, Done)
2019
}
2120

2221
func NewBatcher(batchCfg exporterbatcher.Config, exportFunc sender.SendFunc[request.Request], maxWorkers int) (Batcher[request.Request], error) {

‎exporter/exporterhelper/internal/batcher/default_batcher.go ‎exporter/exporterhelper/internal/queuebatch/default_batcher.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package batcher // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/batcher"
4+
package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
55

66
import (
77
"context"
@@ -12,7 +12,6 @@ import (
1212

1313
"go.opentelemetry.io/collector/component"
1414
"go.opentelemetry.io/collector/exporter/exporterbatcher"
15-
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1615
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1716
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
1817
)
@@ -62,7 +61,7 @@ func (qb *defaultBatcher) resetTimer() {
6261
}
6362
}
6463

65-
func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done queuebatch.Done) {
64+
func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done Done) {
6665
qb.currentBatchMu.Lock()
6766

6867
if qb.currentBatch == nil {
@@ -200,7 +199,7 @@ func (qb *defaultBatcher) flushCurrentBatchIfNecessary() {
200199
}
201200

202201
// flush starts a goroutine that calls consumeFunc. It blocks until a worker is available if necessary.
203-
func (qb *defaultBatcher) flush(ctx context.Context, req request.Request, done queuebatch.Done) {
202+
func (qb *defaultBatcher) flush(ctx context.Context, req request.Request, done Done) {
204203
qb.stopWG.Add(1)
205204
if qb.workerPool != nil {
206205
<-qb.workerPool
@@ -223,7 +222,7 @@ func (qb *defaultBatcher) Shutdown(_ context.Context) error {
223222
return nil
224223
}
225224

226-
type multiDone []queuebatch.Done
225+
type multiDone []Done
227226

228227
func (mdc multiDone) OnDone(err error) {
229228
for _, d := range mdc {
@@ -232,13 +231,13 @@ func (mdc multiDone) OnDone(err error) {
232231
}
233232

234233
type refCountDone struct {
235-
done queuebatch.Done
234+
done Done
236235
mu sync.Mutex
237236
refCount int64
238237
err error
239238
}
240239

241-
func newRefCountDone(done queuebatch.Done, refCount int64) queuebatch.Done {
240+
func newRefCountDone(done Done, refCount int64) Done {
242241
return &refCountDone{
243242
done: done,
244243
refCount: refCount,

‎exporter/exporterhelper/internal/batcher/default_batcher_test.go ‎exporter/exporterhelper/internal/queuebatch/default_batcher_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package batcher
4+
package queuebatch
55

66
import (
77
"context"

‎exporter/exporterhelper/internal/batcher/disabled_batcher.go ‎exporter/exporterhelper/internal/queuebatch/disabled_batcher.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package batcher // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/batcher"
4+
package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
55

66
import (
77
"context"
88

99
"go.opentelemetry.io/collector/component"
10-
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1110
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
1211
)
1312

@@ -19,7 +18,7 @@ type disabledBatcher[T any] struct {
1918
consumeFunc sender.SendFunc[T]
2019
}
2120

22-
func (db *disabledBatcher[T]) Consume(ctx context.Context, req T, done queuebatch.Done) {
21+
func (db *disabledBatcher[T]) Consume(ctx context.Context, req T, done Done) {
2322
done.OnDone(db.consumeFunc(ctx, req))
2423
}
2524

‎exporter/exporterhelper/internal/batcher/disabled_batcher_test.go ‎exporter/exporterhelper/internal/queuebatch/disabled_batcher_test.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package batcher
4+
package queuebatch
55

66
import (
77
"context"
@@ -14,7 +14,6 @@ import (
1414

1515
"go.opentelemetry.io/collector/component/componenttest"
1616
"go.opentelemetry.io/collector/exporter/exporterbatcher"
17-
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
1817
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1918
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
2019
"go.opentelemetry.io/collector/exporter/exporterqueue"
@@ -45,9 +44,9 @@ func TestDisabledBatcher_Basic(t *testing.T) {
4544
ba, err := NewBatcher(cfg, sink.Export, tt.maxWorkers)
4645
require.NoError(t, err)
4746

48-
q := queuebatch.NewQueue[request.Request](
47+
q := NewQueue[request.Request](
4948
context.Background(),
50-
queuebatch.QueueSettings[request.Request]{
49+
QueueSettings[request.Request]{
5150
Signal: pipeline.SignalTraces,
5251
ExporterSettings: exportertest.NewNopSettings(exportertest.NopType),
5352
},

0 commit comments

Comments
 (0)
Please sign in to comment.