refactor top level job run interface to represent job states as states

Stuart Axelbrooke 2025-10-16 18:47:23 -07:00
parent 6572d4e3bd
commit eeb90d0386
3 changed files with 368 additions and 278 deletions

View file

@@ -1,7 +1,6 @@
-use crate::job_run::{spawn_job_run, JobRun, JobRunConfig};
+use crate::job_run::{NotStartedJobRun, SubProcessBackend};
 use crate::{PartitionRef, WantDetail};
 use regex::Regex;
-use std::error::Error;

 #[derive(Debug, Clone)]
 pub struct JobConfiguration {
@@ -12,12 +11,11 @@ pub struct JobConfiguration {
 impl JobConfiguration {
     /** Launch job to build the partitions specified by the provided wants. */
-    pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<Box<dyn JobRun>, Box<dyn Error>> {
+    pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<NotStartedJobRun<SubProcessBackend>, std::io::Error> {
         let wanted_refs: Vec<PartitionRef> =
             wants.iter().flat_map(|want| want.partitions.clone()).collect();
         let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
-        let config = JobRunConfig::SubProcess { entry_point: self.entry_point.clone(), args };
-        spawn_job_run(config)
+        Ok(NotStartedJobRun::spawn(self.entry_point.clone(), args))
     }

     pub fn matches(&self, refs: &PartitionRef) -> bool {
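Caller-side, the notable change in this file is that spawn now returns a concrete NotStartedJobRun<SubProcessBackend> instead of a Box<dyn JobRun>. A minimal sketch of driving that value to a terminal state, using the types introduced in job_run.rs below; the drive helper itself is hypothetical and not part of the commit:

use crate::job::JobConfiguration;
use crate::job_run::JobRunVisitResult;
use crate::WantDetail;

// Hypothetical helper, not in the commit: drives one job to a terminal state.
fn drive(job_config: &JobConfiguration, wants: Vec<WantDetail>) -> Result<(), Box<dyn std::error::Error>> {
    let not_started = job_config.spawn(wants)?;   // NotStartedJobRun<SubProcessBackend>
    let mut running = not_started.run()?;         // consumes the not-started value
    loop {
        match running.visit()? {
            JobRunVisitResult::StillRunning => {
                // Poll again after a short pause
                std::thread::sleep(std::time::Duration::from_millis(10));
            }
            JobRunVisitResult::Completed(done) => {
                println!("job {} succeeded", done.id());
                return Ok(());
            }
            JobRunVisitResult::Failed(failed) => {
                return Err(failed.state.reason.into());
            }
        }
    }
}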

View file

@@ -1,4 +1,5 @@
 use std::collections::HashMap;
+use std::marker::PhantomData;
 use crate::data_build_event::Event;
 use crate::{JobRunHeartbeatEventV1, JobRunStatus, JobRunStatusCode, JobRunSuccessEventV1, JobRunFailureEventV1, JobRunCancelEventV1, EventSource};
 use std::error::Error;
@@ -9,227 +10,309 @@ use uuid::Uuid;
 // TODO log to /var/log/databuild/jobruns/$JOB_RUN_ID/, and rotate over max size (e.g. only ever use 1GB for logs)
 // Leave door open to background log processor that tails job logs, but don't include in jobrun concept

-pub trait JobRun {
-    fn id(&self) -> Uuid;
-    /**
-    Visit is responsible for observing the state of the job run,
-    */
-    fn visit(&mut self) -> Result<JobRunPollResult, Box<dyn Error>>;
-    fn cancel(&mut self, source: EventSource) -> Result<JobRunCancelEventV1, Box<dyn Error>>;
-    fn run_with_env(&mut self, env: Option<HashMap<String, String>>) -> Result<JobRunHeartbeatEventV1, Box<dyn Error>>;
-    fn run(&mut self) -> Result<JobRunHeartbeatEventV1, Box<dyn Error>> {
+/// Backend trait that defines the state types and transition logic for different job run implementations
+pub trait JobRunBackend: Sized {
+    type NotStartedState;
+    type RunningState;
+    type CompletedState;
+    type FailedState;
+    type CanceledState;
+
+    /// Create a new not-started job run
+    fn create(entry_point: String, args: Vec<String>) -> Self::NotStartedState;
+
+    /// Transition from NotStarted to Running
+    fn start(
+        not_started: Self::NotStartedState,
+        env: Option<HashMap<String, String>>
+    ) -> Result<Self::RunningState, Box<dyn Error>>;
+
+    /// Poll a running job for state changes
+    fn poll(
+        running: &mut Self::RunningState
+    ) -> Result<PollResult<Self::CompletedState, Self::FailedState>, Box<dyn Error>>;
+
+    /// Cancel a running job
+    fn cancel_job(
+        running: Self::RunningState,
+        source: EventSource
+    ) -> Result<Self::CanceledState, Box<dyn Error>>;
+}
+
+/// Result of polling a running job
+pub enum PollResult<C, F> {
+    StillRunning,
+    Completed(C),
+    Failed(F),
+}
+
+/// Generic JobRun that works with any backend, parameterized by state
+pub struct JobRun<B: JobRunBackend, S> {
+    pub job_run_id: Uuid,
+    pub state: S,
+    _backend: PhantomData<B>,
+}
+
+/// Type aliases for specific states
+pub type NotStartedJobRun<B> = JobRun<B, <B as JobRunBackend>::NotStartedState>;
+pub type RunningJobRun<B> = JobRun<B, <B as JobRunBackend>::RunningState>;
+pub type CompletedJobRun<B> = JobRun<B, <B as JobRunBackend>::CompletedState>;
+pub type FailedJobRun<B> = JobRun<B, <B as JobRunBackend>::FailedState>;
+pub type CanceledJobRun<B> = JobRun<B, <B as JobRunBackend>::CanceledState>;
+
+// Methods available on all JobRun states
+impl<B: JobRunBackend, S> JobRun<B, S> {
+    pub fn id(&self) -> Uuid {
+        self.job_run_id
+    }
+}
+
+// Methods available only on NotStarted state
+impl<B: JobRunBackend> NotStartedJobRun<B> {
+    pub fn spawn(entry_point: String, args: Vec<String>) -> Self {
+        JobRun {
+            job_run_id: Uuid::new_v4(),
+            state: B::create(entry_point, args),
+            _backend: PhantomData,
+        }
+    }
+
+    pub fn run(self) -> Result<RunningJobRun<B>, Box<dyn Error>> {
         self.run_with_env(None)
     }
+
+    pub fn run_with_env(
+        self,
+        env: Option<HashMap<String, String>>
+    ) -> Result<RunningJobRun<B>, Box<dyn Error>> {
+        let running_state = B::start(self.state, env)?;
+        Ok(JobRun {
+            job_run_id: self.job_run_id,
+            state: running_state,
+            _backend: PhantomData,
+        })
+    }
+}
+
+// Methods available only on Running state
+impl<B: JobRunBackend> RunningJobRun<B> {
+    pub fn visit(&mut self) -> Result<JobRunVisitResult<B>, Box<dyn Error>> {
+        match B::poll(&mut self.state)? {
+            PollResult::StillRunning => Ok(JobRunVisitResult::StillRunning),
+            PollResult::Completed(completed_state) => {
+                let job_run_id = self.job_run_id;
+                Ok(JobRunVisitResult::Completed(JobRun {
+                    job_run_id,
+                    state: completed_state,
+                    _backend: PhantomData,
+                }))
+            }
+            PollResult::Failed(failed_state) => {
+                let job_run_id = self.job_run_id;
+                Ok(JobRunVisitResult::Failed(JobRun {
+                    job_run_id,
+                    state: failed_state,
+                    _backend: PhantomData,
+                }))
+            }
+        }
+    }
+
+    pub fn cancel(self, source: EventSource) -> Result<CanceledJobRun<B>, Box<dyn Error>> {
+        let canceled_state = B::cancel_job(self.state, source)?;
+        Ok(JobRun {
+            job_run_id: self.job_run_id,
+            state: canceled_state,
+            _backend: PhantomData,
+        })
+    }
+}
+
+/// Result of visiting a running job
+pub enum JobRunVisitResult<B: JobRunBackend> {
+    StillRunning,
+    Completed(CompletedJobRun<B>),
+    Failed(FailedJobRun<B>),
 }

 pub enum JobRunConfig {
     SubProcess { entry_point: String, args: Vec<String> },
 }

-pub fn spawn_job_run(config: JobRunConfig) -> Result<Box<dyn JobRun>, Box<dyn Error>> {
-    match config {
-        JobRunConfig::SubProcess { entry_point, args } => Ok(SubProcessJobRun::spawn(entry_point, args)?),
-        _ => Err("No impl for this job config type".into()),
-    }
-}
+// ===== SubProcess Backend Implementation =====

-pub struct SubProcessJobRun {
-    pub job_run_id: Uuid,
+/// SubProcess backend for running jobs as local subprocesses
+pub struct SubProcessBackend;
+
+/// NotStarted state for SubProcess backend
+pub struct SubProcessNotStarted {
     pub entry_point: String,
     pub args: Vec<String>,
-    pub state: JobRunState,
 }

-enum JobRunState {
-    NotStarted,
-    Running {
-        process: Child,
-        stdout_buffer: Vec<String>,
-    },
-    Completed {
-        exit_code: i32,
-        stdout_buffer: Vec<String>,
-    },
-    Failed {
-        exit_code: i32,
-        reason: String,
-        stdout_buffer: Vec<String>,
-    },
-    Canceled {
-        source: EventSource,
-        stdout_buffer: Vec<String>,
-    },
+/// Running state for SubProcess backend
+pub struct SubProcessRunning {
+    pub process: Child,
+    pub stdout_buffer: Vec<String>,
 }

-impl JobRun for SubProcessJobRun {
-    fn id(&self) -> Uuid {
-        self.job_run_id
+/// Completed state for SubProcess backend
+pub struct SubProcessCompleted {
+    pub exit_code: i32,
+    pub stdout_buffer: Vec<String>,
+}
+
+/// Failed state for SubProcess backend
+pub struct SubProcessFailed {
+    pub exit_code: i32,
+    pub reason: String,
+    pub stdout_buffer: Vec<String>,
+}
+
+/// Canceled state for SubProcess backend
+pub struct SubProcessCanceled {
+    pub source: EventSource,
+    pub stdout_buffer: Vec<String>,
+}
+
+impl JobRunBackend for SubProcessBackend {
+    type NotStartedState = SubProcessNotStarted;
+    type RunningState = SubProcessRunning;
+    type CompletedState = SubProcessCompleted;
+    type FailedState = SubProcessFailed;
+    type CanceledState = SubProcessCanceled;
+
+    fn create(entry_point: String, args: Vec<String>) -> Self::NotStartedState {
+        SubProcessNotStarted { entry_point, args }
     }

-    fn visit(&mut self) -> Result<JobRunPollResult, Box<dyn Error>> {
-        let mut new_events = Vec::new();
-
-        match &mut self.state {
-            JobRunState::Running { process, stdout_buffer } => {
-                // Non-blocking check for exit status
-                if let Some(exit_status) = process.try_wait()? {
-                    // Read any remaining stdout
-                    if let Some(stdout) = process.stdout.take() {
-                        let reader = BufReader::new(stdout);
-                        for line in reader.lines() {
-                            // TODO we should write lines to the job's file logs
-                            if let Ok(line) = line {
-                                stdout_buffer.push(line);
-                            }
-                        }
-                    }
-
-                    // Take ownership of the current state to transition
-                    let old_state = std::mem::replace(&mut self.state, JobRunState::NotStarted);
-                    let stdout_buf = if let JobRunState::Running { stdout_buffer, .. } = old_state {
-                        stdout_buffer
-                    } else {
-                        Vec::new()
-                    };
-
-                    // Check exit status and transition to terminal state
-                    match exit_status.code() {
-                        Some(0) => {
-                            // Success case
-                            self.state = JobRunState::Completed {
-                                exit_code: 0,
-                                stdout_buffer: stdout_buf,
-                            };
-                            new_events.push(Event::JobRunSuccessV1(JobRunSuccessEventV1 {
-                                job_run_id: self.job_run_id.to_string(),
-                            }));
-                            return Ok(JobRunPollResult {
-                                new_events,
-                                status: JobRunStatusCode::JobRunSucceeded.into(),
-                            });
-                        }
-                        Some(code) => {
-                            // Failed with exit code
-                            let reason = format!("Job failed with exit code {}", code);
-                            self.state = JobRunState::Failed {
-                                exit_code: code,
-                                reason: reason.clone(),
-                                stdout_buffer: stdout_buf,
-                            };
-                            new_events.push(Event::JobRunFailureV1(JobRunFailureEventV1 {
-                                job_run_id: self.job_run_id.to_string(),
-                                reason,
-                            }));
-                            return Ok(JobRunPollResult {
-                                new_events,
-                                status: JobRunStatusCode::JobRunFailed.into(),
-                            });
-                        }
-                        None => {
-                            // Terminated by signal (Unix) - treat as failure
-                            let reason = format!("Job terminated by signal: {}", exit_status);
-                            self.state = JobRunState::Failed {
-                                exit_code: -1,
-                                reason: reason.clone(),
-                                stdout_buffer: stdout_buf,
-                            };
-                            new_events.push(Event::JobRunFailureV1(JobRunFailureEventV1 {
-                                job_run_id: self.job_run_id.to_string(),
-                                reason,
-                            }));
-                            return Ok(JobRunPollResult {
-                                new_events,
-                                status: JobRunStatusCode::JobRunFailed.into(),
-                            });
-                        }
-                    }
-                }
-
-                // Still running
-                Ok(JobRunPollResult {
-                    new_events,
-                    status: JobRunStatusCode::JobRunRunning.into(),
-                })
-            }
-            _ => Err("visit() called on non-running job".into()),
-        }
-    }
-
-    fn cancel(&mut self, source: EventSource) -> Result<JobRunCancelEventV1, Box<dyn Error>> {
-        match std::mem::replace(&mut self.state, JobRunState::NotStarted) {
-            JobRunState::Running { mut process, stdout_buffer } => {
-                // Kill the process
-                process.kill()?;
-
-                // Wait for it to actually terminate
-                process.wait()?;
-
-                // Transition to Canceled state
-                self.state = JobRunState::Canceled {
-                    source: source.clone(),
-                    stdout_buffer,
-                };
-
-                Ok(JobRunCancelEventV1 {
-                    job_run_id: self.job_run_id.to_string(),
-                    source: Some(source),
-                    comment: Some("Job was canceled".to_string()),
-                })
-            }
-            other_state => {
-                // Restore the state and error
-                self.state = other_state;
-                Err("cancel() called on non-running job".into())
-            }
-        }
-    }
-
-    /// Mostly for test purposes
-    fn run_with_env(&mut self, env: Option<HashMap<String, String>>) -> Result<JobRunHeartbeatEventV1, Box<dyn Error>> {
-        match &self.state {
-            JobRunState::NotStarted => {
-                let process = Command::new(self.entry_point.clone())
-                    .args(self.args.clone())
-                    .stdout(Stdio::piped())
-                    .stderr(Stdio::piped())
-                    .envs(env.unwrap_or_default())
-                    .spawn()?;
-
-                self.state = JobRunState::Running {
-                    process,
-                    stdout_buffer: Vec::new(),
-                };
-
-                // TODO should this return the event now? Or enqueue it? No sense in waiting I suppose, and orchestrator should just handle it?
-                Ok(JobRunHeartbeatEventV1 { job_run_id: self.id().to_string() })
-            }
-            _ => Err("run() called on already-running or completed job".into())
+    fn start(
+        not_started: Self::NotStartedState,
+        env: Option<HashMap<String, String>>
+    ) -> Result<Self::RunningState, Box<dyn Error>> {
+        let process = Command::new(not_started.entry_point)
+            .args(not_started.args)
+            .stdout(Stdio::piped())
+            .stderr(Stdio::piped())
+            .envs(env.unwrap_or_default())
+            .spawn()?;
+
+        Ok(SubProcessRunning {
+            process,
+            stdout_buffer: Vec::new(),
+        })
+    }
+
+    fn poll(
+        running: &mut Self::RunningState
+    ) -> Result<PollResult<Self::CompletedState, Self::FailedState>, Box<dyn Error>> {
+        // Non-blocking check for exit status
+        if let Some(exit_status) = running.process.try_wait()? {
+            // Read any remaining stdout
+            if let Some(stdout) = running.process.stdout.take() {
+                let reader = BufReader::new(stdout);
+                for line in reader.lines() {
+                    // TODO we should write lines to the job's file logs
+                    if let Ok(line) = line {
+                        running.stdout_buffer.push(line);
+                    }
+                }
+            }
+
+            // Take ownership of stdout_buffer
+            let stdout_buffer = std::mem::take(&mut running.stdout_buffer);
+
+            // Check exit status and return appropriate result
+            match exit_status.code() {
+                Some(0) => {
+                    // Success case
+                    Ok(PollResult::Completed(SubProcessCompleted {
+                        exit_code: 0,
+                        stdout_buffer,
+                    }))
+                }
+                Some(code) => {
+                    // Failed with exit code
+                    let reason = format!("Job failed with exit code {}", code);
+                    Ok(PollResult::Failed(SubProcessFailed {
+                        exit_code: code,
+                        reason,
+                        stdout_buffer,
+                    }))
+                }
+                None => {
+                    // Terminated by signal (Unix) - treat as failure
+                    let reason = format!("Job terminated by signal: {}", exit_status);
+                    Ok(PollResult::Failed(SubProcessFailed {
+                        exit_code: -1,
+                        reason,
+                        stdout_buffer,
+                    }))
+                }
+            }
+        } else {
+            // Still running
+            Ok(PollResult::StillRunning)
+        }
+    }
+
+    fn cancel_job(
+        mut running: Self::RunningState,
+        source: EventSource
+    ) -> Result<Self::CanceledState, Box<dyn Error>> {
+        // Kill the process
+        running.process.kill()?;
+
+        // Wait for it to actually terminate
+        running.process.wait()?;
+
+        // Return canceled state
+        Ok(SubProcessCanceled {
+            source,
+            stdout_buffer: running.stdout_buffer,
+        })
+    }
+}
+
+// Helper functions to convert between states and events
+impl SubProcessCompleted {
+    pub fn to_event(&self, job_run_id: &Uuid) -> Event {
+        Event::JobRunSuccessV1(JobRunSuccessEventV1 {
+            job_run_id: job_run_id.to_string(),
+        })
+    }
+}
+
+impl SubProcessFailed {
+    pub fn to_event(&self, job_run_id: &Uuid) -> Event {
+        Event::JobRunFailureV1(JobRunFailureEventV1 {
+            job_run_id: job_run_id.to_string(),
+            reason: self.reason.clone(),
+        })
+    }
+}
+
+impl SubProcessCanceled {
+    pub fn to_event(&self, job_run_id: &Uuid) -> JobRunCancelEventV1 {
+        JobRunCancelEventV1 {
+            job_run_id: job_run_id.to_string(),
+            source: Some(self.source.clone()),
+            comment: Some("Job was canceled".to_string()),
         }
     }
 }

-impl SubProcessJobRun {
-    pub fn spawn(entry_point: String, args: Vec<String>) -> Result<Box<dyn JobRun>, Box<dyn Error>> {
-        Ok(Box::new(SubProcessJobRun {
-            job_run_id: Uuid::new_v4(),
-            entry_point,
-            args,
-            state: JobRunState::NotStarted,
-        }))
-    }
-}
-
+// Old JobRunPollResult structure - kept for compatibility during migration
 pub struct JobRunPollResult {
-    pub new_events: Vec<Event>, // Parsed BEL events, not raw lines
+    pub new_events: Vec<Event>,
     pub status: JobRunStatus,
 }

 mod tests {
     use std::collections::HashMap;
     use crate::data_build_event::Event;
-    use crate::job_run::{JobRun, SubProcessJobRun};
-    use crate::{JobRunStatusCode, ManuallyTriggeredEvent};
+    use crate::job_run::{JobRunVisitResult, NotStartedJobRun, SubProcessBackend};
+    use crate::{ManuallyTriggeredEvent};
+    use uuid::Uuid;

     fn test_helper_path() -> String {
         std::env::var("TEST_SRCDIR")
@@ -241,35 +324,29 @@ mod tests {
     #[test]
     fn test_job_run_success_returns_job_run_success_event() {
         // Spawn a job run that will succeed (exit code 0)
-        let mut job_run = SubProcessJobRun::spawn(test_helper_path(), vec![]).unwrap();
+        let job_run: NotStartedJobRun<SubProcessBackend> = NotStartedJobRun::spawn(test_helper_path(), vec![]);

-        // Start the job
-        job_run.run().unwrap();
+        // Start the job - this consumes the NotStarted and returns Running
+        let mut running_job = job_run.run().unwrap();

         // Poll until we get completion
         loop {
-            let result = job_run.visit().unwrap();
-
-            // Check if we got a success event
-            let has_success = result.new_events.iter().any(|event| {
-                matches!(event, Event::JobRunSuccessV1(_))
-            });
-
-            if has_success {
-                let expected = JobRunStatusCode::JobRunSucceeded as i32;
-                assert!(matches!(result.status.code, expected));
-                break;
-            }
-
-            // If job is still running, sleep briefly and poll again
-            let expected = JobRunStatusCode::JobRunRunning as i32;
-            if matches!(result.status.code, expected) {
-                std::thread::sleep(std::time::Duration::from_millis(10));
-                continue;
-            }
-
-            // If we got here, job failed when it shouldn't have
-            panic!("Job failed unexpectedly: {:?}", result.status);
+            match running_job.visit().unwrap() {
+                JobRunVisitResult::Completed(completed) => {
+                    // Generate the event from the completed state
+                    let event = completed.state.to_event(&completed.id());
+                    assert!(matches!(event, Event::JobRunSuccessV1(_)));
+                    break;
+                }
+                JobRunVisitResult::Failed(failed) => {
+                    panic!("Job failed unexpectedly: {}", failed.state.reason);
+                }
+                JobRunVisitResult::StillRunning => {
+                    // Sleep briefly and poll again
+                    std::thread::sleep(std::time::Duration::from_millis(10));
+                    continue;
+                }
+            }
         }
     }
@@ -277,41 +354,32 @@ mod tests {
     #[test]
     fn test_job_run_failure_returns_job_run_failure_event() {
         // Spawn a job run
-        let mut job_run = SubProcessJobRun::spawn(test_helper_path(), vec![]).unwrap();
+        let job_run: NotStartedJobRun<SubProcessBackend> = NotStartedJobRun::spawn(test_helper_path(), vec![]);

         // Start the job with an exit code that indicates failure (non-zero)
         let env: HashMap<String, String> = HashMap::from([
             ("DATABUILD_TEST_EXIT_CODE".to_string(), "1".to_string())
         ]);
-        job_run.run_with_env(Some(env)).unwrap();
+        let mut running_job = job_run.run_with_env(Some(env)).unwrap();

         // Poll until we get completion
         loop {
-            let result = job_run.visit().unwrap();
-
-            // Check if we got a success event
-            if result.new_events.iter().any(|event| {
-                matches!(event, Event::JobRunSuccessV1(_))
-            }) {
-                panic!("Job succeeded unexpectedly");
-            };
-
-            if result.new_events.iter().any(|event| {
-                matches!(event, Event::JobRunFailureV1(_))
-            }) {
-                break;
-            }
-
-            // If job is still running, sleep briefly and poll again
-            let expected = JobRunStatusCode::JobRunRunning as i32;
-            if matches!(result.status.code, expected) {
-                std::thread::sleep(std::time::Duration::from_millis(10));
-                continue;
-            }
-
-            // If we got here, job failed when it shouldn't have
-            panic!("Job failed unexpectedly: {:?}", result.status);
+            match running_job.visit().unwrap() {
+                JobRunVisitResult::Completed(_) => {
+                    panic!("Job succeeded unexpectedly");
+                }
+                JobRunVisitResult::Failed(failed) => {
+                    // Generate the event from the failed state
+                    let event = failed.state.to_event(&failed.id());
+                    assert!(matches!(event, Event::JobRunFailureV1(_)));
+                    break;
+                }
+                JobRunVisitResult::StillRunning => {
+                    // Sleep briefly and poll again
+                    std::thread::sleep(std::time::Duration::from_millis(10));
+                    continue;
+                }
+            }
         }
     }
@@ -322,28 +390,32 @@ mod tests {
     fn test_job_run_cancel_returns_job_run_cancel_event() {
         use std::fs;
         use crate::ManuallyTriggeredEvent;
-        use uuid::Uuid;

         // Create a temp file path for the test
         let temp_file = format!("/tmp/databuild_test_cancel_{}", Uuid::new_v4());

         // Spawn a job run that will sleep for 1 second and write a file
-        let mut job_run = SubProcessJobRun::spawn(test_helper_path(), vec![]).unwrap();
+        let job_run: NotStartedJobRun<SubProcessBackend> = NotStartedJobRun::spawn(test_helper_path(), vec![]);
         let env: HashMap<String, String> = HashMap::from([
             ("DATABUILD_TEST_SLEEP_MS".to_string(), "1000".to_string()),
             ("DATABUILD_TEST_OUTPUT_FILE".to_string(), temp_file.clone()),
             ("DATABUILD_TEST_FILE_CONTENT".to_string(), "completed".to_string()),
         ]);
-        job_run.run_with_env(Some(env)).unwrap();
+        let running_job = job_run.run_with_env(Some(env)).unwrap();

         // Give it a tiny bit of time to start
         std::thread::sleep(std::time::Duration::from_millis(10));

-        // Cancel the job before it can complete - this returns the cancel event
-        let cancel_event = job_run.cancel(ManuallyTriggeredEvent { user: "test_user".into() }.into()).unwrap();
+        // Cancel the job before it can complete - this consumes the running job and returns canceled
+        let canceled_job = running_job.cancel(ManuallyTriggeredEvent { user: "test_user".into() }.into()).unwrap();
+
+        // Generate the cancel event from the canceled state
+        let cancel_event = canceled_job.state.to_event(&canceled_job.id());

         // Verify we got the cancel event
-        assert_eq!(cancel_event.job_run_id, job_run.id().to_string());
+        assert_eq!(cancel_event.job_run_id, canceled_job.id().to_string());
         assert!(cancel_event.source.is_some());
         assert_eq!(cancel_event.comment, Some("Job was canceled".to_string()));
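What the typestate encoding above buys: the old runtime guards ("visit() called on non-running job", "run() called on already-running or completed job") become compile errors, because visit() and cancel() exist only on RunningJobRun and every transition consumes self. A minimal sketch; the cancel_demo helper and the "/bin/sleep" entry point are illustrative, not from the repo:

use crate::job_run::{NotStartedJobRun, SubProcessBackend};
use crate::ManuallyTriggeredEvent;

// Hypothetical demo, not in the commit.
fn cancel_demo() -> Result<(), Box<dyn std::error::Error>> {
    let job: NotStartedJobRun<SubProcessBackend> =
        NotStartedJobRun::spawn("/bin/sleep".to_string(), vec!["5".to_string()]);
    // job.visit();            // would not compile: visit() exists only on RunningJobRun
    // job.cancel(..);         // would not compile: cancel() exists only on RunningJobRun
    let running = job.run()?;  // consumes `job`; touching `job` afterwards is a move error
    let canceled = running.cancel(ManuallyTriggeredEvent { user: "demo".into() }.into())?;
    let event = canceled.state.to_event(&canceled.id()); // JobRunCancelEventV1
    assert_eq!(event.job_run_id, canceled.id().to_string());
    Ok(())
}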

View file

@@ -1,7 +1,7 @@
 use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage};
 use crate::job::JobConfiguration;
-use crate::job_run::JobRun;
-use crate::{JobRunStatusCode, PartitionRef, WantDetail};
+use crate::job_run::{NotStartedJobRun, RunningJobRun, CompletedJobRun, FailedJobRun, SubProcessBackend};
+use crate::{PartitionRef, WantDetail};
 use std::collections::HashMap;
 use std::error::Error;
 use std::fmt::Debug;
@@ -13,7 +13,10 @@ the visitor pattern to monitor job exec progress and liveness, and adds
 struct Orchestrator<S: BELStorage + Debug> {
     bel: BuildEventLog<S>,
-    job_runs: Vec<Box<dyn JobRun>>,
+    not_started_jobs: Vec<NotStartedJobRun<SubProcessBackend>>,
+    running_jobs: Vec<RunningJobRun<SubProcessBackend>>,
+    completed_jobs: Vec<CompletedJobRun<SubProcessBackend>>,
+    failed_jobs: Vec<FailedJobRun<SubProcessBackend>>,
     config: OrchestratorConfig,
 }
@@ -21,7 +24,10 @@ impl Default for Orchestrator<MemoryBELStorage> {
     fn default() -> Self {
         Self {
             bel: Default::default(),
-            job_runs: Default::default(),
+            not_started_jobs: Default::default(),
+            running_jobs: Default::default(),
+            completed_jobs: Default::default(),
+            failed_jobs: Default::default(),
             config: Default::default(),
         }
     }
@@ -31,7 +37,10 @@ impl Orchestrator<MemoryBELStorage> {
     fn copy(&self) -> Self {
         Self {
             bel: self.bel.clone(),
-            job_runs: Default::default(),
+            not_started_jobs: Default::default(),
+            running_jobs: Default::default(),
+            completed_jobs: Default::default(),
+            failed_jobs: Default::default(),
             config: self.config.clone(),
         }
     }
@@ -39,27 +48,18 @@
 impl<S: BELStorage + Debug> Orchestrator<S> {
     fn with_config(self, config: OrchestratorConfig) -> Self {
-        Self {
-            bel: self.bel,
-            job_runs: self.job_runs,
-            config,
-        }
+        Self { config, ..self }
     }

     fn with_jobs(self, jobs: Vec<JobConfiguration>) -> Self {
         Self {
-            bel: self.bel,
-            job_runs: self.job_runs,
             config: self.config.with_jobs(jobs),
+            ..self
         }
     }

     fn with_bel(self, bel: BuildEventLog<S>) -> Self {
-        Self {
-            bel,
-            job_runs: self.job_runs,
-            config: self.config,
-        }
+        Self { bel, ..self }
     }
 }
@@ -110,30 +110,46 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
     fn new(storage: S, config: OrchestratorConfig) -> Self {
         Self {
             bel: BuildEventLog::new(storage, Default::default()),
-            job_runs: Vec::new(),
+            not_started_jobs: Vec::new(),
+            running_jobs: Vec::new(),
+            completed_jobs: Vec::new(),
+            failed_jobs: Vec::new(),
             config,
         }
     }

     /** Continuously invoked function to watch job run status */
     fn poll_job_runs(&mut self) -> Result<(), Box<dyn Error>> {
-        // Visit existing jobs, remove completed
-        self.job_runs.retain_mut(|jr| {
-            // Append emitted events
-            let result = jr
-                .visit()
-                .expect("Job visit failed");
-            result.new_events
-                .iter()
-                .for_each(|event| {
-                    self.bel
-                        .append_event(&event)
-                        .expect("Failed to append event");
-                });
-
-            // Retain job run if it doesn't yet have an exit code (still running)
-            result.status.code == JobRunStatusCode::JobRunRunning as i32
-        });
+        use crate::job_run::JobRunVisitResult;
+
+        // First, start any not-started jobs
+        while let Some(job) = self.not_started_jobs.pop() {
+            let running = job.run()?;
+            self.running_jobs.push(running);
+        }
+
+        // Visit running jobs and transition them to terminal states
+        let mut still_running = Vec::new();
+        for mut job in self.running_jobs.drain(..) {
+            match job.visit()? {
+                JobRunVisitResult::StillRunning => {
+                    still_running.push(job);
+                }
+                JobRunVisitResult::Completed(completed) => {
+                    // Emit success event
+                    let event = completed.state.to_event(&completed.id());
+                    self.bel.append_event(&event)?;
+                    self.completed_jobs.push(completed);
+                }
+                JobRunVisitResult::Failed(failed) => {
+                    // Emit failure event
+                    let event = failed.state.to_event(&failed.id());
+                    self.bel.append_event(&event)?;
+                    self.failed_jobs.push(failed);
+                }
+            }
+        }
+        self.running_jobs = still_running;

         Ok(())
     }
@@ -152,18 +168,18 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
             .collect();
         let grouped_wants = Orchestrator::<S>::group_wants(&self.config, &schedulable_wants);

-        if !grouped_wants.want_groups.is_empty() {
+        if !grouped_wants.unhandled_wants.is_empty() {
             // All wants must be mapped to jobs that can be handled
             // TODO we probably want to handle this gracefully in the near future
             Err(format!(
                 "Unable to map following wants: {:?}",
-                &grouped_wants.want_groups
+                &grouped_wants.unhandled_wants
             )
             .into())
         } else {
             for wg in grouped_wants.want_groups {
                 let job_run = wg.job.spawn(wg.wants)?;
-                self.job_runs.push(job_run);
+                self.not_started_jobs.push(job_run);
             }

             Ok(())
@@ -256,11 +272,15 @@ mod tests {
     #[test]
     fn test_empty_wants_noop() {
         let mut orchestrator = build_orchestrator();
-        assert!(orchestrator.job_runs.is_empty()); // Should init with no work to do
+        // Should init with no work to do
+        assert!(orchestrator.not_started_jobs.is_empty());
+        assert!(orchestrator.running_jobs.is_empty());
         orchestrator
             .poll_wants()
             .expect("shouldn't fail to poll empty wants");
-        assert!(orchestrator.job_runs.is_empty()); // Should still be empty since no work to do
+        // Should still be empty since no work to do
+        assert!(orchestrator.not_started_jobs.is_empty());
+        assert!(orchestrator.running_jobs.is_empty());
     }

     // Use case: Some schedulable wants with jobs that can be matched should launch those jobs
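Taken together, the orchestrator now moves each job monotonically through its buckets, and each terminal transition emits its BEL event exactly once. A sketch of one scheduling tick; the tick helper is hypothetical, not part of the commit, and would have to live alongside Orchestrator since the struct is private:

use std::error::Error;
use std::fmt::Debug;
use crate::build_event_log::BELStorage;

// Hypothetical driver: one tick over the new state buckets.
fn tick<S: BELStorage + Debug>(orchestrator: &mut Orchestrator<S>) -> Result<(), Box<dyn Error>> {
    orchestrator.poll_wants()?;     // schedulable wants -> not_started_jobs
    orchestrator.poll_job_runs()?;  // not_started -> running -> completed_jobs / failed_jobs
    Ok(())
}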