diff --git a/databuild/data_deps.rs b/databuild/data_deps.rs index 24cc638..1b3209a 100644 --- a/databuild/data_deps.rs +++ b/databuild/data_deps.rs @@ -1,14 +1,10 @@ use crate::JobRunMissingDeps; -pub struct LogLine(String); - // TODO - how do we version this? const DATABUILD_JSON: &str = "DATABUILD_MISSING_DEPS_JSON:"; -impl From for Option { - fn from(value: LogLine) -> Self { - line_matches(&value.0).and_then(json_to_missing_deps) - } +pub fn parse_log_line(line: &str) -> Option { + line_matches(line).and_then(json_to_missing_deps) } fn line_matches(line: &str) -> Option<&str> { @@ -25,11 +21,9 @@ mod tests { #[test] fn test_parse_missing_deps_with_1_to_1_and_1_to_n() { - let log_line = LogLine( - r#"DATABUILD_MISSING_DEPS_JSON:{"version":"1","missing_deps":[{"impacted":[{"ref":"output/p1"}],"missing":[{"ref":"input/p1"}]},{"impacted":[{"ref":"output/p2"},{"ref":"output/p3"}],"missing":[{"ref":"input/p2"}]}]}"#.to_string() - ); + let log_line = r#"DATABUILD_MISSING_DEPS_JSON:{"version":"1","missing_deps":[{"impacted":[{"ref":"output/p1"}],"missing":[{"ref":"input/p1"}]},{"impacted":[{"ref":"output/p2"},{"ref":"output/p3"}],"missing":[{"ref":"input/p2"}]}]}"#.to_string(); - let result: Option = log_line.into(); + let result = parse_log_line(&log_line); assert!(result.is_some()); let missing_deps = result.unwrap(); diff --git a/databuild/job_run.rs b/databuild/job_run.rs index 3dd4f14..49889e5 100644 --- a/databuild/job_run.rs +++ b/databuild/job_run.rs @@ -1,12 +1,12 @@ use crate::data_build_event::Event; -use crate::{EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunStatus, JobRunSuccessEventV1}; +use crate::{EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunMissingDeps, JobRunStatus, JobRunSuccessEventV1, MissingDeps}; use std::collections::HashMap; use std::error::Error; use std::io::{BufRead, BufReader}; use std::marker::PhantomData; use std::process::{Child, Command, Stdio}; use uuid::Uuid; - +use crate::data_deps::parse_log_line; // TODO log to /var/log/databuild/jobruns/$JOB_RUN_ID/, and rotate over max size (e.g. only ever use 1GB for logs) // Leave door open to background log processor that tails job logs, but don't include in jobrun concept @@ -17,6 +17,7 @@ pub trait JobRunBackend: Sized { type CompletedState; type FailedState; type CanceledState; + type DepMissState; /// Create a new not-started job run fn create(entry_point: String, args: Vec) -> Self::NotStartedState; @@ -35,7 +36,7 @@ pub trait JobRunBackend: Sized { /// Poll a running job for state changes fn poll( running: &mut Self::RunningState - ) -> Result, Box>; + ) -> Result, Box>; /// Cancel a running job fn cancel_job( @@ -45,10 +46,11 @@ pub trait JobRunBackend: Sized { } /// Result of polling a running job -pub enum PollResult { +pub enum PollResult { StillRunning, Completed(C), Failed(F), + DepMiss(D), } /// Generic JobRun that works with any backend, parameterized by state @@ -64,6 +66,7 @@ pub type RunningJobRun = JobRun::RunningState>; pub type CompletedJobRun = JobRun::CompletedState>; pub type FailedJobRun = JobRun::FailedState>; pub type CanceledJobRun = JobRun::CanceledState>; +pub type DepMissJobRun = JobRun::DepMissState>; // Methods available on all JobRun states impl JobRun { @@ -120,6 +123,14 @@ impl RunningJobRun { _backend: PhantomData, })) } + PollResult::DepMiss(state) => { + let job_run_id = self.job_run_id; + Ok(JobRunVisitResult::DepMiss(JobRun { + job_run_id, + state, + _backend: PhantomData, + })) + } } } @@ -138,6 +149,7 @@ pub enum JobRunVisitResult { StillRunning, Completed(CompletedJobRun), Failed(FailedJobRun), + DepMiss(DepMissJobRun), } pub enum JobRunConfig { @@ -180,12 +192,18 @@ pub struct SubProcessCanceled { pub stdout_buffer: Vec, } +pub struct SubProcessDepMiss { + pub stdout_buffer: Vec, + pub missing_deps: Vec, +} + impl JobRunBackend for SubProcessBackend { type NotStartedState = SubProcessNotStarted; type RunningState = SubProcessRunning; type CompletedState = SubProcessCompleted; type FailedState = SubProcessFailed; type CanceledState = SubProcessCanceled; + type DepMissState = SubProcessDepMiss; fn create(entry_point: String, args: Vec) -> Self::NotStartedState { SubProcessNotStarted { entry_point, args } @@ -210,8 +228,9 @@ impl JobRunBackend for SubProcessBackend { fn poll( running: &mut Self::RunningState - ) -> Result, Box> { + ) -> Result, Box> { // Non-blocking check for exit status + let mut missing_deps: Option = None; if let Some(exit_status) = running.process.try_wait()? { // Read any remaining stdout if let Some(stdout) = running.process.stdout.take() { @@ -219,6 +238,9 @@ impl JobRunBackend for SubProcessBackend { for line in reader.lines() { // TODO we should write lines to the job's file logs if let Ok(line) = line { + if let Some(dep_miss) = parse_log_line(&line) { + missing_deps = Some(dep_miss); + } running.stdout_buffer.push(line); } } @@ -238,12 +260,21 @@ impl JobRunBackend for SubProcessBackend { } Some(code) => { // Failed with exit code - let reason = format!("Job failed with exit code {}", code); - Ok(PollResult::Failed(SubProcessFailed { - exit_code: code, - reason, - stdout_buffer, - })) + match missing_deps { + Some(misses) => Ok(PollResult::DepMiss(SubProcessDepMiss { + stdout_buffer, + missing_deps: misses.missing_deps, + })), + None => { + // No missing deps, job failed + let reason = format!("Job failed with exit code {}", code); + Ok(PollResult::Failed(SubProcessFailed { + exit_code: code, + reason, + stdout_buffer, + })) + } + } } None => { // Terminated by signal (Unix) - treat as failure @@ -351,6 +382,9 @@ mod tests { std::thread::sleep(std::time::Duration::from_millis(10)); continue; } + JobRunVisitResult::DepMiss(dep_miss) => { + panic!("Job dep miss unexpectedly"); + } } } } @@ -384,6 +418,9 @@ mod tests { std::thread::sleep(std::time::Duration::from_millis(10)); continue; } + JobRunVisitResult::DepMiss(dep_miss) => { + panic!("Job dep miss unexpectedly"); + } } } } diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs index 2553405..ac564df 100644 --- a/databuild/orchestrator.rs +++ b/databuild/orchestrator.rs @@ -147,6 +147,9 @@ impl Orchestrator { self.bel.append_event(&event)?; self.failed_jobs.push(failed); } + JobRunVisitResult::DepMiss(dep_miss) => { + todo!(); + } } } self.running_jobs = still_running;