Make child process wrapper

This commit is contained in:
Stuart Axelbrooke 2025-09-15 20:39:55 -07:00
parent 2be5b016eb
commit 5484363e52
2 changed files with 74 additions and 34 deletions

View file

@ -1,17 +1,48 @@
use crate::build_event_log::{BELStorage, MemoryBELStorage};
use crate::data_build_event::Event;
use crate::data_build_event::Event::{JobRunFailureV1, JobRunSuccessV1};
use crate::{DataBuildEvent, JobRunFailureEventV1, JobRunHeartbeatEventV1, JobRunSuccessEventV1};
use std::error::Error;
use std::io::{BufRead, BufReader};
use std::ops::{Deref, DerefMut};
use std::process::{Child, Command, ExitStatus, Stdio};
use uuid::Uuid;
use crate::data_build_event::Event::{JobRunFailureV1, JobRunSuccessV1};
/** Wrapper type that can be mocked */
trait JobRunChild {
fn exit_status(&mut self) -> Option<ExitStatus>;
fn stdout_lines(&mut self) -> Vec<String>;
}
#[derive(Debug)]
struct WrappedProcessChild(Child);
impl JobRunChild for WrappedProcessChild {
fn exit_status(&mut self) -> Option<ExitStatus> {
self.0.try_wait().expect("coudn't wait")
}
fn stdout_lines(&mut self) -> Vec<String> {
let mut stdout_lines = Vec::new();
let stdout = self.0.stdout.take().expect("stdout not piped");
let reader = BufReader::new(stdout);
for line in reader.lines() {
stdout_lines.push(line.expect("stdout not piped"));
}
stdout_lines
}
}
impl From<Child> for WrappedProcessChild {
fn from(child: Child) -> Self {
Self { 0: child }
}
}
pub struct JobRun {
job_run_id: Uuid,
events: MemoryBELStorage,
child: Child,
child: Box<dyn JobRunChild>,
unhandled_lines: Vec<String>,
}
@ -22,49 +53,58 @@ impl JobRun {
Ok(JobRun {
job_run_id: Default::default(),
events: Default::default(),
child: Command::new(command).args(args).stdout(Stdio::piped())
.spawn()?,
child: Box::new(WrappedProcessChild::from(
Command::new(command)
.args(args)
.stdout(Stdio::piped())
.spawn()?,
)),
unhandled_lines: Default::default(),
})
}
pub fn visit(&mut self, since_idx: u64) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> {
// Collect new lines from child process
let stdout = self.child.stdout.take().expect("stdout not piped");
let reader = BufReader::new(stdout);
for line in reader.lines() {
self.unhandled_lines.push(line?);
}
// Parse BEL events from child process
let new_events = Self::process_lines(self.job_run_id, &self.unhandled_lines);
for event in new_events { self.events.append_event(event)?; }
for event in new_events {
self.events.append_event(event)?;
}
self.unhandled_lines.drain(..);
// Potentially react to job completion
match self.exit_status() {
None => {}, // No exit -> no harm
None => {} // No exit -> no harm
Some(status) => {
if status.success() {
self.events.append_event(JobRunSuccessV1(JobRunSuccessEventV1 {
job_run_id: self.job_run_id.into(),
}))?;
self.events
.append_event(JobRunSuccessV1(JobRunSuccessEventV1 {
job_run_id: self.job_run_id.into(),
}))?;
} else {
self.events.append_event(JobRunFailureV1(JobRunFailureEventV1 {
job_run_id: self.job_run_id.into(),
}))?;
self.events
.append_event(JobRunFailureV1(JobRunFailureEventV1 {
job_run_id: self.job_run_id.into(),
}))?;
}
}
}
// Return BEL events since provided idx
self.events.list_events(since_idx, EVENT_SIZE_LIMIT).and_then(|events| {
if events.len() as u64 == EVENT_SIZE_LIMIT {
Err(format!("Returned {} events - that's way too many.", EVENT_SIZE_LIMIT).into())
} else {
Ok(events)
}
} )
self.events
.list_events(since_idx, EVENT_SIZE_LIMIT)
.and_then(|events| {
if events.len() as u64 == EVENT_SIZE_LIMIT {
Err(format!(
"Returned {} events - that's way too many.",
EVENT_SIZE_LIMIT
)
.into())
} else {
Ok(events)
}
})
}
pub fn cancel(&mut self) {
@ -82,12 +122,15 @@ impl JobRun {
if lines.len() > 0 {
// If any lines were written to stdout, we should heartbeat
events.push(Event::JobRunHeartbeatV1(
JobRunHeartbeatEventV1 { job_run_id: job_run_id.clone().into() }
));
events.push(Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 {
job_run_id: job_run_id.clone().into(),
}));
}
for event in lines.iter().flat_map(|line| Self::event_for_line(line.clone())) {
for event in lines
.iter()
.flat_map(|line| Self::event_for_line(line.clone()))
{
events.push(event);
}
@ -95,7 +138,7 @@ impl JobRun {
}
pub fn exit_status(&mut self) -> Option<ExitStatus> {
self.child.try_wait().expect("Failed to wait on child")
self.child.exit_status()
}
}
@ -111,13 +154,12 @@ mod tests {
#[test]
fn test_process_lines_heartbeat() {
let lines_1 = vec!("Hello, salem".to_string());
let lines_1 = vec!["Hello, salem".to_string()];
let events_1 = JobRun::process_lines(Default::default(), &lines_1);
assert_eq!(events_1.len(), 1);
let lines_2 = vec!("Hello, salem".to_string(), "Hello, pippin".to_string());
let lines_2 = vec!["Hello, salem".to_string(), "Hello, pippin".to_string()];
let events_2 = JobRun::process_lines(Default::default(), &lines_2);
assert_eq!(events_2.len(), 1);
}
}

View file

@ -12,14 +12,12 @@ the visitor pattern to monitor job exec progress and liveness, and adds
*/
#[derive(Debug)]
struct Orchestrator<B: BELStorage + Debug> {
bel: BuildEventLog<B>,
job_runs: Vec<JobRunHandle>,
config: OrchestratorConfig,
}
#[derive(Debug)]
struct JobRunHandle {
job_run: JobRun,
bel_idx: u64,