Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 125 additions & 1 deletion ballista/scheduler/src/state/execution_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,12 @@ impl RunningStage {
/// Update the TaskInfo for task partition
pub fn update_task_info(&mut self, partition_id: usize, status: TaskStatus) -> bool {
debug!("Updating TaskInfo for partition {partition_id}");
let task_info = self.task_infos[partition_id].as_ref().unwrap();
let Some(task_info) = self.task_infos[partition_id].as_ref() else {
warn!(
"Ignore TaskStatus update for partition {partition_id} because the task was already reset (executor lost)"
);
return false;
};
let task_id = task_info.task_id;
if (status.task_id as usize) < task_id {
warn!(
Expand Down Expand Up @@ -1049,3 +1054,122 @@ impl StageOutput {
partition_locations
}
}

#[cfg(test)]
mod tests {
use super::*;
use ballista_core::serde::protobuf::{SuccessfulTask, TaskStatus, task_status};
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::prelude::SessionConfig;
use std::collections::HashMap;

fn make_running_stage(partitions: usize) -> RunningStage {
let schema = Arc::new(datafusion::arrow::datatypes::Schema::empty());
let plan: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));
RunningStage::new(
1,
0,
plan,
partitions,
vec![],
HashMap::new(),
Arc::new(SessionConfig::default()),
)
}

fn make_task_status(task_id: u32, partition_id: u32) -> TaskStatus {
TaskStatus {
task_id,
job_id: "test-job".to_string(),
stage_id: 1,
stage_attempt_num: 0,
partition_id,
launch_time: 100,
start_exec_time: 200,
end_exec_time: 300,
status: Some(task_status::Status::Successful(SuccessfulTask {
executor_id: "executor-1".to_string(),
partitions: vec![],
})),
metrics: vec![],
}
}

/// Regression test: `update_task_info` must not panic when the task slot
/// is `None` (task was reset after executor heartbeat timeout).
#[test]
fn test_update_task_info_after_reset_does_not_panic() {
let mut stage = make_running_stage(2);

// Both task slots start as None (not yet scheduled).
// Simulates receiving a status update for a task that was already
// reset (e.g., executor heartbeat timed out).
let status = make_task_status(0, 0);
let result = stage.update_task_info(0, status);

// Should return false (update rejected), not panic.
assert!(!result);
}

/// Verify that a normal update succeeds when the task slot is populated.
#[test]
fn test_update_task_info_normal_update_succeeds() {
let mut stage = make_running_stage(2);

// Simulate scheduling the task: populate the task slot.
stage.task_infos[0] = Some(TaskInfo {
task_id: 0,
scheduled_time: 50,
launch_time: 0,
start_exec_time: 0,
end_exec_time: 0,
finish_time: 0,
task_status: task_status::Status::Running(RunningTask {
executor_id: "executor-1".to_string(),
}),
});

let status = make_task_status(0, 0);
let result = stage.update_task_info(0, status);

assert!(result);
assert!(matches!(
stage.task_infos[0].as_ref().unwrap().task_status,
task_status::Status::Successful(_)
));
}

/// After reset_tasks sets a slot to None, update_task_info must not panic.
#[test]
fn test_update_task_info_after_executor_lost() {
let mut stage = make_running_stage(2);

// Populate tasks as running on executor-1.
for i in 0..2 {
stage.task_infos[i] = Some(TaskInfo {
task_id: i,
scheduled_time: 50,
launch_time: 100,
start_exec_time: 200,
end_exec_time: 0,
finish_time: 0,
task_status: task_status::Status::Running(RunningTask {
executor_id: "executor-1".to_string(),
}),
});
}

// Executor heartbeat times out - tasks are reset.
let reset_count = stage.reset_tasks("executor-1");
assert_eq!(reset_count, 2);
assert!(stage.task_infos[0].is_none());
assert!(stage.task_infos[1].is_none());

// Executor sends a late status update for partition 0.
let status = make_task_status(0, 0);
let result = stage.update_task_info(0, status);

// Should gracefully reject the update, not panic.
assert!(!result);
}
}
Loading