From 6572d4e3bdb770d51baca905c620885c0cfa6fdb Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Thu, 16 Oct 2025 18:19:24 -0700 Subject: [PATCH] refactor job run state to separate state types --- databuild/job_run.rs | 247 +++++++++++++++++++++++++------------------ 1 file changed, 144 insertions(+), 103 deletions(-) diff --git a/databuild/job_run.rs b/databuild/job_run.rs index f93cf6a..c5ed343 100644 --- a/databuild/job_run.rs +++ b/databuild/job_run.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use crate::build_event_log::{BELStorage, MemoryBELStorage}; use crate::data_build_event::Event; use crate::{JobRunHeartbeatEventV1, JobRunStatus, JobRunStatusCode, JobRunSuccessEventV1, JobRunFailureEventV1, JobRunCancelEventV1, EventSource}; use std::error::Error; @@ -36,14 +35,30 @@ pub fn spawn_job_run(config: JobRunConfig) -> Result, Box, - pub storage: MemoryBELStorage, - pub running: bool, pub entry_point: String, pub args: Vec, - pub stdout_buffer: Vec, // Buffered stdout lines - pub exit_code: Option, // Cached exit code - pub cancel_source: Option, // Source of cancellation if canceled + pub state: JobRunState, +} + +enum JobRunState { + NotStarted, + Running { + process: Child, + stdout_buffer: Vec, + }, + Completed { + exit_code: i32, + stdout_buffer: Vec, + }, + Failed { + exit_code: i32, + reason: String, + stdout_buffer: Vec, + }, + Canceled { + source: EventSource, + stdout_buffer: Vec, + }, } impl JobRun for SubProcessJobRun { @@ -54,111 +69,142 @@ impl JobRun for SubProcessJobRun { fn visit(&mut self) -> Result> { let mut new_events = Vec::new(); - // visit() should only be called on running jobs - if !self.running { - return Err("visit() called on non-running job".into()); - } + match &mut self.state { + JobRunState::Running { process, stdout_buffer } => { + // Non-blocking check for exit status + if let Some(exit_status) = process.try_wait()? { + // Read any remaining stdout + if let Some(stdout) = process.stdout.take() { + let reader = BufReader::new(stdout); + for line in reader.lines() { + // TODO we should write lines to the job's file logs + if let Ok(line) = line { + stdout_buffer.push(line); + } + } + } - // Process must be present if running - let child = self.process.as_mut() - .ok_or("visit() called but no process present")?; + // Take ownership of the current state to transition + let old_state = std::mem::replace(&mut self.state, JobRunState::NotStarted); + let stdout_buf = if let JobRunState::Running { stdout_buffer, .. } = old_state { + stdout_buffer + } else { + Vec::new() + }; - // Non-blocking check for exit status - if let Some(exit_status) = child.try_wait()? { - self.running = false; - - // Read any remaining stdout - if let Some(stdout) = child.stdout.take() { - let reader = BufReader::new(stdout); - for line in reader.lines() { - // TODO we should write lines to the job's file logs - if let Ok(line) = line { - self.stdout_buffer.push(line); + // Check exit status and transition to terminal state + match exit_status.code() { + Some(0) => { + // Success case + self.state = JobRunState::Completed { + exit_code: 0, + stdout_buffer: stdout_buf, + }; + new_events.push(Event::JobRunSuccessV1(JobRunSuccessEventV1 { + job_run_id: self.job_run_id.to_string(), + })); + return Ok(JobRunPollResult { + new_events, + status: JobRunStatusCode::JobRunSucceeded.into(), + }); + } + Some(code) => { + // Failed with exit code + let reason = format!("Job failed with exit code {}", code); + self.state = JobRunState::Failed { + exit_code: code, + reason: reason.clone(), + stdout_buffer: stdout_buf, + }; + new_events.push(Event::JobRunFailureV1(JobRunFailureEventV1 { + job_run_id: self.job_run_id.to_string(), + reason, + })); + return Ok(JobRunPollResult { + new_events, + status: JobRunStatusCode::JobRunFailed.into(), + }); + } + None => { + // Terminated by signal (Unix) - treat as failure + let reason = format!("Job terminated by signal: {}", exit_status); + self.state = JobRunState::Failed { + exit_code: -1, + reason: reason.clone(), + stdout_buffer: stdout_buf, + }; + new_events.push(Event::JobRunFailureV1(JobRunFailureEventV1 { + job_run_id: self.job_run_id.to_string(), + reason, + })); + return Ok(JobRunPollResult { + new_events, + status: JobRunStatusCode::JobRunFailed.into(), + }); + } } } - } - - // Check exit status - match exit_status.code() { - Some(0) => { - // Success case - self.exit_code = Some(0); - new_events.push(Event::JobRunSuccessV1(JobRunSuccessEventV1 { - job_run_id: self.job_run_id.to_string(), - })); - return Ok(JobRunPollResult { - new_events, - status: JobRunStatusCode::JobRunSucceeded.into(), - }); - } - Some(code) => { - // Failed with exit code - self.exit_code = Some(code); - new_events.push(Event::JobRunFailureV1(JobRunFailureEventV1 { - job_run_id: self.job_run_id.to_string(), - reason: format!("Job failed with exit code {}", code), - })); - return Ok(JobRunPollResult { - new_events, - status: JobRunStatusCode::JobRunFailed.into(), - }); - } - None => { - // Terminated by signal (Unix) - treat as failure - new_events.push(Event::JobRunFailureV1(JobRunFailureEventV1 { - job_run_id: self.job_run_id.to_string(), - reason: format!("Job terminated by signal: {}", exit_status), - })); - return Ok(JobRunPollResult { - new_events, - status: JobRunStatusCode::JobRunFailed.into(), - }); - } + // Still running + Ok(JobRunPollResult { + new_events, + status: JobRunStatusCode::JobRunRunning.into(), + }) } + _ => Err("visit() called on non-running job".into()), } - - // Still running - Ok(JobRunPollResult { - new_events, - status: JobRunStatusCode::JobRunRunning.into(), - }) } fn cancel(&mut self, source: EventSource) -> Result> { - if !self.running { - return Err("cancel() called on non-running job".into()); + match std::mem::replace(&mut self.state, JobRunState::NotStarted) { + JobRunState::Running { mut process, stdout_buffer } => { + // Kill the process + process.kill()?; + + // Wait for it to actually terminate + process.wait()?; + + // Transition to Canceled state + self.state = JobRunState::Canceled { + source: source.clone(), + stdout_buffer, + }; + + Ok(JobRunCancelEventV1 { + job_run_id: self.job_run_id.to_string(), + source: Some(source), + comment: Some("Job was canceled".to_string()), + }) + } + other_state => { + // Restore the state and error + self.state = other_state; + Err("cancel() called on non-running job".into()) + } } - - let child = self.process.as_mut() - .ok_or("cancel() called but no process present")?; - - // Kill the process - child.kill()?; - - // Wait for it to actually terminate - child.wait()?; - - self.running = false; - - Ok(JobRunCancelEventV1 { - job_run_id: self.job_run_id.to_string(), - source: Some(source), - comment: Some("Job was canceled".to_string()), - }) } /// Mostly for test purposes fn run_with_env(&mut self, env: Option>) -> Result> { - self.process = Some(Command::new(self.entry_point.clone()) - .args(self.args.clone()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .envs(env.unwrap_or_default()) - .spawn()?); - self.running = true; - // TODO should this return the event now? Or enqueue it? No sense in waiting I suppose, and orchestrator should just handle it? - Ok(JobRunHeartbeatEventV1 { job_run_id: self.id().to_string() }) + match &self.state { + JobRunState::NotStarted => { + let process = Command::new(self.entry_point.clone()) + .args(self.args.clone()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .envs(env.unwrap_or_default()) + .spawn()?; + + self.state = JobRunState::Running { + process, + stdout_buffer: Vec::new(), + }; + + // TODO should this return the event now? Or enqueue it? No sense in waiting I suppose, and orchestrator should just handle it? + Ok(JobRunHeartbeatEventV1 { job_run_id: self.id().to_string() }) + } + _ => Err("run() called on already-running or completed job".into()) + } } } @@ -166,14 +212,9 @@ impl SubProcessJobRun { pub fn spawn(entry_point: String, args: Vec) -> Result, Box> { Ok(Box::new(SubProcessJobRun { job_run_id: Uuid::new_v4(), - process: None, - storage: MemoryBELStorage::default(), - running: false, entry_point, args, - stdout_buffer: Vec::new(), - exit_code: None, - cancel_source: None, + state: JobRunState::NotStarted, })) } }