impl dep miss in job run

This commit is contained in:
Stuart Axelbrooke 2025-10-16 20:30:11 -07:00
parent aa2106ad8c
commit 3f223829bb
2 changed files with 88 additions and 29 deletions

View file

@ -1,7 +1,7 @@
use crate::JobRunMissingDeps;
// TODO - how do we version this?
const DATABUILD_JSON: &str = "DATABUILD_MISSING_DEPS_JSON:";
pub const DATABUILD_JSON: &str = "DATABUILD_MISSING_DEPS_JSON:";
pub fn parse_log_line(line: &str) -> Option<JobRunMissingDeps> {
line_matches(line).and_then(json_to_missing_deps)

View file

@ -1,12 +1,15 @@
use crate::data_build_event::Event;
use crate::{EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunMissingDeps, JobRunStatus, JobRunSuccessEventV1, MissingDeps};
use crate::data_deps::parse_log_line;
use crate::{
EventSource, JobRunCancelEventV1, JobRunFailureEventV1, JobRunMissingDeps, JobRunStatus,
JobRunSuccessEventV1, MissingDeps,
};
use std::collections::HashMap;
use std::error::Error;
use std::io::{BufRead, BufReader};
use std::marker::PhantomData;
use std::process::{Child, Command, Stdio};
use uuid::Uuid;
use crate::data_deps::parse_log_line;
// TODO log to /var/log/databuild/jobruns/$JOB_RUN_ID/, and rotate over max size (e.g. only ever use 1GB for logs)
// Leave door open to background log processor that tails job logs, but don't include in jobrun concept
@ -30,18 +33,21 @@ pub trait JobRunBackend: Sized {
/// Transition from NotStarted to Running
fn start(
not_started: Self::NotStartedState,
env: Option<HashMap<String, String>>
env: Option<HashMap<String, String>>,
) -> Result<Self::RunningState, Box<dyn Error>>;
/// Poll a running job for state changes
fn poll(
running: &mut Self::RunningState
) -> Result<PollResult<Self::CompletedState, Self::FailedState, Self::DepMissState>, Box<dyn Error>>;
running: &mut Self::RunningState,
) -> Result<
PollResult<Self::CompletedState, Self::FailedState, Self::DepMissState>,
Box<dyn Error>,
>;
/// Cancel a running job
fn cancel_job(
running: Self::RunningState,
source: EventSource
source: EventSource,
) -> Result<Self::CanceledState, Box<dyn Error>>;
}
@ -91,7 +97,7 @@ impl<B: JobRunBackend> NotStartedJobRun<B> {
pub fn run_with_env(
self,
env: Option<HashMap<String, String>>
env: Option<HashMap<String, String>>,
) -> Result<RunningJobRun<B>, Box<dyn Error>> {
let running_state = B::start(self.state, env)?;
Ok(JobRun {
@ -153,7 +159,10 @@ pub enum JobRunVisitResult<B: JobRunBackend> {
}
pub enum JobRunConfig {
SubProcess { entry_point: String, args: Vec<String> },
SubProcess {
entry_point: String,
args: Vec<String>,
},
}
// ===== SubProcess Backend Implementation =====
@ -211,7 +220,7 @@ impl JobRunBackend for SubProcessBackend {
fn start(
not_started: Self::NotStartedState,
env: Option<HashMap<String, String>>
env: Option<HashMap<String, String>>,
) -> Result<Self::RunningState, Box<dyn Error>> {
let process = Command::new(not_started.entry_point)
.args(not_started.args)
@ -227,10 +236,12 @@ impl JobRunBackend for SubProcessBackend {
}
fn poll(
running: &mut Self::RunningState
) -> Result<PollResult<Self::CompletedState, Self::FailedState, Self::DepMissState>, Box<dyn Error>> {
running: &mut Self::RunningState,
) -> Result<
PollResult<Self::CompletedState, Self::FailedState, Self::DepMissState>,
Box<dyn Error>,
> {
// Non-blocking check for exit status
let mut missing_deps: Option<JobRunMissingDeps> = None;
if let Some(exit_status) = running.process.try_wait()? {
// Read any remaining stdout
if let Some(stdout) = running.process.stdout.take() {
@ -238,9 +249,6 @@ impl JobRunBackend for SubProcessBackend {
for line in reader.lines() {
// TODO we should write lines to the job's file logs
if let Ok(line) = line {
if let Some(dep_miss) = parse_log_line(&line) {
missing_deps = Some(dep_miss);
}
running.stdout_buffer.push(line);
}
}
@ -259,8 +267,10 @@ impl JobRunBackend for SubProcessBackend {
}))
}
Some(code) => {
let missing_deps = stdout_buffer.iter().flat_map(|s| parse_log_line(&s));
// Failed with exit code
match missing_deps {
match missing_deps.last() {
Some(misses) => Ok(PollResult::DepMiss(SubProcessDepMiss {
stdout_buffer,
missing_deps: misses.missing_deps,
@ -294,7 +304,7 @@ impl JobRunBackend for SubProcessBackend {
fn cancel_job(
mut running: Self::RunningState,
source: EventSource
source: EventSource,
) -> Result<Self::CanceledState, Box<dyn Error>> {
// Kill the process
running.process.kill()?;
@ -346,8 +356,9 @@ pub struct JobRunPollResult {
mod tests {
use crate::data_build_event::Event;
use crate::data_deps::DATABUILD_JSON;
use crate::job_run::{JobRunBackend, JobRunVisitResult, SubProcessBackend};
use crate::ManuallyTriggeredEvent;
use crate::{JobRunMissingDeps, ManuallyTriggeredEvent, MissingDeps};
use std::collections::HashMap;
fn test_helper_path() -> String {
@ -396,9 +407,8 @@ mod tests {
let job_run = SubProcessBackend::spawn(test_helper_path(), vec![]);
// Start the job with an exit code that indicates failure (non-zero)
let env: HashMap<String, String> = HashMap::from([
("DATABUILD_TEST_EXIT_CODE".to_string(), "1".to_string())
]);
let env: HashMap<String, String> =
HashMap::from([("DATABUILD_TEST_EXIT_CODE".to_string(), "1".to_string())]);
let mut running_job = job_run.run_with_env(Some(env)).unwrap();
// Poll until we get completion
@ -430,8 +440,8 @@ mod tests {
/// - Emitting a JobRunCancelEventV1 event
#[test]
fn test_job_run_cancel_returns_job_run_cancel_event() {
use std::fs;
use crate::ManuallyTriggeredEvent;
use std::fs;
use uuid::Uuid;
// Create a temp file path for the test
@ -443,7 +453,10 @@ mod tests {
let env: HashMap<String, String> = HashMap::from([
("DATABUILD_TEST_SLEEP_MS".to_string(), "1000".to_string()),
("DATABUILD_TEST_OUTPUT_FILE".to_string(), temp_file.clone()),
("DATABUILD_TEST_FILE_CONTENT".to_string(), "completed".to_string()),
(
"DATABUILD_TEST_FILE_CONTENT".to_string(),
"completed".to_string(),
),
]);
let running_job = job_run.run_with_env(Some(env)).unwrap();
@ -451,7 +464,14 @@ mod tests {
std::thread::sleep(std::time::Duration::from_millis(10));
// Cancel the job before it can complete - this consumes the running job and returns canceled
let canceled_job = running_job.cancel(ManuallyTriggeredEvent { user: "test_user".into() }.into()).unwrap();
let canceled_job = running_job
.cancel(
ManuallyTriggeredEvent {
user: "test_user".into(),
}
.into(),
)
.unwrap();
// Generate the cancel event from the canceled state
let cancel_event = canceled_job.state.to_event(&canceled_job.id());
@ -462,8 +482,10 @@ mod tests {
assert_eq!(cancel_event.comment, Some("Job was canceled".to_string()));
// Verify the output file was NOT written (process was killed before it could complete)
assert!(!std::path::Path::new(&temp_file).exists(),
"Output file should not exist - process should have been killed");
assert!(
!std::path::Path::new(&temp_file).exists(),
"Output file should not exist - process should have been killed"
);
// Cleanup just in case
let _ = fs::remove_file(&temp_file);
@ -471,8 +493,45 @@ mod tests {
/// Job run that fails and emits a recognized "dep miss" statement should emit a JobRunMissingDepsEventV1
#[test]
#[ignore]
fn test_job_run_fail_on_missing_deps_should_emit_missing_deps_event() {
todo!()
// Spawn a job run that will sleep for 1 second and write a file
let job_run = SubProcessBackend::spawn(test_helper_path(), vec![]);
let expected_dep_miss = JobRunMissingDeps {
version: "1".into(),
missing_deps: vec![MissingDeps {
impacted: vec!["my_fav_output".into()],
missing: vec!["cool_input_1".into(), "cool_input_2".into()],
}],
};
let dep_miss_json =
serde_json::to_string(&expected_dep_miss).expect("Failed to serialize dep miss");
let dep_miss_line = format!("{}{}", DATABUILD_JSON, dep_miss_json);
let env: HashMap<String, String> = HashMap::from([
("DATABUILD_TEST_STDOUT".to_string(), dep_miss_line),
("DATABUILD_TEST_EXIT_CODE".to_string(), "1".to_string()),
]);
let mut running_job = job_run.run_with_env(Some(env)).unwrap();
// Poll until we get completion
loop {
match running_job.visit().unwrap() {
JobRunVisitResult::Completed(_) => {
panic!("Job succeeded unexpectedly");
}
JobRunVisitResult::Failed(failed) => {
panic!("Job failed unexpectedly");
}
JobRunVisitResult::StillRunning => {
// Sleep briefly and poll again
std::thread::sleep(std::time::Duration::from_millis(10));
continue;
}
JobRunVisitResult::DepMiss(backend) => {
assert_eq!(backend.state.missing_deps, expected_dep_miss.missing_deps);
break;
}
}
}
}
}