add job run basics

This commit is contained in:
Stuart Axelbrooke 2025-10-13 19:38:12 -07:00
parent bc61d8f530
commit 873f766aa0
4 changed files with 64 additions and 167 deletions

View file

@ -188,9 +188,23 @@ message TaintDetail {
// TODO
}
message JobRunDetail {
// TODO
message JobRunStatus {
JobRunStatusCode code = 1;
string name = 2;
}
enum JobRunStatusCode {
JobRunQueued = 0;
JobRunRunning = 1;
JobRunFailed = 2;
JobRunSucceeded = 3;
}
message JobRunDetail {
string id = 1;
JobRunStatus status = 2;
optional uint64 last_heartbeat_at = 3;
repeated WantAttributedPartitions servicing_wants = 4;
}
message EventFilter {

View file

@ -1,4 +1,4 @@
use crate::job_run::JobRun;
use crate::job_run::{spawn_job_run, JobRun, JobRunConfig};
use crate::{PartitionRef, WantDetail};
use regex::Regex;
use std::error::Error;
@ -7,16 +7,17 @@ use std::error::Error;
pub struct JobConfiguration {
pub label: String,
pub pattern: String,
pub entrypoint: String,
pub entry_point: String,
}
impl JobConfiguration {
/** Launch job to build the partitions specified by the provided wants. */
pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<JobRun, Box<dyn Error>> {
pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<Box<dyn JobRun>, Box<dyn Error>> {
let wanted_refs: Vec<PartitionRef> =
wants.iter().flat_map(|want| want.partitions.clone()).collect();
let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
JobRun::spawn(self.entrypoint.clone(), args)
let config = JobRunConfig::SubProcess { entry_point: self.entry_point.clone(), args };
spawn_job_run(config)
}
pub fn matches(&self, refs: &PartitionRef) -> bool {

View file

@ -1,164 +1,62 @@
use crate::build_event_log::{BELStorage, MemoryBELStorage};
use crate::data_build_event::Event;
use crate::data_build_event::Event::{JobRunFailureV1, JobRunSuccessV1};
use crate::{DataBuildEvent, JobRunFailureEventV1, JobRunHeartbeatEventV1, JobRunSuccessEventV1};
use crate::JobRunStatus;
use std::error::Error;
use std::io::{BufRead, BufReader};
use std::process::{Child, Command, ExitStatus, Stdio};
use std::io::BufRead;
use std::process::Child;
use uuid::Uuid;
/** Wrapper type that can be mocked */
trait JobRunChild {
fn exit_status(&mut self) -> Option<ExitStatus>;
fn stdout_lines(&mut self) -> Vec<String>;
// TODO log to /var/log/databuild/jobruns/$JOB_RUN_ID/, and rotate over max size (e.g. only ever use 1GB for logs)
// Leave door open to background log processor that tails job logs, but don't include in jobrun concept
pub trait JobRun {
fn id(&self) -> Uuid;
fn visit(&mut self) -> Result<JobRunPollResult, Box<dyn Error>>;
fn cancel(&mut self) -> Result<(), Box<dyn Error>>;
}
#[derive(Debug)]
struct WrappedProcessChild(Child);
pub enum JobRunConfig {
SubProcess { entry_point: String, args: Vec<String> },
}
impl JobRunChild for WrappedProcessChild {
fn exit_status(&mut self) -> Option<ExitStatus> {
self.0.try_wait().expect("coudn't wait")
}
fn stdout_lines(&mut self) -> Vec<String> {
let mut stdout_lines = Vec::new();
let stdout = self.0.stdout.take().expect("stdout not piped");
let reader = BufReader::new(stdout);
for line in reader.lines() {
stdout_lines.push(line.expect("stdout not piped"));
}
stdout_lines
pub fn spawn_job_run(config: JobRunConfig) -> Result<Box<dyn JobRun>, Box<dyn Error>> {
match config {
JobRunConfig::SubProcess { entry_point, args } => Ok(SubProcessJobRun::spawn(entry_point, args)?),
_ => Err("No impl for this job config type".into()),
}
}
impl From<Child> for WrappedProcessChild {
fn from(child: Child) -> Self {
Self { 0: child }
}
pub struct SubProcessJobRun {
pub job_run_id: Uuid,
pub process: Child,
pub storage: MemoryBELStorage,
}
pub struct JobRun {
job_run_id: Uuid,
events: MemoryBELStorage,
child: Box<dyn JobRunChild>,
unhandled_lines: Vec<String>,
}
const EVENT_SIZE_LIMIT: u64 = 1000000;
impl JobRun {
pub fn spawn(command: String, args: Vec<String>) -> Result<JobRun, Box<dyn Error>> {
Ok(JobRun {
job_run_id: Default::default(),
events: Default::default(),
child: Box::new(WrappedProcessChild::from(
Command::new(command)
.args(args)
.stdout(Stdio::piped())
.spawn()?,
)),
unhandled_lines: Default::default(),
})
impl JobRun for SubProcessJobRun {
fn id(&self) -> Uuid {
self.job_run_id
}
pub fn visit(&mut self, since_idx: u64) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> {
// Collect new lines from child process
// Parse BEL events from child process
let new_events = Self::process_lines(self.job_run_id, &self.unhandled_lines);
for event in new_events {
self.events.append_event(&event)?;
}
self.unhandled_lines.drain(..);
// Potentially react to job completion
match self.exit_status() {
None => {} // No exit -> no harm
Some(status) => {
if status.success() {
self.events
.append_event(&JobRunSuccessV1(JobRunSuccessEventV1 {
job_run_id: self.job_run_id.into(),
}))?;
} else {
self.events
.append_event(&JobRunFailureV1(JobRunFailureEventV1 {
job_run_id: self.job_run_id.into(),
}))?;
}
}
}
// Return BEL events since provided idx
self.events
.list_events(since_idx, EVENT_SIZE_LIMIT)
.and_then(|events| {
if events.len() as u64 == EVENT_SIZE_LIMIT {
Err(format!(
"Returned {} events - that's way too many.",
EVENT_SIZE_LIMIT
)
.into())
} else {
Ok(events)
}
})
}
pub fn cancel(&mut self) {
fn visit(&mut self) -> Result<JobRunPollResult, Box<dyn Error>> {
todo!()
}
pub fn event_for_line(line: String) -> Option<Event> {
// TODO parse missing data dep event
// TODO parse job state
None
fn cancel(&mut self) -> Result<(), Box<dyn Error>> {
todo!()
}
}
pub fn process_lines(job_run_id: Uuid, lines: &Vec<String>) -> Vec<Event> {
let mut events: Vec<Event> = Default::default();
if lines.len() > 0 {
// If any lines were written to stdout, we should heartbeat
events.push(Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 {
job_run_id: job_run_id.clone().into(),
}));
}
for event in lines
.iter()
.flat_map(|line| Self::event_for_line(line.clone()))
{
events.push(event);
}
events
impl SubProcessJobRun {
pub fn spawn(entry_point: String, args: Vec<String>) -> Result<Box<dyn JobRun>, Box<dyn Error>> {
todo!()
}
}
pub fn exit_status(&mut self) -> Option<ExitStatus> {
self.child.exit_status()
}
pub struct JobRunPollResult {
pub new_events: Vec<Event>, // Parsed BEL events, not raw lines
pub status: JobRunStatus,
}
mod tests {
use crate::job_run::JobRun;
#[test]
fn test_process_lines_empty() {
let lines = Vec::<String>::new();
let events = JobRun::process_lines(Default::default(), &lines);
assert_eq!(events.len(), 0);
}
#[test]
fn test_process_lines_heartbeat() {
let lines_1 = vec!["Hello, salem".to_string()];
let events_1 = JobRun::process_lines(Default::default(), &lines_1);
assert_eq!(events_1.len(), 1);
let lines_2 = vec!["Hello, salem".to_string(), "Hello, pippin".to_string()];
let events_2 = JobRun::process_lines(Default::default(), &lines_2);
assert_eq!(events_2.len(), 1);
}
}

View file

@ -1,7 +1,7 @@
use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage};
use crate::job::JobConfiguration;
use crate::job_run::JobRun;
use crate::{PartitionRef, WantDetail};
use crate::{JobRunStatusCode, PartitionRef, WantDetail};
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Debug;
@ -13,7 +13,7 @@ the visitor pattern to monitor job exec progress and liveness, and adds
struct Orchestrator<S: BELStorage + Debug> {
bel: BuildEventLog<S>,
job_runs: Vec<JobRunHandle>,
job_runs: Vec<Box<dyn JobRun>>,
config: OrchestratorConfig,
}
@ -63,20 +63,6 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
}
}
struct JobRunHandle {
job_run: JobRun,
bel_idx: u64,
}
impl From<JobRun> for JobRunHandle {
fn from(job_run: JobRun) -> Self {
Self {
job_run,
bel_idx: 0,
}
}
}
#[derive(Debug, Clone)]
struct OrchestratorConfig {
jobs: Vec<JobConfiguration>,
@ -134,13 +120,11 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
// Visit existing jobs, remove completed
self.job_runs.retain_mut(|jr| {
// Append emitted events
let events = jr
.job_run
.visit(jr.bel_idx.clone())
let result = jr
.visit()
.expect("Job visit failed");
events
result.new_events
.iter()
.filter_map(|event| event.event.clone())
.for_each(|event| {
self.bel
.append_event(&event)
@ -148,7 +132,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
});
// Retain job run if it doesn't yet have an exit code (still running)
jr.job_run.exit_status().is_none()
result.status.code == JobRunStatusCode::JobRunRunning as i32
});
Ok(())
@ -179,7 +163,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
} else {
for wg in grouped_wants.want_groups {
let job_run = wg.job.spawn(wg.wants)?;
self.job_runs.push(JobRunHandle::from(job_run));
self.job_runs.push(job_run);
}
Ok(())
@ -341,7 +325,7 @@ mod tests {
JobConfiguration {
label: label.to_string(),
pattern: pattern.to_string(),
entrypoint: "test_entrypoint".to_string(),
entry_point: "test_entrypoint".to_string(),
}
}