This commit is contained in:
Stuart Axelbrooke 2025-09-15 20:21:21 -07:00
parent 9342ae6816
commit 2be5b016eb
2 changed files with 27 additions and 5 deletions

View file

@ -1,10 +1,11 @@
use crate::build_event_log::{BELStorage, MemoryBELStorage}; use crate::build_event_log::{BELStorage, MemoryBELStorage};
use crate::data_build_event::Event; use crate::data_build_event::Event;
use crate::{DataBuildEvent, JobRunHeartbeatEventV1}; use crate::{DataBuildEvent, JobRunFailureEventV1, JobRunHeartbeatEventV1, JobRunSuccessEventV1};
use std::error::Error; use std::error::Error;
use std::io::{BufRead, BufReader}; use std::io::{BufRead, BufReader};
use std::process::{Child, Command, Stdio}; use std::process::{Child, Command, ExitStatus, Stdio};
use uuid::Uuid; use uuid::Uuid;
use crate::data_build_event::Event::{JobRunFailureV1, JobRunSuccessV1};
#[derive(Debug)] #[derive(Debug)]
pub struct JobRun { pub struct JobRun {
@ -40,6 +41,22 @@ impl JobRun {
for event in new_events { self.events.append_event(event)?; } for event in new_events { self.events.append_event(event)?; }
self.unhandled_lines.drain(..); self.unhandled_lines.drain(..);
// Potentially react to job completion
match self.exit_status() {
None => {}, // No exit -> no harm
Some(status) => {
if status.success() {
self.events.append_event(JobRunSuccessV1(JobRunSuccessEventV1 {
job_run_id: self.job_run_id.into(),
}))?;
} else {
self.events.append_event(JobRunFailureV1(JobRunFailureEventV1 {
job_run_id: self.job_run_id.into(),
}))?;
}
}
}
// Return BEL events since provided idx // Return BEL events since provided idx
self.events.list_events(since_idx, EVENT_SIZE_LIMIT).and_then(|events| { self.events.list_events(since_idx, EVENT_SIZE_LIMIT).and_then(|events| {
if events.len() as u64 == EVENT_SIZE_LIMIT { if events.len() as u64 == EVENT_SIZE_LIMIT {
@ -50,6 +67,10 @@ impl JobRun {
} ) } )
} }
pub fn cancel(&mut self) {
todo!()
}
pub fn event_for_line(line: String) -> Option<Event> { pub fn event_for_line(line: String) -> Option<Event> {
// TODO parse missing data dep event // TODO parse missing data dep event
// TODO parse job state // TODO parse job state
@ -73,8 +94,8 @@ impl JobRun {
events events
} }
pub fn is_complete(&mut self) -> bool { pub fn exit_status(&mut self) -> Option<ExitStatus> {
self.child.try_wait().expect("Failed to wait on child").is_some() self.child.try_wait().expect("Failed to wait on child")
} }
} }

View file

@ -69,7 +69,8 @@ impl<B: BELStorage + Debug> Orchestrator<B> {
self.bel.append_event(event.clone()).expect("Failed to append event"); self.bel.append_event(event.clone()).expect("Failed to append event");
}); });
jr.job_run.is_complete() // Retain job run if it doesn't yet have an exit code (still running)
jr.job_run.exit_status().is_none()
}); });
Ok(()) Ok(())