
Commit d2a6b3b

lalitbcijothomas and Cijo Thomas authored
BatchSpanProcessor optimizations - Separate control signal queue, and wake up background thread only when required. (#2526)
Co-authored-by: Cijo Thomas <[email protected]>
1 parent 78db32c commit d2a6b3b

1 file changed: +180 −35 lines changed

opentelemetry-sdk/src/trace/span_processor.rs

+180 −35
@@ -85,6 +85,7 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
     /// `on_end` is called after a `Span` is ended (i.e., the end timestamp is
     /// already set). This method is called synchronously within the `Span::end`
     /// API, therefore it should not block or throw an exception.
+    /// TODO - This method should take a reference to `SpanData`
     fn on_end(&self, span: SpanData);
     /// Force the spans lying in the cache to be exported.
     fn force_flush(&self) -> TraceResult<()>;
@@ -163,6 +164,7 @@ impl SpanProcessor for SimpleSpanProcessor {
     }
 }
 
+use crate::export::trace::ExportResult;
 /// The `BatchSpanProcessor` collects finished spans in a buffer and exports them
 /// in batches to the configured `SpanExporter`. This processor is ideal for
 /// high-throughput environments, as it minimizes the overhead of exporting spans
@@ -217,16 +219,17 @@ impl SpanProcessor for SimpleSpanProcessor {
 /// provider.shutdown();
 /// }
 /// ```
-use futures_executor::block_on;
 use std::sync::mpsc::sync_channel;
+use std::sync::mpsc::Receiver;
 use std::sync::mpsc::RecvTimeoutError;
 use std::sync::mpsc::SyncSender;
 
 /// Messages exchanged between the main thread and the background thread.
 #[allow(clippy::large_enum_variant)]
 #[derive(Debug)]
 enum BatchMessage {
-    ExportSpan(SpanData),
+    //ExportSpan(SpanData),
+    ExportSpan(Arc<AtomicBool>),
     ForceFlush(SyncSender<TraceResult<()>>),
     Shutdown(SyncSender<TraceResult<()>>),
     SetResource(Arc<Resource>),
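
The key change in this hunk is the `BatchMessage::ExportSpan` variant: instead of carrying a full `SpanData`, it now carries an `Arc<AtomicBool>` wake-up flag, while span payloads move over a dedicated data channel. Below is a minimal, std-only sketch of that two-channel layout; the names (`ControlMessage`, `Channels`, `make_channels`, the `String` stand-in for `SpanData`) are hypothetical and not part of the opentelemetry-sdk API.

// Standalone sketch of the two-channel layout: spans travel on a bounded data
// channel, while a tiny control channel only carries wake-up/shutdown signals.
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Arc;

#[derive(Debug)]
enum ControlMessage {
    ExportWakeup(Arc<AtomicBool>), // mirrors BatchMessage::ExportSpan(Arc<AtomicBool>)
    Shutdown,
}

struct Channels {
    span_tx: SyncSender<String>, // stand-in for SpanData
    control_tx: SyncSender<ControlMessage>,
    span_rx: Receiver<String>,
    control_rx: Receiver<ControlMessage>,
}

fn make_channels(max_queue_size: usize) -> Channels {
    let (span_tx, span_rx) = sync_channel(max_queue_size);
    let (control_tx, control_rx) = sync_channel(64); // control traffic is rare, so a small bound suffices
    Channels { span_tx, control_tx, span_rx, control_rx }
}

fn main() {
    let ch = make_channels(2048);
    ch.span_tx.try_send("span-1".to_string()).unwrap();
    ch.control_tx
        .try_send(ControlMessage::ExportWakeup(Arc::new(AtomicBool::new(true))))
        .unwrap();
    println!("queued span: {:?}", ch.span_rx.try_recv());
    println!("control message received: {}", ch.control_rx.try_recv().is_ok());
}
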
@@ -235,12 +238,17 @@ enum BatchMessage {
 /// A batch span processor with a dedicated background thread.
 #[derive(Debug)]
 pub struct BatchSpanProcessor {
-    message_sender: SyncSender<BatchMessage>,
+    span_sender: SyncSender<SpanData>, // Data channel to store spans
+    message_sender: SyncSender<BatchMessage>, // Control channel to store control messages.
     handle: Mutex<Option<thread::JoinHandle<()>>>,
     forceflush_timeout: Duration,
     shutdown_timeout: Duration,
     is_shutdown: AtomicBool,
     dropped_span_count: Arc<AtomicUsize>,
+    export_span_message_sent: Arc<AtomicBool>,
+    current_batch_size: Arc<AtomicUsize>,
+    max_export_batch_size: usize,
+    max_queue_size: usize,
 }
 
 impl BatchSpanProcessor {
@@ -255,7 +263,12 @@ impl BatchSpanProcessor {
     where
         E: SpanExporter + Send + 'static,
     {
-        let (message_sender, message_receiver) = sync_channel(config.max_queue_size);
+        let (span_sender, span_receiver) = sync_channel::<SpanData>(config.max_queue_size);
+        let (message_sender, message_receiver) = sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
+        let max_queue_size = config.max_queue_size;
+        let max_export_batch_size = config.max_export_batch_size;
+        let current_batch_size = Arc::new(AtomicUsize::new(0));
+        let current_batch_size_for_thread = current_batch_size.clone();
 
         let handle = thread::Builder::new()
             .name("OpenTelemetry.Traces.BatchProcessor".to_string())
@@ -268,7 +281,7 @@ impl BatchSpanProcessor {
                 );
                 let mut spans = Vec::with_capacity(config.max_export_batch_size);
                 let mut last_export_time = Instant::now();
-
+                let current_batch_size = current_batch_size_for_thread;
                 loop {
                     let remaining_time_option = config
                         .scheduled_delay
@@ -279,44 +292,71 @@ impl BatchSpanProcessor {
                     };
                     match message_receiver.recv_timeout(remaining_time) {
                         Ok(message) => match message {
-                            BatchMessage::ExportSpan(span) => {
-                                spans.push(span);
-                                if spans.len() >= config.max_queue_size
-                                    || last_export_time.elapsed() >= config.scheduled_delay
-                                {
-                                    if let Err(err) = block_on(exporter.export(spans.split_off(0)))
-                                    {
-                                        otel_error!(
-                                            name: "BatchSpanProcessor.ExportError",
-                                            error = format!("{}", err)
-                                        );
-                                    }
-                                    last_export_time = Instant::now();
-                                }
+                            BatchMessage::ExportSpan(export_span_message_sent) => {
+                                // Reset the export span message sent flag now that it has been processed.
+                                export_span_message_sent.store(false, Ordering::Relaxed);
+                                otel_debug!(
+                                    name: "BatchSpanProcessor.ExportingDueToBatchSize",
+                                );
+                                let _ = Self::get_spans_and_export(
+                                    &span_receiver,
+                                    &mut exporter,
+                                    &mut spans,
+                                    &mut last_export_time,
+                                    &current_batch_size,
+                                    &config,
+                                );
                             }
                             BatchMessage::ForceFlush(sender) => {
-                                let result = block_on(exporter.export(spans.split_off(0)));
+                                otel_debug!(name: "BatchSpanProcessor.ExportingDueToForceFlush");
+                                let result = Self::get_spans_and_export(
+                                    &span_receiver,
+                                    &mut exporter,
+                                    &mut spans,
+                                    &mut last_export_time,
+                                    &current_batch_size,
+                                    &config,
+                                );
                                 let _ = sender.send(result);
                             }
                             BatchMessage::Shutdown(sender) => {
-                                let result = block_on(exporter.export(spans.split_off(0)));
+                                otel_debug!(name: "BatchSpanProcessor.ExportingDueToShutdown");
+                                let result = Self::get_spans_and_export(
+                                    &span_receiver,
+                                    &mut exporter,
+                                    &mut spans,
+                                    &mut last_export_time,
+                                    &current_batch_size,
+                                    &config,
+                                );
                                 let _ = sender.send(result);
+
+                                otel_debug!(
+                                    name: "BatchSpanProcessor.ThreadExiting",
+                                    reason = "ShutdownRequested"
+                                );
+                                //
+                                // break out of the loop and return from the current background thread.
+                                //
                                 break;
                             }
                             BatchMessage::SetResource(resource) => {
                                 exporter.set_resource(&resource);
                             }
                         },
                         Err(RecvTimeoutError::Timeout) => {
-                            if last_export_time.elapsed() >= config.scheduled_delay {
-                                if let Err(err) = block_on(exporter.export(spans.split_off(0))) {
-                                    otel_error!(
-                                        name: "BatchSpanProcessor.ExportError",
-                                        error = format!("{}", err)
-                                    );
-                                }
-                                last_export_time = Instant::now();
-                            }
+                            otel_debug!(
+                                name: "BatchSpanProcessor.ExportingDueToTimer",
+                            );
+
+                            let _ = Self::get_spans_and_export(
+                                &span_receiver,
+                                &mut exporter,
+                                &mut spans,
+                                &mut last_export_time,
+                                &current_batch_size,
+                                &config,
+                            );
                         }
                         Err(RecvTimeoutError::Disconnected) => {
                             // Channel disconnected, only thing to do is break
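
The background thread now blocks on `recv_timeout` against the control channel, so it wakes either for an explicit control message or when the scheduled delay elapses, and on every wake-up it drains and exports whatever spans are queued. The following is a simplified, std-only model of that loop; the `Control` enum, the `String` spans, and the `worker` function are hypothetical stand-ins, not the SDK implementation.

// Minimal model of the worker loop: block on the control channel with a
// timeout derived from the scheduled delay, and export on wake-up or timer.
use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError};
use std::thread;
use std::time::{Duration, Instant};

enum Control {
    ExportWakeup,
    Shutdown,
}

fn worker(control_rx: Receiver<Control>, span_rx: Receiver<String>, scheduled_delay: Duration) {
    let mut last_export_time = Instant::now();
    loop {
        // Sleep only for the remainder of the current interval.
        let remaining = scheduled_delay
            .checked_sub(last_export_time.elapsed())
            .unwrap_or(scheduled_delay);
        match control_rx.recv_timeout(remaining) {
            Ok(Control::ExportWakeup) | Err(RecvTimeoutError::Timeout) => {
                let batch: Vec<String> = span_rx.try_iter().collect();
                println!("exporting {} spans", batch.len());
                last_export_time = Instant::now();
            }
            Ok(Control::Shutdown) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
}

fn main() {
    let (ctl_tx, ctl_rx) = sync_channel(8);
    let (span_tx, span_rx) = sync_channel(128);
    let handle = thread::spawn(move || worker(ctl_rx, span_rx, Duration::from_millis(200)));

    span_tx.try_send("span-a".into()).unwrap();
    span_tx.try_send("span-b".into()).unwrap();
    ctl_tx.try_send(Control::ExportWakeup).unwrap(); // explicit wake-up before the timer fires
    ctl_tx.try_send(Control::Shutdown).unwrap();
    handle.join().unwrap();
}
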
@@ -336,12 +376,17 @@ impl BatchSpanProcessor {
             .expect("Failed to spawn thread"); //TODO: Handle thread spawn failure
 
         Self {
+            span_sender,
             message_sender,
             handle: Mutex::new(Some(handle)),
             forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
             shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
             is_shutdown: AtomicBool::new(false),
             dropped_span_count: Arc::new(AtomicUsize::new(0)),
+            max_queue_size,
+            export_span_message_sent: Arc::new(AtomicBool::new(false)),
+            current_batch_size,
+            max_export_batch_size,
         }
     }
 
@@ -355,6 +400,72 @@ impl BatchSpanProcessor {
             config: BatchConfig::default(),
         }
     }
+
+    // This method gets up to `max_export_batch_size` spans from the channel and exports them.
+    // It returns the result of the export operation.
+    // It expects the span vec to be empty when it's called.
+    #[inline]
+    fn get_spans_and_export<E>(
+        spans_receiver: &Receiver<SpanData>,
+        exporter: &mut E,
+        spans: &mut Vec<SpanData>,
+        last_export_time: &mut Instant,
+        current_batch_size: &AtomicUsize,
+        config: &BatchConfig,
+    ) -> ExportResult
+    where
+        E: SpanExporter + Send + Sync + 'static,
+    {
+        // Get up to `max_export_batch_size` spans from the channel and push them to the span vec
+        while let Ok(span) = spans_receiver.try_recv() {
+            spans.push(span);
+            if spans.len() == config.max_export_batch_size {
+                break;
+            }
+        }
+
+        let count_of_spans = spans.len(); // Count of spans that will be exported
+        let result = Self::export_with_timeout_sync(
+            config.max_export_timeout,
+            exporter,
+            spans,
+            last_export_time,
+        ); // This method clears the spans vec after exporting
+
+        current_batch_size.fetch_sub(count_of_spans, Ordering::Relaxed);
+        result
+    }
+
+    #[allow(clippy::vec_box)]
+    fn export_with_timeout_sync<E>(
+        _: Duration, // TODO: enforce timeout in exporter.
+        exporter: &mut E,
+        batch: &mut Vec<SpanData>,
+        last_export_time: &mut Instant,
+    ) -> ExportResult
+    where
+        E: SpanExporter + Send + Sync + 'static,
+    {
+        *last_export_time = Instant::now();
+
+        if batch.is_empty() {
+            return TraceResult::Ok(());
+        }
+
+        let export = exporter.export(batch.split_off(0));
+        let export_result = futures_executor::block_on(export);
+
+        match export_result {
+            Ok(_) => TraceResult::Ok(()),
+            Err(err) => {
+                otel_error!(
+                    name: "BatchSpanProcessor.ExportError",
+                    error = format!("{}", err)
+                );
+                TraceResult::Err(err)
+            }
+        }
+    }
 }
 
 impl SpanProcessor for BatchSpanProcessor {
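
`get_spans_and_export` drains at most `max_export_batch_size` spans with non-blocking `try_recv`, exports them, and subtracts exactly the drained count from the shared `current_batch_size` counter. A compact sketch of that drain-and-account pattern, with hypothetical names (`drain_batch`, `String` spans) rather than the SDK types:

// Sketch of the bounded, non-blocking drain used before each export:
// pull at most `max_batch` items with try_recv, then subtract the drained
// count from the shared queue-size counter.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{sync_channel, Receiver};

fn drain_batch(rx: &Receiver<String>, max_batch: usize, queued: &AtomicUsize) -> Vec<String> {
    let mut batch = Vec::with_capacity(max_batch);
    while let Ok(item) = rx.try_recv() {
        batch.push(item);
        if batch.len() == max_batch {
            break; // leave the rest for the next export cycle
        }
    }
    // Only account for what was actually taken off the channel.
    queued.fetch_sub(batch.len(), Ordering::Relaxed);
    batch
}

fn main() {
    let queued = AtomicUsize::new(0);
    let (tx, rx) = sync_channel(16);
    for i in 0..5 {
        tx.try_send(format!("span-{i}")).unwrap();
        queued.fetch_add(1, Ordering::Relaxed);
    }
    let batch = drain_batch(&rx, 3, &queued);
    println!("exported {}, still queued {}", batch.len(), queued.load(Ordering::Relaxed));
}
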
@@ -369,10 +480,11 @@ impl SpanProcessor for BatchSpanProcessor {
             // this is a warning, as the user is trying to emit after the processor has been shutdown
             otel_warn!(
                 name: "BatchSpanProcessor.Emit.ProcessorShutdown",
+                message = "BatchSpanProcessor has been shutdown. No further spans will be emitted."
             );
             return;
         }
-        let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));
+        let result = self.span_sender.try_send(span);
 
         if result.is_err() {
             // Increment dropped span count. The first time we have to drop a span,
@@ -382,6 +494,36 @@ impl SpanProcessor for BatchSpanProcessor {
                     message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped.");
             }
         }
+        // At this point, sending the span to the data channel was successful.
+        // Increment the current batch size and check if it has reached the max export batch size.
+        if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
+        {
+            // Check if a control message for exporting spans has already been sent to the worker thread.
+            // If not, send a control message to export spans.
+            // `export_span_message_sent` is set to false ONLY when the worker thread has processed the control message.
+
+            if !self.export_span_message_sent.load(Ordering::Relaxed) {
+                // This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
+                // Perform atomic swap to `export_span_message_sent` ONLY when the atomic load operation above returns false.
+                // Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
+                // We could have used compare_exchange as well here, but it's more verbose than swap.
+                if !self.export_span_message_sent.swap(true, Ordering::Relaxed) {
+                    match self.message_sender.try_send(BatchMessage::ExportSpan(
+                        self.export_span_message_sent.clone(),
+                    )) {
+                        Ok(_) => {
+                            // Control message sent successfully.
+                        }
+                        Err(_err) => {
+                            // TODO: Log error
+                            // If the control message could not be sent, reset the `export_span_message_sent` flag.
+                            self.export_span_message_sent
+                                .store(false, Ordering::Relaxed);
+                        }
+                    }
+                }
+            }
+        }
     }
 
     /// Flushes all pending spans.
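
The `on_end` fast path above uses a Relaxed load to guard a Relaxed swap, so at most one `ExportSpan` wake-up is outstanding and the common case avoids a read-modify-write on the shared cache line. A standalone sketch of that load-then-swap gate follows; the `maybe_signal` helper and the signal counter are hypothetical, not SDK code.

// Sketch of the "at most one pending wake-up" gate: a cheap load guards an
// atomic swap, and the swap's previous value decides which caller actually
// sends the control message.
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn maybe_signal(flag: &AtomicBool, signals_sent: &AtomicUsize) {
    // Fast path: a plain load does not need exclusive cache-line access.
    if !flag.load(Ordering::Relaxed) {
        // Slow path: swap returns the previous value, so only one caller
        // observes `false` and wins the right to send the wake-up.
        if !flag.swap(true, Ordering::Relaxed) {
            signals_sent.fetch_add(1, Ordering::Relaxed);
            // A real sender would do message_sender.try_send(...) here and
            // reset the flag to false if the send fails.
        }
    }
}

fn main() {
    let flag = Arc::new(AtomicBool::new(false));
    let sent = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..8)
        .map(|_| {
            let (f, s) = (flag.clone(), sent.clone());
            thread::spawn(move || maybe_signal(&f, &s))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // However many callers race, only one wake-up is recorded until the
    // worker resets the flag.
    assert_eq!(sent.load(Ordering::Relaxed), 1);
    println!("wake-ups sent: {}", sent.load(Ordering::Relaxed));
}
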
@@ -401,17 +543,20 @@ impl SpanProcessor for BatchSpanProcessor {
 
     /// Shuts down the processor.
     fn shutdown(&self) -> TraceResult<()> {
+        if self.is_shutdown.swap(true, Ordering::Relaxed) {
+            return Err(TraceError::Other("Processor already shutdown".into()));
+        }
         let dropped_spans = self.dropped_span_count.load(Ordering::Relaxed);
+        let max_queue_size = self.max_queue_size;
         if dropped_spans > 0 {
             otel_warn!(
-                name: "BatchSpanProcessor.LogsDropped",
+                name: "BatchSpanProcessor.SpansDropped",
                 dropped_span_count = dropped_spans,
+                max_queue_size = max_queue_size,
                 message = "Spans were dropped due to a queue being full or other error. The count represents the total count of spans dropped in the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals."
             );
         }
-        if self.is_shutdown.swap(true, Ordering::Relaxed) {
-            return Err(TraceError::Other("Processor already shutdown".into()));
-        }
+
         let (sender, receiver) = sync_channel(1);
         self.message_sender
             .try_send(BatchMessage::Shutdown(sender))
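
Shutdown now flips `is_shutdown` first (so a second call returns an error), logs the lifetime drop count, then hands the worker a one-slot reply channel and waits for the acknowledgement with a timeout. A minimal sketch of that request/acknowledge handshake, using hypothetical names (`Control`, a `String` error payload) rather than the SDK types:

// Sketch of the shutdown handshake: the caller passes a one-slot reply
// channel inside the control message and waits (with a timeout) for the
// worker to acknowledge after its final export.
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread;
use std::time::Duration;

enum Control {
    Shutdown(SyncSender<Result<(), String>>),
}

fn main() {
    let (ctl_tx, ctl_rx) = sync_channel::<Control>(8);

    let worker = thread::spawn(move || {
        for msg in ctl_rx {
            match msg {
                Control::Shutdown(reply) => {
                    // A real worker would export any remaining spans here.
                    let _ = reply.send(Ok(()));
                    break; // exit the background thread
                }
            }
        }
    });

    // Caller side: request shutdown and bound the wait.
    let (reply_tx, reply_rx) = sync_channel(1);
    ctl_tx.try_send(Control::Shutdown(reply_tx)).expect("control channel full");
    match reply_rx.recv_timeout(Duration::from_secs(5)) {
        Ok(result) => println!("worker acknowledged shutdown: {result:?}"),
        Err(_) => println!("shutdown timed out"),
    }
    worker.join().unwrap();
}
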
