Skip to content

Commit bff1857

Browse files
authored
fix: request idle timeout should be applied differently for worker kind (#636)
1 parent 7beedc6 commit bff1857

File tree

5 files changed

+59
-22
lines changed

5 files changed

+59
-22
lines changed

cli/src/flags.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,15 @@ fn get_start_command() -> Command {
208208
.value_parser(value_parser!(u64)),
209209
)
210210
.arg(
211-
arg!(--"request-idle-timeout" <MILLISECONDS>)
211+
arg!(--"main-worker-request-idle-timeout" <MILLISECONDS>)
212+
.help(concat!(
213+
"Maximum time in milliseconds that can be waited from when a ",
214+
"worker takes over the request (disabled by default)"
215+
))
216+
.value_parser(value_parser!(u64)),
217+
)
218+
.arg(
219+
arg!(--"user-worker-request-idle-timeout" <MILLISECONDS>)
212220
.help(concat!(
213221
"Maximum time in milliseconds that can be waited from when a ",
214222
"worker takes over the request (disabled by default)"

cli/src/main.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use anyhow::Context;
1313
use anyhow::Error;
1414
use base::server;
1515
use base::server::Builder;
16+
use base::server::RequestIdleTimeout;
1617
use base::server::ServerFlags;
1718
use base::server::Tls;
1819
use base::utils::units::percentage_value;
@@ -180,8 +181,12 @@ fn main() -> Result<ExitCode, anyhow::Error> {
180181
sub_matches.get_one::<usize>("max-parallelism").cloned();
181182
let maybe_request_wait_timeout =
182183
sub_matches.get_one::<u64>("request-wait-timeout").cloned();
183-
let maybe_request_idle_timeout =
184-
sub_matches.get_one::<u64>("request-idle-timeout").cloned();
184+
let maybe_main_worker_request_idle_timeout = sub_matches
185+
.get_one::<u64>("main-worker-request-idle-timeout")
186+
.cloned();
187+
let maybe_user_worker_request_idle_timeout = sub_matches
188+
.get_one::<u64>("user-worker-request-idle-timeout")
189+
.cloned();
185190
let maybe_request_read_timeout =
186191
sub_matches.get_one::<u64>("request-read-timeout").cloned();
187192

@@ -249,7 +254,10 @@ fn main() -> Result<ExitCode, anyhow::Error> {
249254
graceful_exit_keepalive_deadline_ms,
250255
event_worker_exit_deadline_sec,
251256
request_wait_timeout_ms: maybe_request_wait_timeout,
252-
request_idle_timeout_ms: maybe_request_idle_timeout,
257+
request_idle_timeout: RequestIdleTimeout::from_millis(
258+
maybe_main_worker_request_idle_timeout,
259+
maybe_user_worker_request_idle_timeout,
260+
),
253261
request_read_timeout_ms: maybe_request_read_timeout,
254262
request_buffer_size: Some(request_buffer_size),
255263

crates/base/src/server.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use either::Either::Left;
2222
use either::Either::Right;
2323
use enum_as_inner::EnumAsInner;
2424
use ext_runtime::SharedMetricSource;
25+
use ext_workers::context::WorkerKind;
2526
use ext_workers::context::WorkerRequestMsg;
2627
use futures_util::future::poll_fn;
2728
use futures_util::future::BoxFuture;
@@ -278,6 +279,29 @@ pub enum OtelKind {
278279
Both,
279280
}
280281

282+
#[derive(Debug, Clone, Copy, Default)]
283+
pub struct RequestIdleTimeout {
284+
main: Option<Duration>,
285+
user: Option<Duration>,
286+
}
287+
288+
impl RequestIdleTimeout {
289+
pub fn from_millis(main: Option<u64>, user: Option<u64>) -> Self {
290+
Self {
291+
main: main.map(Duration::from_millis),
292+
user: user.map(Duration::from_millis),
293+
}
294+
}
295+
296+
pub fn get(&self, kind: WorkerKind) -> Option<Duration> {
297+
match kind {
298+
WorkerKind::MainWorker => self.main,
299+
WorkerKind::UserWorker => self.user,
300+
WorkerKind::EventsWorker => None,
301+
}
302+
}
303+
}
304+
281305
#[derive(Debug, Default, Clone, Copy)]
282306
pub struct ServerFlags {
283307
pub otel: Option<OtelKind>,
@@ -291,8 +315,8 @@ pub struct ServerFlags {
291315
pub graceful_exit_keepalive_deadline_ms: Option<u64>,
292316
pub event_worker_exit_deadline_sec: u64,
293317
pub request_wait_timeout_ms: Option<u64>,
294-
pub request_idle_timeout_ms: Option<u64>,
295318
pub request_read_timeout_ms: Option<u64>,
319+
pub request_idle_timeout: RequestIdleTimeout,
296320
pub request_buffer_size: Option<u64>,
297321

298322
pub beforeunload_wall_clock_pct: Option<u8>,

crates/base/src/worker/worker_surface_creation.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ mod request {
7070
duplex_stream_tx: mpsc::UnboundedSender<DuplexStreamEntry>,
7171
msg: WorkerRequestMsg,
7272
) -> Result<(), anyhow::Error> {
73-
let request_idle_timeout_ms = flags.request_idle_timeout_ms;
73+
let request_idle_timeout_dur = flags.request_idle_timeout.get(worker_kind);
7474
let request_buf_size = flags.request_buffer_size.unwrap_or_else(|| {
7575
const KIB: usize = 1024;
7676
static CHECK: Lazy<AtomicFlag> = Lazy::new(AtomicFlag::default);
@@ -125,7 +125,7 @@ mod request {
125125
tokio::spawn(relay_upgraded_request_and_response(
126126
req_upgrade,
127127
parts,
128-
request_idle_timeout_ms,
128+
request_idle_timeout_dur,
129129
));
130130

131131
return;
@@ -144,8 +144,8 @@ mod request {
144144
tokio::task::yield_now().await;
145145

146146
let maybe_cancel_fut = async move {
147-
if let Some(timeout_ms) = request_idle_timeout_ms {
148-
sleep(Duration::from_millis(timeout_ms)).await;
147+
if let Some(dur) = request_idle_timeout_dur {
148+
sleep(dur).await;
149149
} else {
150150
pending::<()>().await;
151151
unreachable!()
@@ -185,18 +185,17 @@ mod request {
185185
}
186186
}
187187

188-
if let Some(timeout_ms) = flags.request_idle_timeout_ms {
188+
if let Some(dur) = request_idle_timeout_dur {
189189
let headers = res.headers();
190190
let is_streamed_response =
191191
!headers.contains_key(http_v02::header::CONTENT_LENGTH);
192192

193193
if is_streamed_response {
194-
let duration = Duration::from_millis(timeout_ms);
195194
let (parts, body) = res.into_parts();
196195

197196
drop(res_tx.send(Ok(Response::from_parts(
198197
parts,
199-
Body::wrap_stream(CancelOnWriteTimeout::new(body, duration)),
198+
Body::wrap_stream(CancelOnWriteTimeout::new(body, dur)),
200199
))));
201200

202201
return Ok(());
@@ -210,14 +209,11 @@ mod request {
210209
async fn relay_upgraded_request_and_response(
211210
downstream: OnUpgrade,
212211
parts: http1::Parts<io::DuplexStream>,
213-
maybe_idle_timeout: Option<u64>,
212+
maybe_idle_timeout: Option<Duration>,
214213
) {
215214
let upstream = Upgraded2::new(parts.io, parts.read_buf);
216-
let mut upstream = if let Some(timeout_ms) = maybe_idle_timeout {
217-
ReadTimeoutStream::with_timeout(
218-
upstream,
219-
Duration::from_millis(timeout_ms),
220-
)
215+
let mut upstream = if let Some(dur) = maybe_idle_timeout {
216+
ReadTimeoutStream::with_timeout(upstream, dur)
221217
} else {
222218
ReadTimeoutStream::with_bypass(upstream)
223219
};

crates/base/tests/integration_tests.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use base::integration_test;
2121
use base::integration_test_listen_fut;
2222
use base::integration_test_with_server_flag;
2323
use base::server::Builder;
24+
use base::server::RequestIdleTimeout;
2425
use base::server::ServerEvent;
2526
use base::server::ServerFlags;
2627
use base::server::ServerHealth;
@@ -1860,7 +1861,7 @@ async fn test_request_idle_timeout_no_streamed_response(
18601861

18611862
integration_test_with_server_flag!(
18621863
ServerFlags {
1863-
request_idle_timeout_ms: Some(1000),
1864+
request_idle_timeout: RequestIdleTimeout::from_millis(None, Some(1000)),
18641865
..Default::default()
18651866
},
18661867
"./test_cases/main",
@@ -1918,7 +1919,7 @@ async fn test_request_idle_timeout_streamed_response(maybe_tls: Option<Tls>) {
19181919

19191920
integration_test_with_server_flag!(
19201921
ServerFlags {
1921-
request_idle_timeout_ms: Some(2000),
1922+
request_idle_timeout: RequestIdleTimeout::from_millis(None, Some(2000)),
19221923
..Default::default()
19231924
},
19241925
"./test_cases/main",
@@ -1991,7 +1992,7 @@ async fn test_request_idle_timeout_streamed_response_first_chunk_timeout(
19911992

19921993
integration_test_with_server_flag!(
19931994
ServerFlags {
1994-
request_idle_timeout_ms: Some(1000),
1995+
request_idle_timeout: RequestIdleTimeout::from_millis(None, Some(1000)),
19951996
..Default::default()
19961997
},
19971998
"./test_cases/main",
@@ -2079,7 +2080,7 @@ async fn test_request_idle_timeout_websocket_deno(
20792080

20802081
integration_test_with_server_flag!(
20812082
ServerFlags {
2082-
request_idle_timeout_ms: Some(1000),
2083+
request_idle_timeout: RequestIdleTimeout::from_millis(None, Some(1000)),
20832084
..Default::default()
20842085
},
20852086
"./test_cases/main",

0 commit comments

Comments
 (0)