implement simple job succeeds and job fails tests for job run
This commit is contained in:
parent
fa5a5fa200
commit
7debea96a2
3 changed files with 142 additions and 20 deletions
|
|
@ -73,6 +73,7 @@ message JobRunSuccessEventV1 {
|
||||||
// Simply indicates that the job has failed. Depending on retry logic defined in job, it may retry.
|
// Simply indicates that the job has failed. Depending on retry logic defined in job, it may retry.
|
||||||
message JobRunFailureEventV1 {
|
message JobRunFailureEventV1 {
|
||||||
string job_run_id = 1;
|
string job_run_id = 1;
|
||||||
|
string reason = 2;
|
||||||
}
|
}
|
||||||
// Job was explicitly canceled.
|
// Job was explicitly canceled.
|
||||||
message JobRunCancelEventV1 {
|
message JobRunCancelEventV1 {
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
use crate::util::current_timestamp;
|
use crate::util::current_timestamp;
|
||||||
use crate::{PartitionRef, PartitionStatus, PartitionStatusCode, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode};
|
use crate::{JobRunStatus, JobRunStatusCode, PartitionRef, PartitionStatus, PartitionStatusCode, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode};
|
||||||
use crate::data_build_event::Event;
|
use crate::data_build_event::Event;
|
||||||
|
|
||||||
impl From<&WantCreateEventV1> for WantDetail {
|
impl From<&WantCreateEventV1> for WantDetail {
|
||||||
|
|
@ -59,3 +59,12 @@ impl From<PartitionStatusCode> for PartitionStatus {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Expands a bare [`JobRunStatusCode`] into a [`JobRunStatus`] message that
/// carries both the converted code and the code's string name.
impl From<JobRunStatusCode> for JobRunStatus {
    fn from(code: JobRunStatusCode) -> Self {
        // Capture the name reported by `as_str_name()` before converting the code.
        let name = code.as_str_name().to_owned();
        Self {
            code: code.into(),
            name,
        }
    }
}
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,9 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
use crate::build_event_log::{BELStorage, MemoryBELStorage};
|
use crate::build_event_log::{BELStorage, MemoryBELStorage};
|
||||||
use crate::data_build_event::Event;
|
use crate::data_build_event::Event;
|
||||||
use crate::{JobRunHeartbeatEventV1, JobRunStatus};
|
use crate::{JobRunHeartbeatEventV1, JobRunStatus, JobRunStatusCode, JobRunSuccessEventV1, JobRunFailureEventV1};
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::io::BufRead;
|
use std::io::{BufRead, BufReader};
|
||||||
use std::process::{Child, Command, Stdio};
|
use std::process::{Child, Command, Stdio};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
|
@ -16,7 +17,10 @@ pub trait JobRun {
|
||||||
*/
|
*/
|
||||||
fn visit(&mut self) -> Result<JobRunPollResult, Box<dyn Error>>;
|
fn visit(&mut self) -> Result<JobRunPollResult, Box<dyn Error>>;
|
||||||
fn cancel(&mut self) -> Result<(), Box<dyn Error>>;
|
fn cancel(&mut self) -> Result<(), Box<dyn Error>>;
|
||||||
fn run(&mut self) -> Result<JobRunHeartbeatEventV1, Box<dyn Error>>;
|
fn run_with_env(&mut self, env: Option<HashMap<String, String>>) -> Result<JobRunHeartbeatEventV1, Box<dyn Error>>;
|
||||||
|
fn run(&mut self) -> Result<JobRunHeartbeatEventV1, Box<dyn Error>> {
|
||||||
|
self.run_with_env(None)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum JobRunConfig {
|
pub enum JobRunConfig {
|
||||||
|
|
@ -37,6 +41,8 @@ pub struct SubProcessJobRun {
|
||||||
pub running: bool,
|
pub running: bool,
|
||||||
pub entry_point: String,
|
pub entry_point: String,
|
||||||
pub args: Vec<String>,
|
pub args: Vec<String>,
|
||||||
|
pub stdout_buffer: Vec<String>, // Buffered stdout lines
|
||||||
|
pub exit_code: Option<i32>, // Cached exit code
|
||||||
}
|
}
|
||||||
|
|
||||||
impl JobRun for SubProcessJobRun {
|
impl JobRun for SubProcessJobRun {
|
||||||
|
|
@ -45,20 +51,91 @@ impl JobRun for SubProcessJobRun {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn visit(&mut self) -> Result<JobRunPollResult, Box<dyn Error>> {
|
fn visit(&mut self) -> Result<JobRunPollResult, Box<dyn Error>> {
|
||||||
todo!()
|
let mut new_events = Vec::new();
|
||||||
|
|
||||||
|
// visit() should only be called on running jobs
|
||||||
|
if !self.running {
|
||||||
|
return Err("visit() called on non-running job".into());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process must be present if running
|
||||||
|
let child = self.process.as_mut()
|
||||||
|
.ok_or("visit() called but no process present")?;
|
||||||
|
|
||||||
|
// Non-blocking check for exit status
|
||||||
|
if let Some(exit_status) = child.try_wait()? {
|
||||||
|
self.running = false;
|
||||||
|
|
||||||
|
// Read any remaining stdout
|
||||||
|
if let Some(stdout) = child.stdout.take() {
|
||||||
|
let reader = BufReader::new(stdout);
|
||||||
|
for line in reader.lines() {
|
||||||
|
if let Ok(line) = line {
|
||||||
|
self.stdout_buffer.push(line);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check exit status
|
||||||
|
match exit_status.code() {
|
||||||
|
Some(0) => {
|
||||||
|
// Success case
|
||||||
|
self.exit_code = Some(0);
|
||||||
|
new_events.push(Event::JobRunSuccessV1(JobRunSuccessEventV1 {
|
||||||
|
job_run_id: self.job_run_id.to_string(),
|
||||||
|
}));
|
||||||
|
return Ok(JobRunPollResult {
|
||||||
|
new_events,
|
||||||
|
status: JobRunStatusCode::JobRunSucceeded.into(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Some(code) => {
|
||||||
|
// Failed with exit code
|
||||||
|
self.exit_code = Some(code);
|
||||||
|
new_events.push(Event::JobRunFailureV1(JobRunFailureEventV1 {
|
||||||
|
job_run_id: self.job_run_id.to_string(),
|
||||||
|
reason: format!("Job failed with exit code {}", code),
|
||||||
|
}));
|
||||||
|
return Ok(JobRunPollResult {
|
||||||
|
new_events,
|
||||||
|
status: JobRunStatusCode::JobRunFailed.into(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// Terminated by signal (Unix) - treat as failure
|
||||||
|
new_events.push(Event::JobRunFailureV1(JobRunFailureEventV1 {
|
||||||
|
job_run_id: self.job_run_id.to_string(),
|
||||||
|
reason: format!("Job terminated by signal: {}", exit_status),
|
||||||
|
}));
|
||||||
|
return Ok(JobRunPollResult {
|
||||||
|
new_events,
|
||||||
|
status: JobRunStatusCode::JobRunFailed.into(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Still running
|
||||||
|
Ok(JobRunPollResult {
|
||||||
|
new_events,
|
||||||
|
status: JobRunStatusCode::JobRunRunning.into(),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn cancel(&mut self) -> Result<(), Box<dyn Error>> {
|
fn cancel(&mut self) -> Result<(), Box<dyn Error>> {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run(&mut self) -> Result<JobRunHeartbeatEventV1, Box<dyn Error>> {
|
/// Mostly for test purposes
|
||||||
|
fn run_with_env(&mut self, env: Option<HashMap<String, String>>) -> Result<JobRunHeartbeatEventV1, Box<dyn Error>> {
|
||||||
self.process = Some(Command::new(self.entry_point.clone())
|
self.process = Some(Command::new(self.entry_point.clone())
|
||||||
.args(self.args.clone())
|
.args(self.args.clone())
|
||||||
.stdout(Stdio::piped())
|
.stdout(Stdio::piped())
|
||||||
.stderr(Stdio::piped())
|
.stderr(Stdio::piped())
|
||||||
|
.envs(env.unwrap_or_default())
|
||||||
.spawn()?);
|
.spawn()?);
|
||||||
self.running = true;
|
self.running = true;
|
||||||
|
// TODO should this return the event now? Or enqueue it? No sense in waiting I suppose, and orchestrator should just handle it?
|
||||||
Ok(JobRunHeartbeatEventV1 { job_run_id: self.id().to_string() })
|
Ok(JobRunHeartbeatEventV1 { job_run_id: self.id().to_string() })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -72,6 +149,8 @@ impl SubProcessJobRun {
|
||||||
running: false,
|
running: false,
|
||||||
entry_point,
|
entry_point,
|
||||||
args,
|
args,
|
||||||
|
stdout_buffer: Vec::new(),
|
||||||
|
exit_code: None,
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -82,24 +161,22 @@ pub struct JobRunPollResult {
|
||||||
}
|
}
|
||||||
|
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use crate::data_build_event::Event;
|
||||||
|
use crate::job_run::SubProcessJobRun;
|
||||||
use crate::JobRunStatusCode;
|
use crate::JobRunStatusCode;
|
||||||
|
|
||||||
|
/// Returns the path to the test helper binary.
///
/// When `TEST_SRCDIR` is set, the path is resolved under that directory
/// (the Bazel runfiles layout); otherwise it falls back to the `bazel-bin`
/// relative path.
fn test_helper_path() -> String {
    match std::env::var("TEST_SRCDIR") {
        Ok(runfiles_root) => format!("{}/_main/databuild/test/test_job_helper", runfiles_root),
        Err(_) => "bazel-bin/databuild/test/test_job_helper".to_string(),
    }
}
|
||||||
|
|
||||||
/// Happy path - run that succeeds should emit a JobRunSuccessEventV1
|
/// Happy path - run that succeeds should emit a JobRunSuccessEventV1
|
||||||
#[test]
|
#[test]
|
||||||
#[ignore]
|
|
||||||
fn test_job_run_success_returns_job_run_success_event() {
|
fn test_job_run_success_returns_job_run_success_event() {
|
||||||
use super::*;
|
|
||||||
use crate::data_build_event::Event;
|
|
||||||
|
|
||||||
// Get path to test helper binary from Bazel runfiles
|
|
||||||
let test_helper = std::env::var("TEST_SRCDIR")
|
|
||||||
.map(|srcdir| format!("{}/_main/databuild/test/test_job_helper", srcdir))
|
|
||||||
.unwrap_or_else(|_| "bazel-bin/databuild/test/test_job_helper".to_string());
|
|
||||||
|
|
||||||
println!("test_helper: {}", test_helper);
|
|
||||||
|
|
||||||
// Spawn a job run that will succeed (exit code 0)
|
// Spawn a job run that will succeed (exit code 0)
|
||||||
let mut job_run = SubProcessJobRun::spawn(test_helper, vec![]).unwrap();
|
let mut job_run = SubProcessJobRun::spawn(test_helper_path(), vec![]).unwrap();
|
||||||
|
|
||||||
// Start the job
|
// Start the job
|
||||||
job_run.run().unwrap();
|
job_run.run().unwrap();
|
||||||
|
|
@ -140,9 +217,44 @@ mod tests {
|
||||||
|
|
||||||
/// Job run that fails should emit a JobRunFailureEventV1
#[test]
fn test_job_run_failure_returns_job_run_failure_event() {
    // Spawn a job run
    let mut job_run = SubProcessJobRun::spawn(test_helper_path(), vec![]).unwrap();

    // Start the job with an exit code that indicates failure (non-zero)
    let env: HashMap<String, String> = HashMap::from([
        ("DATABUILD_TEST_EXIT_CODE".to_string(), "1".to_string()),
    ]);
    job_run.run_with_env(Some(env)).unwrap();

    let running = JobRunStatusCode::JobRunRunning as i32;

    // Poll until we get completion
    loop {
        let result = job_run.visit().unwrap();

        // A success event means the helper ignored the requested exit code
        if result
            .new_events
            .iter()
            .any(|event| matches!(event, Event::JobRunSuccessV1(_)))
        {
            panic!("Job succeeded unexpectedly");
        }

        // The failure event is what this test is waiting for
        if result
            .new_events
            .iter()
            .any(|event| matches!(event, Event::JobRunFailureV1(_)))
        {
            break;
        }

        // If job is still running, sleep briefly and poll again.
        // Compare with `==`: a bare identifier inside `matches!` is a binding
        // pattern that matches every value, which would make this check always
        // true and the panic below unreachable.
        if result.status.code == running {
            std::thread::sleep(std::time::Duration::from_millis(10));
            continue;
        }

        // If we got here, the job stopped running without reporting a failure event
        panic!(
            "Job stopped running without emitting a failure event: {:?}",
            result.status
        );
    }
}
|
||||||
|
|
||||||
/// Canceling an event before it completes should result in it:
|
/// Canceling an event before it completes should result in it:
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue