
Commit 2abf44d

fix: deadlock in main loop (#696)
Fixes an issue I introduced in #692. Like all concurrency bugs, this one is insidious. At a glance, combining `next_event` with `handle_next_invocation` makes a lot of sense. I wish I could blame it on an LLM, but I had the bright idea this morning as I reviewed the code.

In the flush_at_end and start cases, it works totally fine. The problem is the high-throughput case where we switch to periodic mode: there we have a tokio::select! block polling over multiple futures. With the combined next_event and handle_next_invocation method from #692, one of those futures first waits for the response from /next_event and then immediately blocks until it can lock the invocation processor and insert the new request. Unfortunately this caused a deadlock, because inside the tokio::select! block we also lock the invocation processor when we need to handle some event bus events. Once the response from next_event returns and tokio polls that branch, it can't make progress until it gets the lock; and if a request task is meanwhile inserting data into the invocation processor, we deadlock and never poll that branch again.

![image](https://github.com/user-attachments/assets/cb5795cb-02d3-4298-8f4c-7f28eaf7c7b7)
![image](https://github.com/user-attachments/assets/361ae154-d3b2-478e-b731-e8ada2ba4b8a)
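To make the shape of the bug concrete, here is a minimal, self-contained sketch (not the actual bottlecap code; `Processor`, `fake_next_event`, and the mpsc channel are hypothetical stand-ins) contrasting the #692 shape, where the pinned future both awaits the network response and then awaits the processor lock, with the #696 shape, where the pinned future only awaits the response and the lock is taken after `select!` has committed to a branch:

```rust
// Minimal sketch of the two select! shapes discussed above. All names here
// (`Processor`, `fake_next_event`) are stand-ins, not the real bottlecap types.
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

struct Processor {
    requests: Vec<String>,
}

// Stand-in for the call to the /next invocation endpoint.
async fn fake_next_event() -> String {
    "request-1".to_string()
}

// Shape from #692: one future that awaits the response *and then* awaits the
// processor lock. While it sits in the mutex wait queue, the task that polls it
// may be busy inside another select! branch that also wants the same lock.
#[allow(dead_code)]
async fn combined_next_and_handle(processor: Arc<Mutex<Processor>>) -> String {
    let resp = fake_next_event().await;
    processor.lock().await.requests.push(resp.clone()); // lock taken inside the polled future
    resp
}

// Shape from #696: the pinned future only performs the network wait.
async fn next_only() -> String {
    fake_next_event().await
}

#[tokio::main]
async fn main() {
    let processor = Arc::new(Mutex::new(Processor { requests: Vec::new() }));
    let (tx, mut rx) = mpsc::channel::<&'static str>(8);
    tx.send("event-bus-event").await.unwrap();

    let next_lambda_response = next_only();
    tokio::pin!(next_lambda_response);

    loop {
        tokio::select! {
            resp = &mut next_lambda_response => {
                // The lock is taken only after select! has committed to this
                // branch, so no stray future is left queued on the mutex.
                processor.lock().await.requests.push(resp);
                break;
            }
            Some(ev) = rx.recv() => {
                // The event-bus branch also locks the processor; nothing else
                // owned by this task is waiting on it in this shape.
                println!("handling {ev}");
                let _guard = processor.lock().await;
            }
        }
    }
    println!("handled requests: {:?}", processor.lock().await.requests);
}
```

In the second shape the pinned future never blocks on the invocation processor lock, so a request task that is holding the lock while inserting data can no longer wedge the select! loop.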
1 parent a3d3b1f commit 2abf44d

File tree

1 file changed (+7, -8 lines)


bottlecap/src/bin/bottlecap/main.rs

Lines changed: 7 additions & 8 deletions
@@ -495,10 +495,11 @@ async fn extension_loop_active(
         "Datadog Next-Gen Extension ready in {:}ms",
         start_time.elapsed().as_millis().to_string()
     );
+    let next_lambda_response = next_event(client, &r.extension_id).await;
     // first invoke we must call next
     let mut pending_flush_handles = PendingFlushHandles::new();
     let mut last_continuous_flush_error = false;
-    handle_next_invocation(client, &r.extension_id, invocation_processor.clone()).await;
+    handle_next_invocation(next_lambda_response, invocation_processor.clone()).await;
     loop {
         let maybe_shutdown_event;

@@ -541,8 +542,9 @@ async fn extension_loop_active(
                 &mut race_flush_interval,
             )
             .await;
+            let next_response = next_event(client, &r.extension_id).await;
             maybe_shutdown_event =
-                handle_next_invocation(client, &r.extension_id, invocation_processor.clone()).await;
+                handle_next_invocation(next_response, invocation_processor.clone()).await;
         } else {
             //Periodic flush scenario, flush at top of invocation
             if current_flush_decision == FlushDecision::Continuous && !last_continuous_flush_error {

@@ -592,8 +594,7 @@ async fn extension_loop_active(
             // If we get platform.runtimeDone or platform.runtimeReport
             // That's fine, we still wait to break until we get the response from next
             // and then we break to determine if we'll flush or not
-            let next_lambda_response =
-                handle_next_invocation(client, &r.extension_id, invocation_processor.clone());
+            let next_lambda_response = next_event(client, &r.extension_id);
             tokio::pin!(next_lambda_response);
             'next_invocation: loop {
                 tokio::select! {

@@ -607,7 +608,7 @@ async fn extension_loop_active(
                         race_flush_interval.reset();
                         // Thank you for not removing race_flush_interval.reset();

-                        maybe_shutdown_event= next_response;
+                        maybe_shutdown_event = handle_next_invocation(next_response, invocation_processor.clone()).await;
                         // Need to break here to re-call next
                         break 'next_invocation;
                     }

@@ -766,11 +767,9 @@ async fn handle_event_bus_event(
 }

 async fn handle_next_invocation(
-    client: &Client,
-    ext_id: &str,
+    next_response: Result<NextEventResponse>,
     invocation_processor: Arc<TokioMutex<InvocationProcessor>>,
 ) -> NextEventResponse {
-    let next_response = next_event(client, ext_id).await;
     match next_response {
         Ok(NextEventResponse::Invoke {
             ref request_id,
