diff --git a/databuild/job_run.rs b/databuild/job_run.rs index a776b0b..04688cb 100644 --- a/databuild/job_run.rs +++ b/databuild/job_run.rs @@ -1,10 +1,11 @@ use crate::build_event_log::{BELStorage, MemoryBELStorage}; use crate::data_build_event::Event; -use crate::{DataBuildEvent, JobRunHeartbeatEventV1}; +use crate::{DataBuildEvent, JobRunFailureEventV1, JobRunHeartbeatEventV1, JobRunSuccessEventV1}; use std::error::Error; use std::io::{BufRead, BufReader}; -use std::process::{Child, Command, Stdio}; +use std::process::{Child, Command, ExitStatus, Stdio}; use uuid::Uuid; +use crate::data_build_event::Event::{JobRunFailureV1, JobRunSuccessV1}; #[derive(Debug)] pub struct JobRun { @@ -40,6 +41,22 @@ impl JobRun { for event in new_events { self.events.append_event(event)?; } self.unhandled_lines.drain(..); + // Potentially react to job completion + match self.exit_status() { + None => {}, // No exit -> no harm + Some(status) => { + if status.success() { + self.events.append_event(JobRunSuccessV1(JobRunSuccessEventV1 { + job_run_id: self.job_run_id.into(), + }))?; + } else { + self.events.append_event(JobRunFailureV1(JobRunFailureEventV1 { + job_run_id: self.job_run_id.into(), + }))?; + } + } + } + // Return BEL events since provided idx self.events.list_events(since_idx, EVENT_SIZE_LIMIT).and_then(|events| { if events.len() as u64 == EVENT_SIZE_LIMIT { @@ -50,6 +67,10 @@ impl JobRun { } ) } + pub fn cancel(&mut self) { + todo!() + } + pub fn event_for_line(line: String) -> Option { // TODO parse missing data dep event // TODO parse job state @@ -73,8 +94,8 @@ impl JobRun { events } - pub fn is_complete(&mut self) -> bool { - self.child.try_wait().expect("Failed to wait on child").is_some() + pub fn exit_status(&mut self) -> Option { + self.child.try_wait().expect("Failed to wait on child") } } diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs index 220a3c5..1919a6a 100644 --- a/databuild/orchestrator.rs +++ b/databuild/orchestrator.rs @@ -69,7 +69,8 @@ impl Orchestrator { self.bel.append_event(event.clone()).expect("Failed to append event"); }); - jr.job_run.is_complete() + // Retain job run if it doesn't yet have an exit code (still running) + jr.job_run.exit_status().is_none() }); Ok(())