impl dep miss state in job runs

This commit is contained in:
Stuart Axelbrooke 2025-10-16 19:55:50 -07:00
parent 2cd2ce7f7d
commit aa2106ad8c
3 changed files with 55 additions and 21 deletions

View file

@ -1,14 +1,10 @@
use crate::JobRunMissingDeps; use crate::JobRunMissingDeps;
pub struct LogLine(String);
// TODO - how do we version this? // TODO - how do we version this?
const DATABUILD_JSON: &str = "DATABUILD_MISSING_DEPS_JSON:"; const DATABUILD_JSON: &str = "DATABUILD_MISSING_DEPS_JSON:";
impl From<LogLine> for Option<JobRunMissingDeps> { pub fn parse_log_line(line: &str) -> Option<JobRunMissingDeps> {
fn from(value: LogLine) -> Self { line_matches(line).and_then(json_to_missing_deps)
line_matches(&value.0).and_then(json_to_missing_deps)
}
} }
fn line_matches(line: &str) -> Option<&str> { fn line_matches(line: &str) -> Option<&str> {
@ -25,11 +21,9 @@ mod tests {
#[test] #[test]
fn test_parse_missing_deps_with_1_to_1_and_1_to_n() { fn test_parse_missing_deps_with_1_to_1_and_1_to_n() {
let log_line = LogLine( let log_line = r#"DATABUILD_MISSING_DEPS_JSON:{"version":"1","missing_deps":[{"impacted":[{"ref":"output/p1"}],"missing":[{"ref":"input/p1"}]},{"impacted":[{"ref":"output/p2"},{"ref":"output/p3"}],"missing":[{"ref":"input/p2"}]}]}"#.to_string();
r#"DATABUILD_MISSING_DEPS_JSON:{"version":"1","missing_deps":[{"impacted":[{"ref":"output/p1"}],"missing":[{"ref":"input/p1"}]},{"impacted":[{"ref":"output/p2"},{"ref":"output/p3"}],"missing":[{"ref":"input/p2"}]}]}"#.to_string()
);
let result: Option<JobRunMissingDeps> = log_line.into(); let result = parse_log_line(&log_line);
assert!(result.is_some()); assert!(result.is_some());
let missing_deps = result.unwrap(); let missing_deps = result.unwrap();

View file

@ -1,12 +1,12 @@
use crate::data_build_event::Event; use crate::data_build_event::Event;
use crate::{EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunStatus, JobRunSuccessEventV1}; use crate::{EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunMissingDeps, JobRunStatus, JobRunSuccessEventV1, MissingDeps};
use std::collections::HashMap; use std::collections::HashMap;
use std::error::Error; use std::error::Error;
use std::io::{BufRead, BufReader}; use std::io::{BufRead, BufReader};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::process::{Child, Command, Stdio}; use std::process::{Child, Command, Stdio};
use uuid::Uuid; use uuid::Uuid;
use crate::data_deps::parse_log_line;
// TODO log to /var/log/databuild/jobruns/$JOB_RUN_ID/, and rotate over max size (e.g. only ever use 1GB for logs) // TODO log to /var/log/databuild/jobruns/$JOB_RUN_ID/, and rotate over max size (e.g. only ever use 1GB for logs)
// Leave door open to background log processor that tails job logs, but don't include in jobrun concept // Leave door open to background log processor that tails job logs, but don't include in jobrun concept
@ -17,6 +17,7 @@ pub trait JobRunBackend: Sized {
type CompletedState; type CompletedState;
type FailedState; type FailedState;
type CanceledState; type CanceledState;
type DepMissState;
/// Create a new not-started job run /// Create a new not-started job run
fn create(entry_point: String, args: Vec<String>) -> Self::NotStartedState; fn create(entry_point: String, args: Vec<String>) -> Self::NotStartedState;
@ -35,7 +36,7 @@ pub trait JobRunBackend: Sized {
/// Poll a running job for state changes /// Poll a running job for state changes
fn poll( fn poll(
running: &mut Self::RunningState running: &mut Self::RunningState
) -> Result<PollResult<Self::CompletedState, Self::FailedState>, Box<dyn Error>>; ) -> Result<PollResult<Self::CompletedState, Self::FailedState, Self::DepMissState>, Box<dyn Error>>;
/// Cancel a running job /// Cancel a running job
fn cancel_job( fn cancel_job(
@ -45,10 +46,11 @@ pub trait JobRunBackend: Sized {
} }
/// Result of polling a running job /// Result of polling a running job
pub enum PollResult<C, F> { pub enum PollResult<C, F, D> {
StillRunning, StillRunning,
Completed(C), Completed(C),
Failed(F), Failed(F),
DepMiss(D),
} }
/// Generic JobRun that works with any backend, parameterized by state /// Generic JobRun that works with any backend, parameterized by state
@ -64,6 +66,7 @@ pub type RunningJobRun<B> = JobRun<B, <B as JobRunBackend>::RunningState>;
pub type CompletedJobRun<B> = JobRun<B, <B as JobRunBackend>::CompletedState>; pub type CompletedJobRun<B> = JobRun<B, <B as JobRunBackend>::CompletedState>;
pub type FailedJobRun<B> = JobRun<B, <B as JobRunBackend>::FailedState>; pub type FailedJobRun<B> = JobRun<B, <B as JobRunBackend>::FailedState>;
pub type CanceledJobRun<B> = JobRun<B, <B as JobRunBackend>::CanceledState>; pub type CanceledJobRun<B> = JobRun<B, <B as JobRunBackend>::CanceledState>;
pub type DepMissJobRun<B> = JobRun<B, <B as JobRunBackend>::DepMissState>;
// Methods available on all JobRun states // Methods available on all JobRun states
impl<B: JobRunBackend, S> JobRun<B, S> { impl<B: JobRunBackend, S> JobRun<B, S> {
@ -120,6 +123,14 @@ impl<B: JobRunBackend> RunningJobRun<B> {
_backend: PhantomData, _backend: PhantomData,
})) }))
} }
PollResult::DepMiss(state) => {
let job_run_id = self.job_run_id;
Ok(JobRunVisitResult::DepMiss(JobRun {
job_run_id,
state,
_backend: PhantomData,
}))
}
} }
} }
@ -138,6 +149,7 @@ pub enum JobRunVisitResult<B: JobRunBackend> {
StillRunning, StillRunning,
Completed(CompletedJobRun<B>), Completed(CompletedJobRun<B>),
Failed(FailedJobRun<B>), Failed(FailedJobRun<B>),
DepMiss(DepMissJobRun<B>),
} }
pub enum JobRunConfig { pub enum JobRunConfig {
@ -180,12 +192,18 @@ pub struct SubProcessCanceled {
pub stdout_buffer: Vec<String>, pub stdout_buffer: Vec<String>,
} }
pub struct SubProcessDepMiss {
pub stdout_buffer: Vec<String>,
pub missing_deps: Vec<MissingDeps>,
}
impl JobRunBackend for SubProcessBackend { impl JobRunBackend for SubProcessBackend {
type NotStartedState = SubProcessNotStarted; type NotStartedState = SubProcessNotStarted;
type RunningState = SubProcessRunning; type RunningState = SubProcessRunning;
type CompletedState = SubProcessCompleted; type CompletedState = SubProcessCompleted;
type FailedState = SubProcessFailed; type FailedState = SubProcessFailed;
type CanceledState = SubProcessCanceled; type CanceledState = SubProcessCanceled;
type DepMissState = SubProcessDepMiss;
fn create(entry_point: String, args: Vec<String>) -> Self::NotStartedState { fn create(entry_point: String, args: Vec<String>) -> Self::NotStartedState {
SubProcessNotStarted { entry_point, args } SubProcessNotStarted { entry_point, args }
@ -210,8 +228,9 @@ impl JobRunBackend for SubProcessBackend {
fn poll( fn poll(
running: &mut Self::RunningState running: &mut Self::RunningState
) -> Result<PollResult<Self::CompletedState, Self::FailedState>, Box<dyn Error>> { ) -> Result<PollResult<Self::CompletedState, Self::FailedState, Self::DepMissState>, Box<dyn Error>> {
// Non-blocking check for exit status // Non-blocking check for exit status
let mut missing_deps: Option<JobRunMissingDeps> = None;
if let Some(exit_status) = running.process.try_wait()? { if let Some(exit_status) = running.process.try_wait()? {
// Read any remaining stdout // Read any remaining stdout
if let Some(stdout) = running.process.stdout.take() { if let Some(stdout) = running.process.stdout.take() {
@ -219,6 +238,9 @@ impl JobRunBackend for SubProcessBackend {
for line in reader.lines() { for line in reader.lines() {
// TODO we should write lines to the job's file logs // TODO we should write lines to the job's file logs
if let Ok(line) = line { if let Ok(line) = line {
if let Some(dep_miss) = parse_log_line(&line) {
missing_deps = Some(dep_miss);
}
running.stdout_buffer.push(line); running.stdout_buffer.push(line);
} }
} }
@ -238,12 +260,21 @@ impl JobRunBackend for SubProcessBackend {
} }
Some(code) => { Some(code) => {
// Failed with exit code // Failed with exit code
let reason = format!("Job failed with exit code {}", code); match missing_deps {
Ok(PollResult::Failed(SubProcessFailed { Some(misses) => Ok(PollResult::DepMiss(SubProcessDepMiss {
exit_code: code, stdout_buffer,
reason, missing_deps: misses.missing_deps,
stdout_buffer, })),
})) None => {
// No missing deps, job failed
let reason = format!("Job failed with exit code {}", code);
Ok(PollResult::Failed(SubProcessFailed {
exit_code: code,
reason,
stdout_buffer,
}))
}
}
} }
None => { None => {
// Terminated by signal (Unix) - treat as failure // Terminated by signal (Unix) - treat as failure
@ -351,6 +382,9 @@ mod tests {
std::thread::sleep(std::time::Duration::from_millis(10)); std::thread::sleep(std::time::Duration::from_millis(10));
continue; continue;
} }
JobRunVisitResult::DepMiss(dep_miss) => {
panic!("Job dep miss unexpectedly");
}
} }
} }
} }
@ -384,6 +418,9 @@ mod tests {
std::thread::sleep(std::time::Duration::from_millis(10)); std::thread::sleep(std::time::Duration::from_millis(10));
continue; continue;
} }
JobRunVisitResult::DepMiss(dep_miss) => {
panic!("Job dep miss unexpectedly");
}
} }
} }
} }

View file

@ -147,6 +147,9 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
self.bel.append_event(&event)?; self.bel.append_event(&event)?;
self.failed_jobs.push(failed); self.failed_jobs.push(failed);
} }
JobRunVisitResult::DepMiss(dep_miss) => {
todo!();
}
} }
} }
self.running_jobs = still_running; self.running_jobs = still_running;