Skip to content

Commit 5c9a025

Browse files
authored
Merge pull request #33 from pratyush618/feat/prefork-worker-pool
feat: multi-process prefork worker pool
2 parents 2175495 + 82378ab commit 5c9a025

19 files changed

Lines changed: 832 additions & 19 deletions

File tree

crates/taskito-async/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "taskito-async"
3-
version = "0.8.0"
3+
version = "0.9.0"
44
edition = "2021"
55

66
[dependencies]

crates/taskito-core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "taskito-core"
3-
version = "0.8.0"
3+
version = "0.9.0"
44
edition = "2021"
55

66
[features]

crates/taskito-python/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "taskito-python"
3-
version = "0.8.0"
3+
version = "0.9.0"
44
edition = "2021"
55

66
[features]
@@ -23,4 +23,6 @@ uuid = { workspace = true }
2323
async-trait = { workspace = true }
2424
taskito-async = { path = "../taskito-async", optional = true }
2525
serde_json = { workspace = true }
26+
serde = { workspace = true }
27+
base64 = "0.22"
2628
log = { workspace = true }

crates/taskito-python/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use pyo3::prelude::*;
22

33
#[cfg(not(feature = "native-async"))]
44
mod async_worker;
5+
mod prefork;
56
mod py_config;
67
mod py_job;
78
mod py_queue;
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
//! Child process handle — spawn, write jobs, read results.
2+
//!
3+
//! A child is split into two halves after spawning:
4+
//! - `ChildWriter`: sends jobs to the child's stdin (owned by dispatch thread)
5+
//! - `ChildReader`: reads results from the child's stdout (owned by reader thread)
6+
//! - `ChildProcess`: holds the process handle for lifecycle management
7+
8+
use std::io::{BufRead, BufReader, BufWriter, Write};
9+
use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
10+
11+
use super::protocol::{ChildMessage, ParentMessage};
12+
13+
/// Writer half — sends job messages to the child process via stdin.
14+
pub struct ChildWriter {
15+
writer: BufWriter<ChildStdin>,
16+
}
17+
18+
impl ChildWriter {
19+
/// Send a message to the child process.
20+
pub fn send(&mut self, msg: &ParentMessage) -> std::io::Result<()> {
21+
let json = serde_json::to_string(msg)
22+
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
23+
self.writer.write_all(json.as_bytes())?;
24+
self.writer.write_all(b"\n")?;
25+
self.writer.flush()
26+
}
27+
28+
/// Send a shutdown message. Errors are silently ignored (child may already be gone).
29+
pub fn send_shutdown(&mut self) {
30+
let _ = self.send(&ParentMessage::Shutdown);
31+
}
32+
}
33+
34+
/// Reader half — reads result messages from the child process via stdout.
35+
pub struct ChildReader {
36+
reader: BufReader<ChildStdout>,
37+
}
38+
39+
impl ChildReader {
40+
/// Read one message from the child's stdout. Blocks until a line is available.
41+
pub fn read(&mut self) -> Result<ChildMessage, String> {
42+
let mut line = String::new();
43+
match self.reader.read_line(&mut line) {
44+
Ok(0) => Err("child process closed stdout".into()),
45+
Ok(_) => serde_json::from_str(&line)
46+
.map_err(|e| format!("failed to parse child message: {e}")),
47+
Err(e) => Err(format!("failed to read from child stdout: {e}")),
48+
}
49+
}
50+
}
51+
52+
/// Process handle for lifecycle management.
53+
pub struct ChildProcess {
54+
process: Child,
55+
}
56+
57+
impl ChildProcess {
58+
/// Check if the child process is still alive.
59+
#[allow(dead_code)]
60+
pub fn is_alive(&mut self) -> bool {
61+
matches!(self.process.try_wait(), Ok(None))
62+
}
63+
64+
/// Wait for the child to exit, with a timeout. Kills if it doesn't exit in time.
65+
pub fn wait_or_kill(&mut self, timeout: std::time::Duration) {
66+
let start = std::time::Instant::now();
67+
loop {
68+
match self.process.try_wait() {
69+
Ok(Some(_)) => return,
70+
Ok(None) if start.elapsed() >= timeout => {
71+
let _ = self.process.kill();
72+
let _ = self.process.wait();
73+
return;
74+
}
75+
Ok(None) => std::thread::sleep(std::time::Duration::from_millis(100)),
76+
Err(_) => return,
77+
}
78+
}
79+
}
80+
}
81+
82+
/// Spawn a child worker process and wait for its `ready` signal.
83+
///
84+
/// Returns the three split halves: writer, reader, and process handle.
85+
pub fn spawn_child(
86+
python: &str,
87+
app_path: &str,
88+
) -> Result<(ChildWriter, ChildReader, ChildProcess), String> {
89+
let mut process = Command::new(python)
90+
.args(["-m", "taskito.prefork", app_path])
91+
.stdin(Stdio::piped())
92+
.stdout(Stdio::piped())
93+
.stderr(Stdio::inherit())
94+
.spawn()
95+
.map_err(|e| format!("failed to spawn child: {e}"))?;
96+
97+
let stdin = process.stdin.take().expect("stdin should be piped");
98+
let stdout = process.stdout.take().expect("stdout should be piped");
99+
100+
let mut reader = ChildReader {
101+
reader: BufReader::new(stdout),
102+
};
103+
104+
// Wait for ready signal
105+
match reader.read()? {
106+
ChildMessage::Ready => {}
107+
other => {
108+
return Err(format!(
109+
"expected ready message, got: {:?}",
110+
std::any::type_name_of_val(&other)
111+
));
112+
}
113+
}
114+
115+
Ok((
116+
ChildWriter {
117+
writer: BufWriter::new(stdin),
118+
},
119+
reader,
120+
ChildProcess { process },
121+
))
122+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
//! Job dispatch strategies for distributing work across child processes.
2+
3+
/// Selects the child with the fewest in-flight jobs (least-loaded).
4+
/// Falls back to round-robin if all children have equal load.
5+
pub fn least_loaded(in_flight_counts: &[u32]) -> usize {
6+
in_flight_counts
7+
.iter()
8+
.enumerate()
9+
.min_by_key(|(_, &count)| count)
10+
.map(|(idx, _)| idx)
11+
.unwrap_or(0)
12+
}
13+
14+
#[cfg(test)]
15+
mod tests {
16+
use super::*;
17+
18+
#[test]
19+
fn test_least_loaded_picks_idle() {
20+
assert_eq!(least_loaded(&[3, 0, 2]), 1);
21+
}
22+
23+
#[test]
24+
fn test_least_loaded_picks_first_on_tie() {
25+
assert_eq!(least_loaded(&[1, 1, 1]), 0);
26+
}
27+
28+
#[test]
29+
fn test_least_loaded_single() {
30+
assert_eq!(least_loaded(&[5]), 0);
31+
}
32+
}
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
//! Prefork worker pool — dispatches jobs to child Python processes via IPC.
2+
//!
3+
//! Each child is an independent Python interpreter with its own GIL,
4+
//! enabling true parallelism for CPU-bound tasks. The parent process
5+
//! runs the Rust scheduler and dispatches serialized jobs over stdin
6+
//! pipes; children send results back over stdout pipes.
7+
//!
8+
//! Architecture:
9+
//! - One dispatch thread: receives `Job` from scheduler, sends to children via stdin
10+
//! - N reader threads: one per child, reads results from stdout, sends to `result_tx`
11+
//! - Child processes: run `python -m taskito.prefork <app_path>`
12+
13+
mod child;
14+
mod dispatch;
15+
pub mod protocol;
16+
17+
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
18+
use std::sync::Arc;
19+
use std::thread;
20+
21+
use async_trait::async_trait;
22+
use crossbeam_channel::Sender;
23+
24+
use taskito_core::job::Job;
25+
use taskito_core::scheduler::JobResult;
26+
use taskito_core::worker::WorkerDispatcher;
27+
28+
use child::{spawn_child, ChildWriter};
29+
use protocol::ParentMessage;
30+
31+
/// Multi-process worker pool that dispatches jobs to child Python processes.
32+
pub struct PreforkPool {
33+
num_workers: usize,
34+
app_path: String,
35+
python: String,
36+
shutdown: AtomicBool,
37+
}
38+
39+
impl PreforkPool {
40+
pub fn new(num_workers: usize, app_path: String) -> Self {
41+
let python = std::env::var("TASKITO_PYTHON").unwrap_or_else(|_| "python".to_string());
42+
43+
Self {
44+
num_workers,
45+
app_path,
46+
python,
47+
shutdown: AtomicBool::new(false),
48+
}
49+
}
50+
}
51+
52+
#[async_trait]
53+
impl WorkerDispatcher for PreforkPool {
54+
async fn run(
55+
&self,
56+
mut job_rx: tokio::sync::mpsc::Receiver<Job>,
57+
result_tx: Sender<JobResult>,
58+
) {
59+
let num_workers = self.num_workers;
60+
let app_path = self.app_path.clone();
61+
let python = self.python.clone();
62+
let shutdown = &self.shutdown;
63+
64+
// Spawn all children and split into writers + readers
65+
let mut writers: Vec<ChildWriter> = Vec::with_capacity(num_workers);
66+
let in_flight: Arc<Vec<AtomicU32>> =
67+
Arc::new((0..num_workers).map(|_| AtomicU32::new(0)).collect());
68+
let mut reader_handles: Vec<thread::JoinHandle<()>> = Vec::new();
69+
let mut process_handles: Vec<child::ChildProcess> = Vec::new();
70+
71+
for i in 0..num_workers {
72+
match spawn_child(&python, &app_path) {
73+
Ok((writer, mut reader, process)) => {
74+
log::info!("[taskito] prefork child {i} ready");
75+
writers.push(writer);
76+
process_handles.push(process);
77+
78+
// Spawn a reader thread for this child
79+
let tx = result_tx.clone();
80+
let in_flight_counter = in_flight.clone();
81+
let child_idx = i;
82+
reader_handles.push(thread::spawn(move || {
83+
loop {
84+
match reader.read() {
85+
Ok(msg) => {
86+
if let Some(job_result) = msg.into_job_result() {
87+
in_flight_counter[child_idx]
88+
.fetch_sub(1, Ordering::Relaxed);
89+
if tx.send(job_result).is_err() {
90+
break; // result channel closed
91+
}
92+
}
93+
}
94+
Err(e) => {
95+
log::warn!(
96+
"[taskito] prefork child {child_idx} reader error: {e}"
97+
);
98+
break;
99+
}
100+
}
101+
}
102+
}));
103+
}
104+
Err(e) => {
105+
log::error!("[taskito] failed to spawn prefork child {i}: {e}");
106+
}
107+
}
108+
}
109+
110+
if writers.is_empty() {
111+
log::error!("[taskito] no prefork children started, aborting");
112+
return;
113+
}
114+
115+
log::info!(
116+
"[taskito] prefork pool running with {} children",
117+
writers.len()
118+
);
119+
120+
// Dispatch loop: receive jobs from scheduler, send to least-loaded child
121+
while let Some(job) = job_rx.recv().await {
122+
if shutdown.load(Ordering::Relaxed) {
123+
break;
124+
}
125+
126+
let counts: Vec<u32> = in_flight
127+
.iter()
128+
.map(|c| c.load(Ordering::Relaxed))
129+
.collect();
130+
let idx = dispatch::least_loaded(&counts);
131+
132+
let msg = ParentMessage::from(&job);
133+
if let Err(e) = writers[idx].send(&msg) {
134+
log::error!(
135+
"[taskito] failed to send job {} to child {idx}: {e}",
136+
job.id
137+
);
138+
// Job will be reaped by the scheduler's stale job reaper
139+
continue;
140+
}
141+
in_flight[idx].fetch_add(1, Ordering::Relaxed);
142+
}
143+
144+
// Graceful shutdown: tell all children to stop
145+
for (i, writer) in writers.iter_mut().enumerate() {
146+
writer.send_shutdown();
147+
log::info!("[taskito] sent shutdown to prefork child {i}");
148+
}
149+
150+
// Wait for children to exit
151+
let drain_timeout = std::time::Duration::from_secs(30);
152+
for (i, process) in process_handles.iter_mut().enumerate() {
153+
process.wait_or_kill(drain_timeout);
154+
log::info!("[taskito] prefork child {i} exited");
155+
}
156+
157+
// Wait for reader threads
158+
for handle in reader_handles {
159+
let _ = handle.join();
160+
}
161+
}
162+
163+
fn shutdown(&self) {
164+
self.shutdown.store(true, Ordering::SeqCst);
165+
}
166+
}

0 commit comments

Comments
 (0)