Skip to content

Commit 7580eca

Browse files
committed
Replace timer with ticker
1 parent f9197ad commit 7580eca

File tree

2 files changed

+24
-26
lines changed

2 files changed

+24
-26
lines changed

exporter/exporterhelper/internal/queuebatch/default_batcher.go

+22-24
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ import (
1616
)
1717

1818
type batch struct {
19-
ctx context.Context
20-
req request.Request
21-
done multiDone
19+
ctx context.Context
20+
req request.Request
21+
done multiDone
22+
created time.Time
2223
}
2324

2425
type batcherSettings[K any] struct {
@@ -38,7 +39,7 @@ type defaultBatcher struct {
3839
stopWG sync.WaitGroup
3940
currentBatchMu sync.Mutex
4041
currentBatch *batch
41-
timer *time.Timer
42+
ticker *time.Ticker
4243
shutdownCh chan struct{}
4344
}
4445

@@ -62,12 +63,6 @@ func newDefaultBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request])
6263
}
6364
}
6465

65-
func (qb *defaultBatcher) resetTimer() {
66-
if qb.cfg.FlushTimeout > 0 {
67-
qb.timer.Reset(qb.cfg.FlushTimeout)
68-
}
69-
}
70-
7166
func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done Done) {
7267
qb.currentBatchMu.Lock()
7368

@@ -91,11 +86,11 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
9186
// Do not flush the last item and add it to the current batch.
9287
reqList = reqList[:len(reqList)-1]
9388
qb.currentBatch = &batch{
94-
ctx: ctx,
95-
req: lastReq,
96-
done: multiDone{done},
89+
ctx: ctx,
90+
req: lastReq,
91+
done: multiDone{done},
92+
created: time.Now(),
9793
}
98-
qb.resetTimer()
9994
}
10095

10196
qb.currentBatchMu.Unlock()
@@ -146,11 +141,11 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
146141
// Do not flush the last item and add it to the current batch.
147142
reqList = reqList[:len(reqList)-1]
148143
qb.currentBatch = &batch{
149-
ctx: ctx,
150-
req: lastReq,
151-
done: multiDone{done},
144+
ctx: ctx,
145+
req: lastReq,
146+
done: multiDone{done},
147+
created: time.Now(),
152148
}
153-
qb.resetTimer()
154149
}
155150
}
156151

@@ -172,8 +167,8 @@ func (qb *defaultBatcher) startTimeBasedFlushingGoroutine() {
172167
select {
173168
case <-qb.shutdownCh:
174169
return
175-
case <-qb.timer.C:
176-
qb.flushCurrentBatchIfNecessary()
170+
case <-qb.ticker.C:
171+
qb.flushCurrentBatchIfNecessary(false)
177172
}
178173
}
179174
}()
@@ -182,27 +177,30 @@ func (qb *defaultBatcher) startTimeBasedFlushingGoroutine() {
182177
// Start starts the goroutine that reads from the queue and flushes asynchronously.
183178
func (qb *defaultBatcher) Start(_ context.Context, _ component.Host) error {
184179
if qb.cfg.FlushTimeout > 0 {
185-
qb.timer = time.NewTimer(qb.cfg.FlushTimeout)
180+
qb.ticker = time.NewTicker(qb.cfg.FlushTimeout)
186181
qb.startTimeBasedFlushingGoroutine()
187182
}
188183

189184
return nil
190185
}
191186

192187
// flushCurrentBatchIfNecessary sends out the current request batch if it is not nil
193-
func (qb *defaultBatcher) flushCurrentBatchIfNecessary() {
188+
func (qb *defaultBatcher) flushCurrentBatchIfNecessary(forceFlush bool) {
194189
qb.currentBatchMu.Lock()
195190
if qb.currentBatch == nil {
196191
qb.currentBatchMu.Unlock()
197192
return
198193
}
194+
if !forceFlush && time.Since(qb.currentBatch.created) < qb.cfg.FlushTimeout {
195+
qb.currentBatchMu.Unlock()
196+
return
197+
}
199198
batchToFlush := qb.currentBatch
200199
qb.currentBatch = nil
201200
qb.currentBatchMu.Unlock()
202201

203202
// flush() blocks until successfully started a goroutine for flushing.
204203
qb.flush(batchToFlush.ctx, batchToFlush.req, batchToFlush.done)
205-
qb.resetTimer()
206204
}
207205

208206
// flush starts a goroutine that calls consumeFunc. It blocks until a worker is available if necessary.
@@ -224,7 +222,7 @@ func (qb *defaultBatcher) flush(ctx context.Context, req request.Request, done D
224222
func (qb *defaultBatcher) Shutdown(_ context.Context) error {
225223
close(qb.shutdownCh)
226224
// Make sure execute one last flush if necessary.
227-
qb.flushCurrentBatchIfNecessary()
225+
qb.flushCurrentBatchIfNecessary(true)
228226
qb.stopWG.Wait()
229227
return nil
230228
}

exporter/exporterhelper/internal/queuebatch/queue_batch_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -590,8 +590,8 @@ func TestQueueBatchTimerFlush(t *testing.T) {
590590
assert.LessOrEqual(t, 1, sink.RequestsCount())
591591
assert.Equal(t, 8, sink.ItemsCount())
592592

593-
// Confirm that it is flushed after 100ms (using 60+50=110 here to be safe)
594-
time.Sleep(50 * time.Millisecond)
593+
// Confirm that it is flushed after 100ms (using 60+100=160 here to be safe)
594+
time.Sleep(100 * time.Millisecond)
595595
assert.LessOrEqual(t, 2, sink.RequestsCount())
596596
assert.Equal(t, 12, sink.ItemsCount())
597597
require.NoError(t, qb.Shutdown(context.Background()))

0 commit comments

Comments
 (0)