From 5484363e52a82546b3b26436257a180fd1371006 Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Mon, 15 Sep 2025 20:39:55 -0700 Subject: [PATCH] Make child process wrapper --- databuild/job_run.rs | 106 ++++++++++++++++++++++++++------------ databuild/orchestrator.rs | 2 - 2 files changed, 74 insertions(+), 34 deletions(-) diff --git a/databuild/job_run.rs b/databuild/job_run.rs index 04688cb..4d04939 100644 --- a/databuild/job_run.rs +++ b/databuild/job_run.rs @@ -1,17 +1,48 @@ use crate::build_event_log::{BELStorage, MemoryBELStorage}; use crate::data_build_event::Event; +use crate::data_build_event::Event::{JobRunFailureV1, JobRunSuccessV1}; use crate::{DataBuildEvent, JobRunFailureEventV1, JobRunHeartbeatEventV1, JobRunSuccessEventV1}; use std::error::Error; use std::io::{BufRead, BufReader}; +use std::ops::{Deref, DerefMut}; use std::process::{Child, Command, ExitStatus, Stdio}; use uuid::Uuid; -use crate::data_build_event::Event::{JobRunFailureV1, JobRunSuccessV1}; + +/** Wrapper type that can be mocked */ +trait JobRunChild { + fn exit_status(&mut self) -> Option; + fn stdout_lines(&mut self) -> Vec; +} #[derive(Debug)] +struct WrappedProcessChild(Child); + +impl JobRunChild for WrappedProcessChild { + fn exit_status(&mut self) -> Option { + self.0.try_wait().expect("coudn't wait") + } + + fn stdout_lines(&mut self) -> Vec { + let mut stdout_lines = Vec::new(); + let stdout = self.0.stdout.take().expect("stdout not piped"); + let reader = BufReader::new(stdout); + for line in reader.lines() { + stdout_lines.push(line.expect("stdout not piped")); + } + stdout_lines + } +} + +impl From for WrappedProcessChild { + fn from(child: Child) -> Self { + Self { 0: child } + } +} + pub struct JobRun { job_run_id: Uuid, events: MemoryBELStorage, - child: Child, + child: Box, unhandled_lines: Vec, } @@ -22,49 +53,58 @@ impl JobRun { Ok(JobRun { job_run_id: Default::default(), events: Default::default(), - child: Command::new(command).args(args).stdout(Stdio::piped()) - .spawn()?, + child: Box::new(WrappedProcessChild::from( + Command::new(command) + .args(args) + .stdout(Stdio::piped()) + .spawn()?, + )), unhandled_lines: Default::default(), }) } pub fn visit(&mut self, since_idx: u64) -> Result, Box> { // Collect new lines from child process - let stdout = self.child.stdout.take().expect("stdout not piped"); - let reader = BufReader::new(stdout); - for line in reader.lines() { - self.unhandled_lines.push(line?); - } // Parse BEL events from child process let new_events = Self::process_lines(self.job_run_id, &self.unhandled_lines); - for event in new_events { self.events.append_event(event)?; } + for event in new_events { + self.events.append_event(event)?; + } self.unhandled_lines.drain(..); // Potentially react to job completion match self.exit_status() { - None => {}, // No exit -> no harm + None => {} // No exit -> no harm Some(status) => { if status.success() { - self.events.append_event(JobRunSuccessV1(JobRunSuccessEventV1 { - job_run_id: self.job_run_id.into(), - }))?; + self.events + .append_event(JobRunSuccessV1(JobRunSuccessEventV1 { + job_run_id: self.job_run_id.into(), + }))?; } else { - self.events.append_event(JobRunFailureV1(JobRunFailureEventV1 { - job_run_id: self.job_run_id.into(), - }))?; + self.events + .append_event(JobRunFailureV1(JobRunFailureEventV1 { + job_run_id: self.job_run_id.into(), + }))?; } } } // Return BEL events since provided idx - self.events.list_events(since_idx, EVENT_SIZE_LIMIT).and_then(|events| { - if events.len() as u64 == EVENT_SIZE_LIMIT { - Err(format!("Returned {} events - that's way too many.", EVENT_SIZE_LIMIT).into()) - } else { - Ok(events) - } - } ) + self.events + .list_events(since_idx, EVENT_SIZE_LIMIT) + .and_then(|events| { + if events.len() as u64 == EVENT_SIZE_LIMIT { + Err(format!( + "Returned {} events - that's way too many.", + EVENT_SIZE_LIMIT + ) + .into()) + } else { + Ok(events) + } + }) } pub fn cancel(&mut self) { @@ -82,12 +122,15 @@ impl JobRun { if lines.len() > 0 { // If any lines were written to stdout, we should heartbeat - events.push(Event::JobRunHeartbeatV1( - JobRunHeartbeatEventV1 { job_run_id: job_run_id.clone().into() } - )); + events.push(Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 { + job_run_id: job_run_id.clone().into(), + })); } - for event in lines.iter().flat_map(|line| Self::event_for_line(line.clone())) { + for event in lines + .iter() + .flat_map(|line| Self::event_for_line(line.clone())) + { events.push(event); } @@ -95,7 +138,7 @@ impl JobRun { } pub fn exit_status(&mut self) -> Option { - self.child.try_wait().expect("Failed to wait on child") + self.child.exit_status() } } @@ -111,13 +154,12 @@ mod tests { #[test] fn test_process_lines_heartbeat() { - let lines_1 = vec!("Hello, salem".to_string()); + let lines_1 = vec!["Hello, salem".to_string()]; let events_1 = JobRun::process_lines(Default::default(), &lines_1); assert_eq!(events_1.len(), 1); - let lines_2 = vec!("Hello, salem".to_string(), "Hello, pippin".to_string()); + let lines_2 = vec!["Hello, salem".to_string(), "Hello, pippin".to_string()]; let events_2 = JobRun::process_lines(Default::default(), &lines_2); assert_eq!(events_2.len(), 1); } } - diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs index 1919a6a..a49f1fa 100644 --- a/databuild/orchestrator.rs +++ b/databuild/orchestrator.rs @@ -12,14 +12,12 @@ the visitor pattern to monitor job exec progress and liveness, and adds */ -#[derive(Debug)] struct Orchestrator { bel: BuildEventLog, job_runs: Vec, config: OrchestratorConfig, } -#[derive(Debug)] struct JobRunHandle { job_run: JobRun, bel_idx: u64,