From 873f766aa0ecbc524b7d0ac3eafa04b869585911 Mon Sep 17 00:00:00 2001
From: Stuart Axelbrooke
Date: Mon, 13 Oct 2025 19:38:12 -0700
Subject: [PATCH] add job run basics

---
 databuild/databuild.proto |  18 +++-
 databuild/job.rs          |   9 +-
 databuild/job_run.rs      | 172 ++++++++------------------------------
 databuild/orchestrator.rs |  32 ++-----
 4 files changed, 64 insertions(+), 167 deletions(-)

diff --git a/databuild/databuild.proto b/databuild/databuild.proto
index 797dc87..8dcba81 100644
--- a/databuild/databuild.proto
+++ b/databuild/databuild.proto
@@ -188,9 +188,23 @@ message TaintDetail {
   // TODO
 }
 
-message JobRunDetail {
-  // TODO
+message JobRunStatus {
+  JobRunStatusCode code = 1;
+  string name = 2;
 }
 
+enum JobRunStatusCode {
+  JobRunQueued = 0;
+  JobRunRunning = 1;
+  JobRunFailed = 2;
+  JobRunSucceeded = 3;
+}
+
+message JobRunDetail {
+  string id = 1;
+  JobRunStatus status = 2;
+  optional uint64 last_heartbeat_at = 3;
+  repeated WantAttributedPartitions servicing_wants = 4;
+}
+
 message EventFilter {
diff --git a/databuild/job.rs b/databuild/job.rs
index cfcfc38..e79c75a 100644
--- a/databuild/job.rs
+++ b/databuild/job.rs
@@ -1,4 +1,4 @@
-use crate::job_run::JobRun;
+use crate::job_run::{spawn_job_run, JobRun, JobRunConfig};
 use crate::{PartitionRef, WantDetail};
 use regex::Regex;
 use std::error::Error;
@@ -7,16 +7,17 @@ use std::error::Error;
 pub struct JobConfiguration {
     pub label: String,
     pub pattern: String,
-    pub entrypoint: String,
+    pub entry_point: String,
 }
 
 impl JobConfiguration {
     /** Launch job to build the partitions specified by the provided wants. */
-    pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<JobRun, Box<dyn Error>> {
+    pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<Box<dyn JobRun>, Box<dyn Error>> {
         let wanted_refs: Vec<PartitionRef> =
             wants.iter().flat_map(|want| want.partitions.clone()).collect();
         let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
-        JobRun::spawn(self.entrypoint.clone(), args)
+        let config = JobRunConfig::SubProcess { entry_point: self.entry_point.clone(), args };
+        spawn_job_run(config)
     }
 
     pub fn matches(&self, refs: &PartitionRef) -> bool {
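Not part of the patch, just a sketch of the new proto types in use: it assumes databuild.proto is compiled with prost (the `as i32` comparison against JobRunStatusCode in orchestrator.rs below points that way). The helper name and exact field shapes (Option-wrapped message fields, Vec for repeated fields) are assumptions about the codegen, not something the patch defines.

// Sketch only (not in the patch). Assumes prost codegen, which models enum-typed
// fields as i32 and message-typed fields as Option on the generated structs.
use crate::{JobRunDetail, JobRunStatus, JobRunStatusCode};

// Hypothetical helper: the JobRunDetail a status endpoint might report for a queued run.
fn queued_job_run_detail(job_run_id: &uuid::Uuid) -> JobRunDetail {
    JobRunDetail {
        id: job_run_id.to_string(),
        status: Some(JobRunStatus {
            code: JobRunStatusCode::JobRunQueued as i32,
            name: "JobRunQueued".to_string(),
        }),
        last_heartbeat_at: None,
        servicing_wants: vec![],
    }
}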
diff --git a/databuild/job_run.rs b/databuild/job_run.rs
index 41c4d0c..88c9c43 100644
--- a/databuild/job_run.rs
+++ b/databuild/job_run.rs
@@ -1,164 +1,62 @@
 use crate::build_event_log::{BELStorage, MemoryBELStorage};
 use crate::data_build_event::Event;
-use crate::data_build_event::Event::{JobRunFailureV1, JobRunSuccessV1};
-use crate::{DataBuildEvent, JobRunFailureEventV1, JobRunHeartbeatEventV1, JobRunSuccessEventV1};
+use crate::JobRunStatus;
 use std::error::Error;
-use std::io::{BufRead, BufReader};
-use std::process::{Child, Command, ExitStatus, Stdio};
+use std::io::BufRead;
+use std::process::Child;
 use uuid::Uuid;
 
-/** Wrapper type that can be mocked */
-trait JobRunChild {
-    fn exit_status(&mut self) -> Option<ExitStatus>;
-    fn stdout_lines(&mut self) -> Vec<String>;
+// TODO log to /var/log/databuild/jobruns/$JOB_RUN_ID/, and rotate over max size (e.g. only ever use 1GB for logs)
+// Leave door open to background log processor that tails job logs, but don't include in jobrun concept
+
+pub trait JobRun {
+    fn id(&self) -> Uuid;
+    fn visit(&mut self) -> Result<JobRunPollResult, Box<dyn Error>>;
+    fn cancel(&mut self) -> Result<(), Box<dyn Error>>;
 }
 
-#[derive(Debug)]
-struct WrappedProcessChild(Child);
+pub enum JobRunConfig {
+    SubProcess { entry_point: String, args: Vec<String> },
+}
 
-impl JobRunChild for WrappedProcessChild {
-    fn exit_status(&mut self) -> Option<ExitStatus> {
-        self.0.try_wait().expect("coudn't wait")
-    }
-
-    fn stdout_lines(&mut self) -> Vec<String> {
-        let mut stdout_lines = Vec::new();
-        let stdout = self.0.stdout.take().expect("stdout not piped");
-        let reader = BufReader::new(stdout);
-        for line in reader.lines() {
-            stdout_lines.push(line.expect("stdout not piped"));
-        }
-        stdout_lines
+pub fn spawn_job_run(config: JobRunConfig) -> Result<Box<dyn JobRun>, Box<dyn Error>> {
+    match config {
+        JobRunConfig::SubProcess { entry_point, args } => Ok(SubProcessJobRun::spawn(entry_point, args)?),
+        _ => Err("No impl for this job config type".into()),
     }
 }
 
-impl From<Child> for WrappedProcessChild {
-    fn from(child: Child) -> Self {
-        Self { 0: child }
-    }
+pub struct SubProcessJobRun {
+    pub job_run_id: Uuid,
+    pub process: Child,
+    pub storage: MemoryBELStorage,
 }
 
-pub struct JobRun {
-    job_run_id: Uuid,
-    events: MemoryBELStorage,
-    child: Box<dyn JobRunChild>,
-    unhandled_lines: Vec<String>,
-}
-
-const EVENT_SIZE_LIMIT: u64 = 1000000;
-
-impl JobRun {
-    pub fn spawn(command: String, args: Vec<String>) -> Result<JobRun, Box<dyn Error>> {
-        Ok(JobRun {
-            job_run_id: Default::default(),
-            events: Default::default(),
-            child: Box::new(WrappedProcessChild::from(
-                Command::new(command)
-                    .args(args)
-                    .stdout(Stdio::piped())
-                    .spawn()?,
-            )),
-            unhandled_lines: Default::default(),
-        })
+impl JobRun for SubProcessJobRun {
+    fn id(&self) -> Uuid {
+        self.job_run_id
     }
 
-    pub fn visit(&mut self, since_idx: u64) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> {
-        // Collect new lines from child process
-
-        // Parse BEL events from child process
-        let new_events = Self::process_lines(self.job_run_id, &self.unhandled_lines);
-        for event in new_events {
-            self.events.append_event(&event)?;
-        }
-        self.unhandled_lines.drain(..);
-
-        // Potentially react to job completion
-        match self.exit_status() {
-            None => {} // No exit -> no harm
-            Some(status) => {
-                if status.success() {
-                    self.events
-                        .append_event(&JobRunSuccessV1(JobRunSuccessEventV1 {
-                            job_run_id: self.job_run_id.into(),
-                        }))?;
-                } else {
-                    self.events
-                        .append_event(&JobRunFailureV1(JobRunFailureEventV1 {
-                            job_run_id: self.job_run_id.into(),
-                        }))?;
-                }
-            }
-        }
-
-        // Return BEL events since provided idx
-        self.events
-            .list_events(since_idx, EVENT_SIZE_LIMIT)
-            .and_then(|events| {
-                if events.len() as u64 == EVENT_SIZE_LIMIT {
-                    Err(format!(
-                        "Returned {} events - that's way too many.",
-                        EVENT_SIZE_LIMIT
-                    )
-                    .into())
-                } else {
-                    Ok(events)
-                }
-            })
-    }
-
-    pub fn cancel(&mut self) {
+    fn visit(&mut self) -> Result<JobRunPollResult, Box<dyn Error>> {
         todo!()
     }
 
-    pub fn event_for_line(line: String) -> Option<Event> {
-        // TODO parse missing data dep event
-        // TODO parse job state
-        None
+    fn cancel(&mut self) -> Result<(), Box<dyn Error>> {
+        todo!()
     }
+}
 
-    pub fn process_lines(job_run_id: Uuid, lines: &Vec<String>) -> Vec<Event> {
-        let mut events: Vec<Event> = Default::default();
-
-        if lines.len() > 0 {
-            // If any lines were written to stdout, we should heartbeat
-            events.push(Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 {
-                job_run_id: job_run_id.clone().into(),
-            }));
-        }
-
-        for event in lines
-            .iter()
-            .flat_map(|line| Self::event_for_line(line.clone()))
-        {
-            events.push(event);
-        }
-
-        events
+impl SubProcessJobRun {
+    pub fn spawn(entry_point: String, args: Vec<String>) -> Result<Box<dyn JobRun>, Box<dyn Error>> {
+        todo!()
     }
+}
 
-    pub fn exit_status(&mut self) -> Option<ExitStatus> {
-        self.child.exit_status()
-    }
+pub struct JobRunPollResult {
+    pub new_events: Vec<Event>, // Parsed BEL events, not raw lines
+    pub status: JobRunStatus,
 }
 
 mod tests {
-    use crate::job_run::JobRun;
-    #[test]
-    fn test_process_lines_empty() {
-        let lines = Vec::<String>::new();
-        let events = JobRun::process_lines(Default::default(), &lines);
-        assert_eq!(events.len(), 0);
-    }
-
-    #[test]
-    fn test_process_lines_heartbeat() {
-        let lines_1 = vec!["Hello, salem".to_string()];
-        let events_1 = JobRun::process_lines(Default::default(), &lines_1);
-        assert_eq!(events_1.len(), 1);
-
-        let lines_2 = vec!["Hello, salem".to_string(), "Hello, pippin".to_string()];
-        let events_2 = JobRun::process_lines(Default::default(), &lines_2);
-        assert_eq!(events_2.len(), 1);
-    }
 }
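Not part of the patch: one possible way to fill in the todo!()'d bodies above, reusing the Command/Stdio plumbing from the removed implementation. It is written as free functions so the sketch stands alone; in the patch these would become the bodies of SubProcessJobRun::spawn and JobRun::visit. Skipping stdout parsing, leaving the status name empty, and using Uuid::new_v4 are assumptions made for brevity.

// Sketch only, not the patch's implementation.
use crate::build_event_log::MemoryBELStorage;
use crate::job_run::{JobRun, JobRunPollResult, SubProcessJobRun};
use crate::{JobRunStatus, JobRunStatusCode};
use std::error::Error;
use std::process::{Command, Stdio};
use uuid::Uuid;

pub fn spawn_sub_process(entry_point: String, args: Vec<String>) -> Result<Box<dyn JobRun>, Box<dyn Error>> {
    // Launch the entry point with piped stdout so a later visit() could parse BEL events from it.
    let process = Command::new(entry_point)
        .args(args)
        .stdout(Stdio::piped())
        .spawn()?;
    Ok(Box::new(SubProcessJobRun {
        job_run_id: Uuid::new_v4(), // assumes the uuid crate's `v4` feature
        process,
        storage: MemoryBELStorage::default(),
    }))
}

pub fn poll_sub_process(run: &mut SubProcessJobRun) -> Result<JobRunPollResult, Box<dyn Error>> {
    // Poll the child without blocking and map its exit state onto the proto status codes.
    let code = match run.process.try_wait()? {
        None => JobRunStatusCode::JobRunRunning,
        Some(status) if status.success() => JobRunStatusCode::JobRunSucceeded,
        Some(_) => JobRunStatusCode::JobRunFailed,
    };
    Ok(JobRunPollResult {
        new_events: Vec::new(), // stdout -> BEL event parsing elided in this sketch
        status: JobRunStatus {
            code: code as i32,
            name: String::new(),
        },
    })
}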
diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs
index 943bc6e..32b9602 100644
--- a/databuild/orchestrator.rs
+++ b/databuild/orchestrator.rs
@@ -1,7 +1,7 @@
 use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage};
 use crate::job::JobConfiguration;
 use crate::job_run::JobRun;
-use crate::{PartitionRef, WantDetail};
+use crate::{JobRunStatusCode, PartitionRef, WantDetail};
 use std::collections::HashMap;
 use std::error::Error;
 use std::fmt::Debug;
@@ -13,7 +13,7 @@ the visitor pattern to monitor job exec progress and liveness, and adds
 struct Orchestrator {
     bel: BuildEventLog,
-    job_runs: Vec<JobRunHandle>,
+    job_runs: Vec<Box<dyn JobRun>>,
     config: OrchestratorConfig,
 }
@@ -63,20 +63,6 @@ impl Orchestrator {
     }
 }
 
-struct JobRunHandle {
-    job_run: JobRun,
-    bel_idx: u64,
-}
-
-impl From<JobRun> for JobRunHandle {
-    fn from(job_run: JobRun) -> Self {
-        Self {
-            job_run,
-            bel_idx: 0,
-        }
-    }
-}
-
 #[derive(Debug, Clone)]
 struct OrchestratorConfig {
     jobs: Vec<JobConfiguration>,
 }
@@ -134,13 +120,11 @@ impl Orchestrator {
         // Visit existing jobs, remove completed
         self.job_runs.retain_mut(|jr| {
             // Append emitted events
-            let events = jr
-                .job_run
-                .visit(jr.bel_idx.clone())
+            let result = jr
+                .visit()
                 .expect("Job visit failed");
-            events
+            result.new_events
                 .iter()
-                .filter_map(|event| event.event.clone())
                 .for_each(|event| {
                     self.bel
                         .append_event(&event)
                         .expect("Couldn't append event");
                 });
 
             // Retain job run if it doesn't yet have an exit code (still running)
-            jr.job_run.exit_status().is_none()
+            result.status.code == JobRunStatusCode::JobRunRunning as i32
         });
 
         Ok(())
     }
@@ -179,7 +163,7 @@
         } else {
             for wg in grouped_wants.want_groups {
                 let job_run = wg.job.spawn(wg.wants)?;
-                self.job_runs.push(JobRunHandle::from(job_run));
+                self.job_runs.push(job_run);
             }
             Ok(())
         }
@@ -341,7 +325,7 @@ mod tests {
         JobConfiguration {
             label: label.to_string(),
             pattern: pattern.to_string(),
-            entrypoint: "test_entrypoint".to_string(),
+            entry_point: "test_entrypoint".to_string(),
         }
     }
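Not part of the patch: a sketch of the kind of test double the Box<dyn JobRun> indirection now allows, given that the old process_lines tests were removed. ScriptedJobRun and its fields are hypothetical names, not anything the patch introduces.

// Sketch only: a scripted fake that satisfies the JobRun trait from the diff above.
use crate::job_run::{JobRun, JobRunPollResult};
use std::collections::VecDeque;
use std::error::Error;
use uuid::Uuid;

struct ScriptedJobRun {
    id: Uuid,
    results: VecDeque<JobRunPollResult>,
}

impl JobRun for ScriptedJobRun {
    fn id(&self) -> Uuid {
        self.id
    }

    fn visit(&mut self) -> Result<JobRunPollResult, Box<dyn Error>> {
        // Hand back the next scripted poll result; error out if the test polls too often.
        self.results
            .pop_front()
            .ok_or_else(|| "scripted job run polled after exhaustion".into())
    }

    fn cancel(&mut self) -> Result<(), Box<dyn Error>> {
        Ok(())
    }
}

A test could preload a few JobRunPollResult values (say, one Running poll followed by a Succeeded one) and then assert that the orchestrator appends the scripted events and drops the run once the status leaves JobRunRunning.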