Skip to content

Commit 3f33f59

Browse files
committed
fix(tracing): Flush spans on interval instead of idle timeout
Our trace span exporter uses batching logic to avoid making excessive requests to the trace collector. It flushes a batch either when the batch buffer reaches a maximum number of spans (currently 1000) or when no spans have been collected within an idle timeout window (currently 10s). If a workload emits spans at a slow but consistent rate, say 1 span/s, the idle timeout is never hit while the buffer slowly fills. At that rate, spans are not exported until the buffer is full, which takes roughly 1000 seconds (about 17 minutes). This change replaces the idle timeout with an interval timer that attempts to send the currently buffered spans on a consistent interval, regardless of whether the collector has sat idle. The interval is set to 10s to match the previous idle timeout, but it can be tuned in the future, along with the buffer size, if we feel the need. Signed-off-by: Scott Fleener <[email protected]>
1 parent d7ea041 commit 3f33f59

File tree

1 file changed

+13
-6
lines changed
  • linkerd/opentelemetry/src

1 file changed

+13
-6
lines changed

linkerd/opentelemetry/src/lib.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ use opentelemetry_proto::{
2828
};
2929
use opentelemetry_sdk::trace::{SpanData, SpanLinks};
3030
pub use opentelemetry_sdk::{self as sdk};
31-
use tokio::{sync::mpsc, time};
31+
use tokio::{
32+
sync::mpsc,
33+
time::{self, Instant, MissedTickBehavior},
34+
};
3235
use tonic::{self as grpc, body::Body as TonicBody, client::GrpcService};
3336
use tracing::{debug, info, trace};
3437

@@ -72,7 +75,7 @@ where
7275
S: Stream<Item = ExportSpan> + Unpin,
7376
{
7477
const MAX_BATCH_SIZE: usize = 1000;
75-
const MAX_BATCH_IDLE: time::Duration = time::Duration::from_secs(10);
78+
const BATCH_INTERVAL: time::Duration = time::Duration::from_secs(10);
7679

7780
fn new(client: T, spans: S, resource: ResourceAttributesWithSchema, metrics: Registry) -> Self {
7881
Self {
@@ -191,6 +194,10 @@ where
191194
) -> Result<(), SpanRxClosed> {
192195
let mut input_accum: Vec<SpanData> = vec![];
193196

197+
let mut interval =
198+
time::interval_at(Instant::now() + Self::BATCH_INTERVAL, Self::BATCH_INTERVAL);
199+
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
200+
194201
let res = loop {
195202
if input_accum.len() == Self::MAX_BATCH_SIZE {
196203
trace!(capacity = Self::MAX_BATCH_SIZE, "Batch capacity reached");
@@ -216,11 +223,11 @@ where
216223
None => break Err(SpanRxClosed),
217224
},
218225

219-
// Don't hold spans indefinitely. Return if we hit an idle
220-
// timeout and spans have been collected.
221-
_ = time::sleep(Self::MAX_BATCH_IDLE) => {
226+
// Don't hold spans indefinitely. Return if we hit an interval tick and spans have
227+
// been collected.
228+
_ = interval.tick() => {
222229
if !input_accum.is_empty() {
223-
trace!(spans = input_accum.len(), "Flushing spans due to inactivitiy");
230+
trace!(spans = input_accum.len(), "Flushing spans due to interval tick");
224231
break Ok(());
225232
}
226233
}

0 commit comments

Comments
 (0)