diff --git a/databuild/databuild.proto b/databuild/databuild.proto index 8dcba81..a1513e5 100644 --- a/databuild/databuild.proto +++ b/databuild/databuild.proto @@ -73,6 +73,7 @@ message JobRunSuccessEventV1 { // Simply indicates that the job has failed. Depending on retry logic defined in job, it may retry. message JobRunFailureEventV1 { string job_run_id = 1; + string reason = 2; } // Job was explicitly canceled. message JobRunCancelEventV1 { diff --git a/databuild/event_transforms.rs b/databuild/event_transforms.rs index 25d913e..5d37a40 100644 --- a/databuild/event_transforms.rs +++ b/databuild/event_transforms.rs @@ -1,5 +1,5 @@ use crate::util::current_timestamp; -use crate::{PartitionRef, PartitionStatus, PartitionStatusCode, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode}; +use crate::{JobRunStatus, JobRunStatusCode, PartitionRef, PartitionStatus, PartitionStatusCode, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode}; use crate::data_build_event::Event; impl From<&WantCreateEventV1> for WantDetail { @@ -59,3 +59,12 @@ impl From for PartitionStatus { } } } + +impl From for JobRunStatus { + fn from(code: JobRunStatusCode) -> Self { + JobRunStatus { + code: code.into(), + name: code.as_str_name().to_string(), + } + } +} diff --git a/databuild/job_run.rs b/databuild/job_run.rs index ea1965f..5aed740 100644 --- a/databuild/job_run.rs +++ b/databuild/job_run.rs @@ -1,8 +1,9 @@ +use std::collections::HashMap; use crate::build_event_log::{BELStorage, MemoryBELStorage}; use crate::data_build_event::Event; -use crate::{JobRunHeartbeatEventV1, JobRunStatus}; +use crate::{JobRunHeartbeatEventV1, JobRunStatus, JobRunStatusCode, JobRunSuccessEventV1, JobRunFailureEventV1}; use std::error::Error; -use std::io::BufRead; +use std::io::{BufRead, BufReader}; use std::process::{Child, Command, Stdio}; use uuid::Uuid; @@ -16,7 +17,10 @@ pub trait JobRun { */ fn visit(&mut self) -> Result>; fn cancel(&mut self) -> Result<(), Box>; - fn run(&mut self) -> Result>; + fn run_with_env(&mut self, env: Option>) -> Result>; + fn run(&mut self) -> Result> { + self.run_with_env(None) + } } pub enum JobRunConfig { @@ -37,6 +41,8 @@ pub struct SubProcessJobRun { pub running: bool, pub entry_point: String, pub args: Vec, + pub stdout_buffer: Vec, // Buffered stdout lines + pub exit_code: Option, // Cached exit code } impl JobRun for SubProcessJobRun { @@ -45,20 +51,91 @@ impl JobRun for SubProcessJobRun { } fn visit(&mut self) -> Result> { - todo!() + let mut new_events = Vec::new(); + + // visit() should only be called on running jobs + if !self.running { + return Err("visit() called on non-running job".into()); + } + + // Process must be present if running + let child = self.process.as_mut() + .ok_or("visit() called but no process present")?; + + // Non-blocking check for exit status + if let Some(exit_status) = child.try_wait()? { + self.running = false; + + // Read any remaining stdout + if let Some(stdout) = child.stdout.take() { + let reader = BufReader::new(stdout); + for line in reader.lines() { + if let Ok(line) = line { + self.stdout_buffer.push(line); + } + } + } + + // Check exit status + match exit_status.code() { + Some(0) => { + // Success case + self.exit_code = Some(0); + new_events.push(Event::JobRunSuccessV1(JobRunSuccessEventV1 { + job_run_id: self.job_run_id.to_string(), + })); + return Ok(JobRunPollResult { + new_events, + status: JobRunStatusCode::JobRunSucceeded.into(), + }); + } + Some(code) => { + // Failed with exit code + self.exit_code = Some(code); + new_events.push(Event::JobRunFailureV1(JobRunFailureEventV1 { + job_run_id: self.job_run_id.to_string(), + reason: format!("Job failed with exit code {}", code), + })); + return Ok(JobRunPollResult { + new_events, + status: JobRunStatusCode::JobRunFailed.into(), + }); + } + None => { + // Terminated by signal (Unix) - treat as failure + new_events.push(Event::JobRunFailureV1(JobRunFailureEventV1 { + job_run_id: self.job_run_id.to_string(), + reason: format!("Job terminated by signal: {}", exit_status), + })); + return Ok(JobRunPollResult { + new_events, + status: JobRunStatusCode::JobRunFailed.into(), + }); + } + } + } + + // Still running + Ok(JobRunPollResult { + new_events, + status: JobRunStatusCode::JobRunRunning.into(), + }) } fn cancel(&mut self) -> Result<(), Box> { todo!() } - fn run(&mut self) -> Result> { + /// Mostly for test purposes + fn run_with_env(&mut self, env: Option>) -> Result> { self.process = Some(Command::new(self.entry_point.clone()) .args(self.args.clone()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) + .envs(env.unwrap_or_default()) .spawn()?); self.running = true; + // TODO should this return the event now? Or enqueue it? No sense in waiting I suppose, and orchestrator should just handle it? Ok(JobRunHeartbeatEventV1 { job_run_id: self.id().to_string() }) } } @@ -72,6 +149,8 @@ impl SubProcessJobRun { running: false, entry_point, args, + stdout_buffer: Vec::new(), + exit_code: None, })) } } @@ -82,24 +161,22 @@ pub struct JobRunPollResult { } mod tests { + use std::collections::HashMap; + use crate::data_build_event::Event; + use crate::job_run::SubProcessJobRun; use crate::JobRunStatusCode; + fn test_helper_path() -> String { + std::env::var("TEST_SRCDIR") + .map(|srcdir| format!("{}/_main/databuild/test/test_job_helper", srcdir)) + .unwrap_or_else(|_| "bazel-bin/databuild/test/test_job_helper".to_string()) + } + /// Happy path - run that succeeds should emit a JobRunSuccessEventV1 #[test] - #[ignore] fn test_job_run_success_returns_job_run_success_event() { - use super::*; - use crate::data_build_event::Event; - - // Get path to test helper binary from Bazel runfiles - let test_helper = std::env::var("TEST_SRCDIR") - .map(|srcdir| format!("{}/_main/databuild/test/test_job_helper", srcdir)) - .unwrap_or_else(|_| "bazel-bin/databuild/test/test_job_helper".to_string()); - - println!("test_helper: {}", test_helper); - // Spawn a job run that will succeed (exit code 0) - let mut job_run = SubProcessJobRun::spawn(test_helper, vec![]).unwrap(); + let mut job_run = SubProcessJobRun::spawn(test_helper_path(), vec![]).unwrap(); // Start the job job_run.run().unwrap(); @@ -140,9 +217,44 @@ mod tests { /// Job run that fails should emit a JobRunFailureEventV1 #[test] - #[ignore] fn test_job_run_failure_returns_job_run_failure_event() { - todo!() + // Spawn a job run + let mut job_run = SubProcessJobRun::spawn(test_helper_path(), vec![]).unwrap(); + + // Start the job with an exit code that indicates failure (non-zero) + let env: HashMap = HashMap::from([ + ("DATABUILD_TEST_EXIT_CODE".to_string(), "1".to_string()) + ]); + job_run.run_with_env(Some(env)).unwrap(); + + + // Poll until we get completion + loop { + let result = job_run.visit().unwrap(); + + // Check if we got a success event + if result.new_events.iter().any(|event| { + matches!(event, Event::JobRunSuccessV1(_)) + }) { + panic!("Job succeeded unexpectedly"); + }; + + if result.new_events.iter().any(|event| { + matches!(event, Event::JobRunFailureV1(_)) + }) { + break; + } + + // If job is still running, sleep briefly and poll again + let expected = JobRunStatusCode::JobRunRunning as i32; + if matches!(result.status.code, expected) { + std::thread::sleep(std::time::Duration::from_millis(10)); + continue; + } + + // If we got here, job failed when it shouldn't have + panic!("Job failed unexpectedly: {:?}", result.status); + } } /// Canceling an event before it completes should result in it: