diff --git a/databuild/data_deps.rs b/databuild/data_deps.rs
index 1b3209a..69f92bb 100644
--- a/databuild/data_deps.rs
+++ b/databuild/data_deps.rs
@@ -1,7 +1,7 @@
 use crate::JobRunMissingDeps;
 
 // TODO - how do we version this?
-const DATABUILD_JSON: &str = "DATABUILD_MISSING_DEPS_JSON:";
+pub const DATABUILD_JSON: &str = "DATABUILD_MISSING_DEPS_JSON:";
 
 pub fn parse_log_line(line: &str) -> Option<JobRunMissingDeps> {
     line_matches(line).and_then(json_to_missing_deps)
diff --git a/databuild/job_run.rs b/databuild/job_run.rs
index 49889e5..4dfb4d5 100644
--- a/databuild/job_run.rs
+++ b/databuild/job_run.rs
@@ -1,12 +1,15 @@
 use crate::data_build_event::Event;
-use crate::{EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunMissingDeps, JobRunStatus, JobRunSuccessEventV1, MissingDeps};
+use crate::data_deps::parse_log_line;
+use crate::{
+    EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunMissingDeps, JobRunStatus,
+    JobRunSuccessEventV1, MissingDeps,
+};
 use std::collections::HashMap;
 use std::error::Error;
 use std::io::{BufRead, BufReader};
 use std::marker::PhantomData;
 use std::process::{Child, Command, Stdio};
 use uuid::Uuid;
-use crate::data_deps::parse_log_line;
 
 // TODO log to /var/log/databuild/jobruns/$JOB_RUN_ID/, and rotate over max size (e.g. only ever use 1GB for logs)
 // Leave door open to background log processor that tails job logs, but don't include in jobrun concept
@@ -30,18 +33,21 @@ pub trait JobRunBackend: Sized {
 
     /// Transition from NotStarted to Running
     fn start(
         not_started: Self::NotStartedState,
-        env: Option<HashMap<String, String>>
+        env: Option<HashMap<String, String>>,
     ) -> Result<Self::RunningState, Box<dyn Error>>;
 
     /// Poll a running job for state changes
     fn poll(
-        running: &mut Self::RunningState
-    ) -> Result<PollResult<Self::CompletedState, Self::FailedState, Self::DepMissState>, Box<dyn Error>>;
+        running: &mut Self::RunningState,
+    ) -> Result<
+        PollResult<Self::CompletedState, Self::FailedState, Self::DepMissState>,
+        Box<dyn Error>,
+    >;
 
     /// Cancel a running job
     fn cancel_job(
         running: Self::RunningState,
-        source: EventSource
+        source: EventSource,
     ) -> Result<Self::CanceledState, Box<dyn Error>>;
 }
@@ -91,7 +97,7 @@ impl<B: JobRunBackend> NotStartedJobRun<B> {
 
     pub fn run_with_env(
         self,
-        env: Option<HashMap<String, String>>
+        env: Option<HashMap<String, String>>,
     ) -> Result<JobRun<B, B::RunningState>, Box<dyn Error>> {
         let running_state = B::start(self.state, env)?;
         Ok(JobRun {
@@ -153,7 +159,10 @@ pub enum JobRunVisitResult<B: JobRunBackend> {
 }
 
 pub enum JobRunConfig {
-    SubProcess { entry_point: String, args: Vec<String> },
+    SubProcess {
+        entry_point: String,
+        args: Vec<String>,
+    },
 }
 
 // ===== SubProcess Backend Implementation =====
@@ -211,7 +220,7 @@ impl JobRunBackend for SubProcessBackend {
 
     fn start(
         not_started: Self::NotStartedState,
-        env: Option<HashMap<String, String>>
+        env: Option<HashMap<String, String>>,
     ) -> Result<Self::RunningState, Box<dyn Error>> {
         let process = Command::new(not_started.entry_point)
             .args(not_started.args)
@@ -227,10 +236,12 @@ impl JobRunBackend for SubProcessBackend {
     }
 
     fn poll(
-        running: &mut Self::RunningState
-    ) -> Result<PollResult<Self::CompletedState, Self::FailedState, Self::DepMissState>, Box<dyn Error>> {
+        running: &mut Self::RunningState,
+    ) -> Result<
+        PollResult<Self::CompletedState, Self::FailedState, Self::DepMissState>,
+        Box<dyn Error>,
+    > {
         // Non-blocking check for exit status
-        let mut missing_deps: Option<JobRunMissingDeps> = None;
         if let Some(exit_status) = running.process.try_wait()? {
             // Read any remaining stdout
             if let Some(stdout) = running.process.stdout.take() {
@@ -238,9 +249,6 @@ impl JobRunBackend for SubProcessBackend {
                 let reader = BufReader::new(stdout);
                 for line in reader.lines() {
                     // TODO we should write lines to the job's file logs
                     if let Ok(line) = line {
-                        if let Some(dep_miss) = parse_log_line(&line) {
-                            missing_deps = Some(dep_miss);
-                        }
                         running.stdout_buffer.push(line);
                     }
                 }
@@ -259,8 +267,10 @@ impl JobRunBackend for SubProcessBackend {
                     }))
                 }
                 Some(code) => {
+                    let missing_deps = stdout_buffer.iter().flat_map(|s| parse_log_line(s));
+
                     // Failed with exit code
-                    match missing_deps {
+                    match missing_deps.last() {
                         Some(misses) => Ok(PollResult::DepMiss(SubProcessDepMiss {
                             stdout_buffer,
                             missing_deps: misses.missing_deps,
@@ -294,7 +304,7 @@ impl JobRunBackend for SubProcessBackend {
 
     fn cancel_job(
         mut running: Self::RunningState,
-        source: EventSource
+        source: EventSource,
     ) -> Result<Self::CanceledState, Box<dyn Error>> {
         // Kill the process
         running.process.kill()?;
@@ -346,8 +356,9 @@ pub struct JobRunPollResult {
 mod tests {
     use crate::data_build_event::Event;
+    use crate::data_deps::DATABUILD_JSON;
     use crate::job_run::{JobRunBackend, JobRunVisitResult, SubProcessBackend};
-    use crate::ManuallyTriggeredEvent;
+    use crate::{JobRunMissingDeps, ManuallyTriggeredEvent, MissingDeps};
     use std::collections::HashMap;
 
     fn test_helper_path() -> String {
@@ -396,9 +407,8 @@ mod tests {
         let job_run = SubProcessBackend::spawn(test_helper_path(), vec![]);
 
         // Start the job with an exit code that indicates failure (non-zero)
-        let env: HashMap<String, String> = HashMap::from([
-            ("DATABUILD_TEST_EXIT_CODE".to_string(), "1".to_string())
-        ]);
+        let env: HashMap<String, String> =
+            HashMap::from([("DATABUILD_TEST_EXIT_CODE".to_string(), "1".to_string())]);
         let mut running_job = job_run.run_with_env(Some(env)).unwrap();
 
         // Poll until we get completion
@@ -430,8 +440,8 @@ mod tests {
     /// - Emitting a JobRunCancelEventV1 event
     #[test]
     fn test_job_run_cancel_returns_job_run_cancel_event() {
-        use std::fs;
         use crate::ManuallyTriggeredEvent;
+        use std::fs;
         use uuid::Uuid;
 
         // Create a temp file path for the test
@@ -443,7 +453,10 @@ mod tests {
         let env: HashMap<String, String> = HashMap::from([
             ("DATABUILD_TEST_SLEEP_MS".to_string(), "1000".to_string()),
             ("DATABUILD_TEST_OUTPUT_FILE".to_string(), temp_file.clone()),
-            ("DATABUILD_TEST_FILE_CONTENT".to_string(), "completed".to_string()),
+            (
+                "DATABUILD_TEST_FILE_CONTENT".to_string(),
+                "completed".to_string(),
+            ),
         ]);
         let running_job = job_run.run_with_env(Some(env)).unwrap();
@@ -451,7 +464,14 @@ mod tests {
         std::thread::sleep(std::time::Duration::from_millis(10));
 
         // Cancel the job before it can complete - this consumes the running job and returns canceled
-        let canceled_job = running_job.cancel(ManuallyTriggeredEvent { user: "test_user".into() }.into()).unwrap();
+        let canceled_job = running_job
+            .cancel(
+                ManuallyTriggeredEvent {
+                    user: "test_user".into(),
+                }
+                .into(),
+            )
+            .unwrap();
 
         // Generate the cancel event from the canceled state
         let cancel_event = canceled_job.state.to_event(&canceled_job.id());
@@ -462,8 +482,10 @@ mod tests {
         assert_eq!(cancel_event.comment, Some("Job was canceled".to_string()));
 
         // Verify the output file was NOT written (process was killed before it could complete)
-        assert!(!std::path::Path::new(&temp_file).exists(),
-            "Output file should not exist - process should have been killed");
+        assert!(
+            !std::path::Path::new(&temp_file).exists(),
+            "Output file should not exist - process should have been killed"
+        );
 
         // Cleanup just in case
         let _ = fs::remove_file(&temp_file);
     }
@@ -471,8 +493,45 @@ mod tests {
 
     /// Job run that fails and emits a recognized "dep miss" statement should emit a JobRunMissingDepsEventV1
     #[test]
-    #[ignore]
     fn test_job_run_fail_on_missing_deps_should_emit_missing_deps_event() {
-        todo!()
+        // Spawn a job run that will print a recognized dep miss line and exit non-zero
+        let job_run = SubProcessBackend::spawn(test_helper_path(), vec![]);
+
+        let expected_dep_miss = JobRunMissingDeps {
+            version: "1".into(),
+            missing_deps: vec![MissingDeps {
+                impacted: vec!["my_fav_output".into()],
+                missing: vec!["cool_input_1".into(), "cool_input_2".into()],
+            }],
+        };
+        let dep_miss_json =
+            serde_json::to_string(&expected_dep_miss).expect("Failed to serialize dep miss");
+        let dep_miss_line = format!("{}{}", DATABUILD_JSON, dep_miss_json);
+        let env: HashMap<String, String> = HashMap::from([
+            ("DATABUILD_TEST_STDOUT".to_string(), dep_miss_line),
+            ("DATABUILD_TEST_EXIT_CODE".to_string(), "1".to_string()),
+        ]);
+        let mut running_job = job_run.run_with_env(Some(env)).unwrap();
+
+        // Poll until the dep miss is reported
+        loop {
+            match running_job.visit().unwrap() {
+                JobRunVisitResult::Completed(_) => {
+                    panic!("Job succeeded unexpectedly");
+                }
+                JobRunVisitResult::Failed(_) => {
+                    panic!("Job failed without reporting missing deps");
+                }
+                JobRunVisitResult::StillRunning => {
+                    // Sleep briefly and poll again
+                    std::thread::sleep(std::time::Duration::from_millis(10));
+                    continue;
+                }
+                JobRunVisitResult::DepMiss(backend) => {
+                    assert_eq!(backend.state.missing_deps, expected_dep_miss.missing_deps);
+                    break;
+                }
+            }
+        }
     }
 }
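
For reference, the protocol this diff completes end to end: a job advertises unmet dependencies by printing a single stdout line, the DATABUILD_JSON marker followed by a serialized JobRunMissingDeps, then exiting non-zero; poll buffers stdout and keeps the last recognized marker line. Below is a minimal sketch of the job side. It assumes the crate is importable as databuild with data_deps as a public module (the constant itself is made pub in this diff), that JobRunMissingDeps and MissingDeps derive serde::Serialize (the test above serializes them with serde_json), and the dep names are hypothetical.

// Hypothetical job-side sketch, not part of this diff: report missing deps
// on stdout, then exit non-zero so poll() takes the dep-miss path.
use databuild::data_deps::DATABUILD_JSON; // assumes `data_deps` is public
use databuild::{JobRunMissingDeps, MissingDeps};

fn main() {
    let dep_miss = JobRunMissingDeps {
        version: "1".into(),
        missing_deps: vec![MissingDeps {
            impacted: vec!["daily_report".into()], // hypothetical output
            missing: vec!["raw_events".into()],    // hypothetical unmet input
        }],
    };
    let json = serde_json::to_string(&dep_miss).expect("serialize dep miss");
    // One marker-prefixed line is enough; the runner scans the whole stdout
    // buffer and keeps the last match (`missing_deps.last()` in poll above).
    println!("{DATABUILD_JSON}{json}");
    std::process::exit(1);
}

As the poll hunk above suggests, marker lines are only consulted on a non-zero exit: a job that prints a dep miss line but exits 0 is still treated as completed.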