refactor job run state to separate state types

This commit is contained in:
Stuart Axelbrooke 2025-10-16 18:19:24 -07:00
parent cfcb201285
commit 6572d4e3bd

View file

@ -1,5 +1,4 @@
use std::collections::HashMap; use std::collections::HashMap;
use crate::build_event_log::{BELStorage, MemoryBELStorage};
use crate::data_build_event::Event; use crate::data_build_event::Event;
use crate::{JobRunHeartbeatEventV1, JobRunStatus, JobRunStatusCode, JobRunSuccessEventV1, JobRunFailureEventV1, JobRunCancelEventV1, EventSource}; use crate::{JobRunHeartbeatEventV1, JobRunStatus, JobRunStatusCode, JobRunSuccessEventV1, JobRunFailureEventV1, JobRunCancelEventV1, EventSource};
use std::error::Error; use std::error::Error;
@ -36,14 +35,30 @@ pub fn spawn_job_run(config: JobRunConfig) -> Result<Box<dyn JobRun>, Box<dyn Er
pub struct SubProcessJobRun { pub struct SubProcessJobRun {
pub job_run_id: Uuid, pub job_run_id: Uuid,
pub process: Option<Child>,
pub storage: MemoryBELStorage,
pub running: bool,
pub entry_point: String, pub entry_point: String,
pub args: Vec<String>, pub args: Vec<String>,
pub stdout_buffer: Vec<String>, // Buffered stdout lines pub state: JobRunState,
pub exit_code: Option<i32>, // Cached exit code }
pub cancel_source: Option<EventSource>, // Source of cancellation if canceled
enum JobRunState {
NotStarted,
Running {
process: Child,
stdout_buffer: Vec<String>,
},
Completed {
exit_code: i32,
stdout_buffer: Vec<String>,
},
Failed {
exit_code: i32,
reason: String,
stdout_buffer: Vec<String>,
},
Canceled {
source: EventSource,
stdout_buffer: Vec<String>,
},
} }
impl JobRun for SubProcessJobRun { impl JobRun for SubProcessJobRun {
@ -54,111 +69,142 @@ impl JobRun for SubProcessJobRun {
fn visit(&mut self) -> Result<JobRunPollResult, Box<dyn Error>> { fn visit(&mut self) -> Result<JobRunPollResult, Box<dyn Error>> {
let mut new_events = Vec::new(); let mut new_events = Vec::new();
// visit() should only be called on running jobs match &mut self.state {
if !self.running { JobRunState::Running { process, stdout_buffer } => {
return Err("visit() called on non-running job".into()); // Non-blocking check for exit status
} if let Some(exit_status) = process.try_wait()? {
// Read any remaining stdout
if let Some(stdout) = process.stdout.take() {
let reader = BufReader::new(stdout);
for line in reader.lines() {
// TODO we should write lines to the job's file logs
if let Ok(line) = line {
stdout_buffer.push(line);
}
}
}
// Process must be present if running // Take ownership of the current state to transition
let child = self.process.as_mut() let old_state = std::mem::replace(&mut self.state, JobRunState::NotStarted);
.ok_or("visit() called but no process present")?; let stdout_buf = if let JobRunState::Running { stdout_buffer, .. } = old_state {
stdout_buffer
} else {
Vec::new()
};
// Non-blocking check for exit status // Check exit status and transition to terminal state
if let Some(exit_status) = child.try_wait()? { match exit_status.code() {
self.running = false; Some(0) => {
// Success case
// Read any remaining stdout self.state = JobRunState::Completed {
if let Some(stdout) = child.stdout.take() { exit_code: 0,
let reader = BufReader::new(stdout); stdout_buffer: stdout_buf,
for line in reader.lines() { };
// TODO we should write lines to the job's file logs new_events.push(Event::JobRunSuccessV1(JobRunSuccessEventV1 {
if let Ok(line) = line { job_run_id: self.job_run_id.to_string(),
self.stdout_buffer.push(line); }));
return Ok(JobRunPollResult {
new_events,
status: JobRunStatusCode::JobRunSucceeded.into(),
});
}
Some(code) => {
// Failed with exit code
let reason = format!("Job failed with exit code {}", code);
self.state = JobRunState::Failed {
exit_code: code,
reason: reason.clone(),
stdout_buffer: stdout_buf,
};
new_events.push(Event::JobRunFailureV1(JobRunFailureEventV1 {
job_run_id: self.job_run_id.to_string(),
reason,
}));
return Ok(JobRunPollResult {
new_events,
status: JobRunStatusCode::JobRunFailed.into(),
});
}
None => {
// Terminated by signal (Unix) - treat as failure
let reason = format!("Job terminated by signal: {}", exit_status);
self.state = JobRunState::Failed {
exit_code: -1,
reason: reason.clone(),
stdout_buffer: stdout_buf,
};
new_events.push(Event::JobRunFailureV1(JobRunFailureEventV1 {
job_run_id: self.job_run_id.to_string(),
reason,
}));
return Ok(JobRunPollResult {
new_events,
status: JobRunStatusCode::JobRunFailed.into(),
});
}
} }
} }
}
// Still running
// Check exit status Ok(JobRunPollResult {
match exit_status.code() { new_events,
Some(0) => { status: JobRunStatusCode::JobRunRunning.into(),
// Success case })
self.exit_code = Some(0);
new_events.push(Event::JobRunSuccessV1(JobRunSuccessEventV1 {
job_run_id: self.job_run_id.to_string(),
}));
return Ok(JobRunPollResult {
new_events,
status: JobRunStatusCode::JobRunSucceeded.into(),
});
}
Some(code) => {
// Failed with exit code
self.exit_code = Some(code);
new_events.push(Event::JobRunFailureV1(JobRunFailureEventV1 {
job_run_id: self.job_run_id.to_string(),
reason: format!("Job failed with exit code {}", code),
}));
return Ok(JobRunPollResult {
new_events,
status: JobRunStatusCode::JobRunFailed.into(),
});
}
None => {
// Terminated by signal (Unix) - treat as failure
new_events.push(Event::JobRunFailureV1(JobRunFailureEventV1 {
job_run_id: self.job_run_id.to_string(),
reason: format!("Job terminated by signal: {}", exit_status),
}));
return Ok(JobRunPollResult {
new_events,
status: JobRunStatusCode::JobRunFailed.into(),
});
}
} }
_ => Err("visit() called on non-running job".into()),
} }
// Still running
Ok(JobRunPollResult {
new_events,
status: JobRunStatusCode::JobRunRunning.into(),
})
} }
fn cancel(&mut self, source: EventSource) -> Result<JobRunCancelEventV1, Box<dyn Error>> { fn cancel(&mut self, source: EventSource) -> Result<JobRunCancelEventV1, Box<dyn Error>> {
if !self.running { match std::mem::replace(&mut self.state, JobRunState::NotStarted) {
return Err("cancel() called on non-running job".into()); JobRunState::Running { mut process, stdout_buffer } => {
// Kill the process
process.kill()?;
// Wait for it to actually terminate
process.wait()?;
// Transition to Canceled state
self.state = JobRunState::Canceled {
source: source.clone(),
stdout_buffer,
};
Ok(JobRunCancelEventV1 {
job_run_id: self.job_run_id.to_string(),
source: Some(source),
comment: Some("Job was canceled".to_string()),
})
}
other_state => {
// Restore the state and error
self.state = other_state;
Err("cancel() called on non-running job".into())
}
} }
let child = self.process.as_mut()
.ok_or("cancel() called but no process present")?;
// Kill the process
child.kill()?;
// Wait for it to actually terminate
child.wait()?;
self.running = false;
Ok(JobRunCancelEventV1 {
job_run_id: self.job_run_id.to_string(),
source: Some(source),
comment: Some("Job was canceled".to_string()),
})
} }
/// Mostly for test purposes /// Mostly for test purposes
fn run_with_env(&mut self, env: Option<HashMap<String, String>>) -> Result<JobRunHeartbeatEventV1, Box<dyn Error>> { fn run_with_env(&mut self, env: Option<HashMap<String, String>>) -> Result<JobRunHeartbeatEventV1, Box<dyn Error>> {
self.process = Some(Command::new(self.entry_point.clone()) match &self.state {
.args(self.args.clone()) JobRunState::NotStarted => {
.stdout(Stdio::piped()) let process = Command::new(self.entry_point.clone())
.stderr(Stdio::piped()) .args(self.args.clone())
.envs(env.unwrap_or_default()) .stdout(Stdio::piped())
.spawn()?); .stderr(Stdio::piped())
self.running = true; .envs(env.unwrap_or_default())
// TODO should this return the event now? Or enqueue it? No sense in waiting I suppose, and orchestrator should just handle it? .spawn()?;
Ok(JobRunHeartbeatEventV1 { job_run_id: self.id().to_string() })
self.state = JobRunState::Running {
process,
stdout_buffer: Vec::new(),
};
// TODO should this return the event now? Or enqueue it? No sense in waiting I suppose, and orchestrator should just handle it?
Ok(JobRunHeartbeatEventV1 { job_run_id: self.id().to_string() })
}
_ => Err("run() called on already-running or completed job".into())
}
} }
} }
@ -166,14 +212,9 @@ impl SubProcessJobRun {
pub fn spawn(entry_point: String, args: Vec<String>) -> Result<Box<dyn JobRun>, Box<dyn Error>> { pub fn spawn(entry_point: String, args: Vec<String>) -> Result<Box<dyn JobRun>, Box<dyn Error>> {
Ok(Box::new(SubProcessJobRun { Ok(Box::new(SubProcessJobRun {
job_run_id: Uuid::new_v4(), job_run_id: Uuid::new_v4(),
process: None,
storage: MemoryBELStorage::default(),
running: false,
entry_point, entry_point,
args, args,
stdout_buffer: Vec::new(), state: JobRunState::NotStarted,
exit_code: None,
cancel_source: None,
})) }))
} }
} }