diff --git a/databuild/job.rs b/databuild/job.rs
index e79c75a..357b8c4 100644
--- a/databuild/job.rs
+++ b/databuild/job.rs
@@ -1,7 +1,6 @@
-use crate::job_run::{spawn_job_run, JobRun, JobRunConfig};
+use crate::job_run::{NotStartedJobRun, SubProcessBackend};
 use crate::{PartitionRef, WantDetail};
 use regex::Regex;
-use std::error::Error;
 
 #[derive(Debug, Clone)]
 pub struct JobConfiguration {
@@ -12,12 +11,11 @@ pub struct JobConfiguration {
 
 impl JobConfiguration {
     /** Launch job to build the partitions specified by the provided wants. */
-    pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<Box<dyn JobRun>, Box<dyn Error>> {
+    pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<NotStartedJobRun<SubProcessBackend>, std::io::Error> {
         let wanted_refs: Vec<PartitionRef> = wants.iter().flat_map(|want| want.partitions.clone()).collect();
         let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
-        let config = JobRunConfig::SubProcess { entry_point: self.entry_point.clone(), args };
-        spawn_job_run(config)
+        Ok(NotStartedJobRun::spawn(self.entry_point.clone(), args))
     }
 
     pub fn matches(&self, refs: &PartitionRef) -> bool {
diff --git a/databuild/job_run.rs b/databuild/job_run.rs
index c5ed343..965694e 100644
--- a/databuild/job_run.rs
+++ b/databuild/job_run.rs
@@ -1,4 +1,5 @@
 use std::collections::HashMap;
+use std::marker::PhantomData;
 use crate::data_build_event::Event;
 use crate::{JobRunHeartbeatEventV1, JobRunStatus, JobRunStatusCode, JobRunSuccessEventV1, JobRunFailureEventV1, JobRunCancelEventV1, EventSource};
 use std::error::Error;
@@ -9,227 +10,309 @@ use uuid::Uuid;
 // TODO log to /var/log/databuild/jobruns/$JOB_RUN_ID/, and rotate over max size (e.g. only ever use 1GB for logs)
 // Leave door open to background log processor that tails job logs, but don't include in jobrun concept
 
-pub trait JobRun {
-    fn id(&self) -> Uuid;
-    /**
-    Visit is responsible for observing the state of the job run,
-    */
-    fn visit(&mut self) -> Result<JobRunPollResult, Box<dyn Error>>;
-    fn cancel(&mut self, source: EventSource) -> Result<JobRunCancelEventV1, Box<dyn Error>>;
-    fn run_with_env(&mut self, env: Option<HashMap<String, String>>) -> Result<JobRunHeartbeatEventV1, Box<dyn Error>>;
-    fn run(&mut self) -> Result<JobRunHeartbeatEventV1, Box<dyn Error>> {
+/// Backend trait that defines the state types and transition logic for different job run implementations
+pub trait JobRunBackend: Sized {
+    type NotStartedState;
+    type RunningState;
+    type CompletedState;
+    type FailedState;
+    type CanceledState;
+
+    /// Create a new not-started job run
+    fn create(entry_point: String, args: Vec<String>) -> Self::NotStartedState;
+
+    /// Transition from NotStarted to Running
+    fn start(
+        not_started: Self::NotStartedState,
+        env: Option<HashMap<String, String>>
+    ) -> Result<Self::RunningState, Box<dyn Error>>;
+
+    /// Poll a running job for state changes
+    fn poll(
+        running: &mut Self::RunningState
+    ) -> Result<PollResult<Self::CompletedState, Self::FailedState>, Box<dyn Error>>;
+
+    /// Cancel a running job
+    fn cancel_job(
+        running: Self::RunningState,
+        source: EventSource
+    ) -> Result<Self::CanceledState, Box<dyn Error>>;
+}
+
+/// Result of polling a running job
+pub enum PollResult<C, F> {
+    StillRunning,
+    Completed(C),
+    Failed(F),
+}
+
+/// Generic JobRun that works with any backend, parameterized by state
+pub struct JobRun<B: JobRunBackend, S> {
+    pub job_run_id: Uuid,
+    pub state: S,
+    _backend: PhantomData<B>,
+}
+
+/// Type aliases for specific states
+pub type NotStartedJobRun<B> = JobRun<B, <B as JobRunBackend>::NotStartedState>;
+pub type RunningJobRun<B> = JobRun<B, <B as JobRunBackend>::RunningState>;
+pub type CompletedJobRun<B> = JobRun<B, <B as JobRunBackend>::CompletedState>;
+pub type FailedJobRun<B> = JobRun<B, <B as JobRunBackend>::FailedState>;
+pub type CanceledJobRun<B> = JobRun<B, <B as JobRunBackend>::CanceledState>;
+
+// Methods available on all JobRun states
+impl<B: JobRunBackend, S> JobRun<B, S> {
+    pub fn id(&self) -> Uuid {
+        self.job_run_id
+    }
+}
+
+// Methods available only on NotStarted state
+impl<B: JobRunBackend> NotStartedJobRun<B> {
+    pub fn spawn(entry_point: String, args: Vec<String>) -> Self {
+        JobRun {
+            job_run_id: Uuid::new_v4(),
+            state: B::create(entry_point, args),
+            _backend: PhantomData,
+        }
+    }
+
+    pub fn run(self) -> Result<RunningJobRun<B>, Box<dyn Error>> {
         self.run_with_env(None)
     }
+
+    pub fn run_with_env(
+        self,
+        env: Option<HashMap<String, String>>
+    ) -> Result<RunningJobRun<B>, Box<dyn Error>> {
+        let running_state = B::start(self.state, env)?;
+        Ok(JobRun {
+            job_run_id: self.job_run_id,
+            state: running_state,
+            _backend: PhantomData,
+        })
+    }
+}
+
+// Methods available only on Running state
+impl<B: JobRunBackend> RunningJobRun<B> {
+    pub fn visit(&mut self) -> Result<JobRunVisitResult<B>, Box<dyn Error>> {
+        match B::poll(&mut self.state)? {
+            PollResult::StillRunning => Ok(JobRunVisitResult::StillRunning),
+            PollResult::Completed(completed_state) => {
+                let job_run_id = self.job_run_id;
+                Ok(JobRunVisitResult::Completed(JobRun {
+                    job_run_id,
+                    state: completed_state,
+                    _backend: PhantomData,
+                }))
+            }
+            PollResult::Failed(failed_state) => {
+                let job_run_id = self.job_run_id;
+                Ok(JobRunVisitResult::Failed(JobRun {
+                    job_run_id,
+                    state: failed_state,
+                    _backend: PhantomData,
+                }))
+            }
+        }
+    }
+
+    pub fn cancel(self, source: EventSource) -> Result<CanceledJobRun<B>, Box<dyn Error>> {
+        let canceled_state = B::cancel_job(self.state, source)?;
+        Ok(JobRun {
+            job_run_id: self.job_run_id,
+            state: canceled_state,
+            _backend: PhantomData,
+        })
+    }
+}
+
+/// Result of visiting a running job
+pub enum JobRunVisitResult<B: JobRunBackend> {
+    StillRunning,
+    Completed(CompletedJobRun<B>),
+    Failed(FailedJobRun<B>),
 }
 
 pub enum JobRunConfig {
     SubProcess { entry_point: String, args: Vec<String> },
 }
 
-pub fn spawn_job_run(config: JobRunConfig) -> Result<Box<dyn JobRun>, Box<dyn Error>> {
-    match config {
-        JobRunConfig::SubProcess { entry_point, args } => Ok(SubProcessJobRun::spawn(entry_point, args)?),
-        _ => Err("No impl for this job config type".into()),
-    }
-}
+// ===== SubProcess Backend Implementation =====
 
-pub struct SubProcessJobRun {
-    pub job_run_id: Uuid,
+/// SubProcess backend for running jobs as local subprocesses
+pub struct SubProcessBackend;
+
+/// NotStarted state for SubProcess backend
+pub struct SubProcessNotStarted {
     pub entry_point: String,
     pub args: Vec<String>,
-    pub state: JobRunState,
 }
 
-enum JobRunState {
-    NotStarted,
-    Running {
-        process: Child,
-        stdout_buffer: Vec<String>,
-    },
-    Completed {
-        exit_code: i32,
-        stdout_buffer: Vec<String>,
-    },
-    Failed {
-        exit_code: i32,
-        reason: String,
-        stdout_buffer: Vec<String>,
-    },
-    Canceled {
-        source: EventSource,
-        stdout_buffer: Vec<String>,
-    },
+/// Running state for SubProcess backend
+pub struct SubProcessRunning {
+    pub process: Child,
+    pub stdout_buffer: Vec<String>,
 }
 
-impl JobRun for SubProcessJobRun {
-    fn id(&self) -> Uuid {
-        self.job_run_id
+/// Completed state for SubProcess backend
+pub struct SubProcessCompleted {
+    pub exit_code: i32,
+    pub stdout_buffer: Vec<String>,
+}
+
+/// Failed state for SubProcess backend
+pub struct SubProcessFailed {
+    pub exit_code: i32,
+    pub reason: String,
+    pub stdout_buffer: Vec<String>,
+}
+
+/// Canceled state for SubProcess backend
+pub struct SubProcessCanceled {
+    pub source: EventSource,
+    pub stdout_buffer: Vec<String>,
+}
+
+impl JobRunBackend for SubProcessBackend {
+    type NotStartedState = SubProcessNotStarted;
+    type RunningState = SubProcessRunning;
+    type CompletedState = SubProcessCompleted;
+    type FailedState = SubProcessFailed;
+    type CanceledState = SubProcessCanceled;
+
+    fn create(entry_point: String, args: Vec<String>) -> Self::NotStartedState {
+        SubProcessNotStarted { entry_point, args }
     }
 
-    fn visit(&mut self) -> Result<JobRunPollResult, Box<dyn Error>> {
-        let mut new_events = Vec::new();
+    fn start(
+        not_started: Self::NotStartedState,
+        env: Option<HashMap<String, String>>
+    ) -> Result<Self::RunningState, Box<dyn Error>> {
+        let process = Command::new(not_started.entry_point)
+            .args(not_started.args)
+            .stdout(Stdio::piped())
+            .stderr(Stdio::piped())
+            .envs(env.unwrap_or_default())
+            .spawn()?;
 
-        match &mut self.state {
-            JobRunState::Running { process, stdout_buffer } => {
-                // Non-blocking check for exit status
-                if let Some(exit_status) = process.try_wait()? {
-                    // Read any remaining stdout
-                    if let Some(stdout) = process.stdout.take() {
-                        let reader = BufReader::new(stdout);
-                        for line in reader.lines() {
-                            // TODO we should write lines to the job's file logs
-                            if let Ok(line) = line {
-                                stdout_buffer.push(line);
-                            }
-                        }
-                    }
+        Ok(SubProcessRunning {
+            process,
+            stdout_buffer: Vec::new(),
+        })
+    }
 
-                    // Take ownership of the current state to transition
-                    let old_state = std::mem::replace(&mut self.state, JobRunState::NotStarted);
-                    let stdout_buf = if let JobRunState::Running { stdout_buffer, .. } = old_state {
-                        stdout_buffer
-                    } else {
-                        Vec::new()
-                    };
-
-                    // Check exit status and transition to terminal state
-                    match exit_status.code() {
-                        Some(0) => {
-                            // Success case
-                            self.state = JobRunState::Completed {
-                                exit_code: 0,
-                                stdout_buffer: stdout_buf,
-                            };
-                            new_events.push(Event::JobRunSuccessV1(JobRunSuccessEventV1 {
-                                job_run_id: self.job_run_id.to_string(),
-                            }));
-                            return Ok(JobRunPollResult {
-                                new_events,
-                                status: JobRunStatusCode::JobRunSucceeded.into(),
-                            });
-                        }
-                        Some(code) => {
-                            // Failed with exit code
-                            let reason = format!("Job failed with exit code {}", code);
-                            self.state = JobRunState::Failed {
-                                exit_code: code,
-                                reason: reason.clone(),
-                                stdout_buffer: stdout_buf,
-                            };
-                            new_events.push(Event::JobRunFailureV1(JobRunFailureEventV1 {
-                                job_run_id: self.job_run_id.to_string(),
-                                reason,
-                            }));
-                            return Ok(JobRunPollResult {
-                                new_events,
-                                status: JobRunStatusCode::JobRunFailed.into(),
-                            });
-                        }
-                        None => {
-                            // Terminated by signal (Unix) - treat as failure
-                            let reason = format!("Job terminated by signal: {}", exit_status);
-                            self.state = JobRunState::Failed {
-                                exit_code: -1,
-                                reason: reason.clone(),
-                                stdout_buffer: stdout_buf,
-                            };
-                            new_events.push(Event::JobRunFailureV1(JobRunFailureEventV1 {
-                                job_run_id: self.job_run_id.to_string(),
-                                reason,
-                            }));
-                            return Ok(JobRunPollResult {
-                                new_events,
-                                status: JobRunStatusCode::JobRunFailed.into(),
-                            });
-                        }
+
+    fn poll(
+        running: &mut Self::RunningState
+    ) -> Result<PollResult<Self::CompletedState, Self::FailedState>, Box<dyn Error>> {
+        // Non-blocking check for exit status
+        if let Some(exit_status) = running.process.try_wait()? {
+            // Read any remaining stdout
+            if let Some(stdout) = running.process.stdout.take() {
+                let reader = BufReader::new(stdout);
+                for line in reader.lines() {
+                    // TODO we should write lines to the job's file logs
+                    if let Ok(line) = line {
+                        running.stdout_buffer.push(line);
+                    }
                 }
             }
-
-                // Still running
-                Ok(JobRunPollResult {
-                    new_events,
-                    status: JobRunStatusCode::JobRunRunning.into(),
-                })
-            }
-            _ => Err("visit() called on non-running job".into()),
+
+            // Take ownership of stdout_buffer
+            let stdout_buffer = std::mem::take(&mut running.stdout_buffer);
+
+            // Check exit status and return appropriate result
+            match exit_status.code() {
+                Some(0) => {
+                    // Success case
+                    Ok(PollResult::Completed(SubProcessCompleted {
+                        exit_code: 0,
+                        stdout_buffer,
+                    }))
+                }
+                Some(code) => {
+                    // Failed with exit code
+                    let reason = format!("Job failed with exit code {}", code);
+                    Ok(PollResult::Failed(SubProcessFailed {
+                        exit_code: code,
+                        reason,
+                        stdout_buffer,
+                    }))
+                }
+                None => {
+                    // Terminated by signal (Unix) - treat as failure
+                    let reason = format!("Job terminated by signal: {}", exit_status);
+                    Ok(PollResult::Failed(SubProcessFailed {
+                        exit_code: -1,
+                        reason,
+                        stdout_buffer,
+                    }))
+                }
+            }
+        } else {
+            // Still running
+            Ok(PollResult::StillRunning)
+        }
     }
 
-    fn cancel(&mut self, source: EventSource) -> Result<JobRunCancelEventV1, Box<dyn Error>> {
-        match std::mem::replace(&mut self.state, JobRunState::NotStarted) {
-            JobRunState::Running { mut process, stdout_buffer } => {
-                // Kill the process
-                process.kill()?;
+    fn cancel_job(
+        mut running: Self::RunningState,
+        source: EventSource
+    ) -> Result<Self::CanceledState, Box<dyn Error>> {
+        // Kill the process
+        running.process.kill()?;
 
-                // Wait for it to actually terminate
-                process.wait()?;
+        // Wait for it to actually terminate
+        running.process.wait()?;
 
-                // Transition to Canceled state
-                self.state = JobRunState::Canceled {
-                    source: source.clone(),
-                    stdout_buffer,
-                };
-
-                Ok(JobRunCancelEventV1 {
-                    job_run_id: self.job_run_id.to_string(),
-                    source: Some(source),
-                    comment: Some("Job was canceled".to_string()),
-                })
-            }
-            other_state => {
-                // Restore the state and error
-                self.state = other_state;
-                Err("cancel() called on non-running job".into())
-            }
-        }
+        // Return canceled state
+        Ok(SubProcessCanceled {
+            source,
+            stdout_buffer: running.stdout_buffer,
+        })
     }
+}
 
-    /// Mostly for test purposes
-    fn run_with_env(&mut self, env: Option<HashMap<String, String>>) -> Result<JobRunHeartbeatEventV1, Box<dyn Error>> {
-        match &self.state {
-            JobRunState::NotStarted => {
-                let process = Command::new(self.entry_point.clone())
-                    .args(self.args.clone())
-                    .stdout(Stdio::piped())
-                    .stderr(Stdio::piped())
-                    .envs(env.unwrap_or_default())
-                    .spawn()?;
+// Helper functions to convert between states and events
+impl SubProcessCompleted {
+    pub fn to_event(&self, job_run_id: &Uuid) -> Event {
+        Event::JobRunSuccessV1(JobRunSuccessEventV1 {
+            job_run_id: job_run_id.to_string(),
+        })
+    }
+}
 
-                self.state = JobRunState::Running {
-                    process,
-                    stdout_buffer: Vec::new(),
-                };
+impl SubProcessFailed {
+    pub fn to_event(&self, job_run_id: &Uuid) -> Event {
+        Event::JobRunFailureV1(JobRunFailureEventV1 {
+            job_run_id: job_run_id.to_string(),
+            reason: self.reason.clone(),
+        })
+    }
+}
 
-                // TODO should this return the event now? Or enqueue it? No sense in waiting I suppose, and orchestrator should just handle it?
-                Ok(JobRunHeartbeatEventV1 { job_run_id: self.id().to_string() })
-            }
-            _ => Err("run() called on already-running or completed job".into())
+impl SubProcessCanceled {
+    pub fn to_event(&self, job_run_id: &Uuid) -> JobRunCancelEventV1 {
+        JobRunCancelEventV1 {
+            job_run_id: job_run_id.to_string(),
+            source: Some(self.source.clone()),
+            comment: Some("Job was canceled".to_string()),
         }
     }
 }
 
-impl SubProcessJobRun {
-    pub fn spawn(entry_point: String, args: Vec<String>) -> Result<Box<dyn JobRun>, Box<dyn Error>> {
-        Ok(Box::new(SubProcessJobRun {
-            job_run_id: Uuid::new_v4(),
-            entry_point,
-            args,
-            state: JobRunState::NotStarted,
-        }))
-    }
-}
-
+// Old JobRunPollResult structure - kept for compatibility during migration
 pub struct JobRunPollResult {
-    pub new_events: Vec<Event>, // Parsed BEL events, not raw lines
+    pub new_events: Vec<Event>,
     pub status: JobRunStatus,
 }
 
 mod tests {
     use std::collections::HashMap;
     use crate::data_build_event::Event;
-    use crate::job_run::{JobRun, SubProcessJobRun};
-    use crate::{JobRunStatusCode, ManuallyTriggeredEvent};
-    use uuid::Uuid;
+    use crate::job_run::{JobRunVisitResult, NotStartedJobRun, SubProcessBackend};
+    use crate::{ManuallyTriggeredEvent};
 
     fn test_helper_path() -> String {
         std::env::var("TEST_SRCDIR")
@@ -241,35 +324,29 @@ mod tests {
     #[test]
     fn test_job_run_success_returns_job_run_success_event() {
         // Spawn a job run that will succeed (exit code 0)
-        let mut job_run = SubProcessJobRun::spawn(test_helper_path(), vec![]).unwrap();
+        let job_run: NotStartedJobRun<SubProcessBackend> = NotStartedJobRun::spawn(test_helper_path(), vec![]);
 
-        // Start the job
-        job_run.run().unwrap();
+        // Start the job - this consumes the NotStarted and returns Running
+        let mut running_job = job_run.run().unwrap();
 
         // Poll until we get completion
         loop {
-            let result = job_run.visit().unwrap();
-
-            // Check if we got a success event
-            let has_success = result.new_events.iter().any(|event| {
-                matches!(event, Event::JobRunSuccessV1(_))
-            });
-
-            if has_success {
-                let expected = JobRunStatusCode::JobRunSucceeded as i32;
-                assert!(matches!(result.status.code, expected));
-                break;
+            match running_job.visit().unwrap() {
+                JobRunVisitResult::Completed(completed) => {
+                    // Generate the event from the completed state
+                    let event = completed.state.to_event(&completed.id());
+                    assert!(matches!(event, Event::JobRunSuccessV1(_)));
+                    break;
+                }
+                JobRunVisitResult::Failed(failed) => {
+                    panic!("Job failed unexpectedly: {}", failed.state.reason);
+                }
+                JobRunVisitResult::StillRunning => {
+                    // Sleep briefly and poll again
+                    std::thread::sleep(std::time::Duration::from_millis(10));
+                    continue;
+                }
             }
-
-            // If job is still running, sleep briefly and poll again
-            let expected = JobRunStatusCode::JobRunRunning as i32;
-            if matches!(result.status.code, expected) {
-                std::thread::sleep(std::time::Duration::from_millis(10));
-                continue;
-            }
-
-            // If we got here, job failed when it shouldn't have
-            panic!("Job failed unexpectedly: {:?}", result.status);
         }
     }
 
@@ -277,41 +354,32 @@ mod tests {
     #[test]
    fn test_job_run_failure_returns_job_run_failure_event() {
         // Spawn a job run
-        let mut job_run = SubProcessJobRun::spawn(test_helper_path(), vec![]).unwrap();
+        let job_run: NotStartedJobRun<SubProcessBackend> = NotStartedJobRun::spawn(test_helper_path(), vec![]);
 
         // Start the job with an exit code that indicates failure (non-zero)
         let env: HashMap<String, String> = HashMap::from([
            ("DATABUILD_TEST_EXIT_CODE".to_string(), "1".to_string())
         ]);
-        job_run.run_with_env(Some(env)).unwrap();
-
+        let mut running_job = job_run.run_with_env(Some(env)).unwrap();
 
         // Poll until we get completion
         loop {
-            let result = job_run.visit().unwrap();
-
-            // Check if we got a success event
-            if result.new_events.iter().any(|event| {
-                matches!(event, Event::JobRunSuccessV1(_))
-            }) {
-                panic!("Job succeeded unexpectedly");
-            };
-
-            if result.new_events.iter().any(|event| {
-                matches!(event, Event::JobRunFailureV1(_))
-            }) {
-                break;
+            match running_job.visit().unwrap() {
+                JobRunVisitResult::Completed(_) => {
+                    panic!("Job succeeded unexpectedly");
+                }
+                JobRunVisitResult::Failed(failed) => {
+                    // Generate the event from the failed state
+                    let event = failed.state.to_event(&failed.id());
+                    assert!(matches!(event, Event::JobRunFailureV1(_)));
+                    break;
+                }
+                JobRunVisitResult::StillRunning => {
+                    // Sleep briefly and poll again
+                    std::thread::sleep(std::time::Duration::from_millis(10));
+                    continue;
+                }
             }
-
-            // If job is still running, sleep briefly and poll again
-            let expected = JobRunStatusCode::JobRunRunning as i32;
-            if matches!(result.status.code, expected) {
-                std::thread::sleep(std::time::Duration::from_millis(10));
-                continue;
-            }
-
-            // If we got here, job failed when it shouldn't have
-            panic!("Job failed unexpectedly: {:?}", result.status);
         }
     }
 
@@ -322,28 +390,32 @@ mod tests {
     fn test_job_run_cancel_returns_job_run_cancel_event() {
         use std::fs;
         use crate::ManuallyTriggeredEvent;
+        use uuid::Uuid;
 
         // Create a temp file path for the test
         let temp_file = format!("/tmp/databuild_test_cancel_{}", Uuid::new_v4());
 
         // Spawn a job run that will sleep for 1 second and write a file
-        let mut job_run = SubProcessJobRun::spawn(test_helper_path(), vec![]).unwrap();
+        let job_run: NotStartedJobRun<SubProcessBackend> = NotStartedJobRun::spawn(test_helper_path(), vec![]);
         let env: HashMap<String, String> = HashMap::from([
             ("DATABUILD_TEST_SLEEP_MS".to_string(), "1000".to_string()),
             ("DATABUILD_TEST_OUTPUT_FILE".to_string(), temp_file.clone()),
             ("DATABUILD_TEST_FILE_CONTENT".to_string(), "completed".to_string()),
         ]);
-        job_run.run_with_env(Some(env)).unwrap();
+        let running_job = job_run.run_with_env(Some(env)).unwrap();
 
         // Give it a tiny bit of time to start
         std::thread::sleep(std::time::Duration::from_millis(10));
 
-        // Cancel the job before it can complete - this returns the cancel event
-        let cancel_event = job_run.cancel(ManuallyTriggeredEvent { user: "test_user".into() }.into()).unwrap();
+        // Cancel the job before it can complete - this consumes the running job and returns canceled
+        let canceled_job = running_job.cancel(ManuallyTriggeredEvent { user: "test_user".into() }.into()).unwrap();
+
+        // Generate the cancel event from the canceled state
+        let cancel_event = canceled_job.state.to_event(&canceled_job.id());
 
         // Verify we got the cancel event
-        assert_eq!(cancel_event.job_run_id, job_run.id().to_string());
+        assert_eq!(cancel_event.job_run_id, canceled_job.id().to_string());
         assert!(cancel_event.source.is_some());
         assert_eq!(cancel_event.comment, Some("Job was canceled".to_string()));
diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs
index 32b9602..2553405 100644
--- a/databuild/orchestrator.rs
+++ b/databuild/orchestrator.rs
@@ -1,7 +1,7 @@
 use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage};
 use crate::job::JobConfiguration;
-use crate::job_run::JobRun;
-use crate::{JobRunStatusCode, PartitionRef, WantDetail};
+use crate::job_run::{NotStartedJobRun, RunningJobRun, CompletedJobRun, FailedJobRun, SubProcessBackend};
+use crate::{PartitionRef, WantDetail};
 use std::collections::HashMap;
 use std::error::Error;
 use std::fmt::Debug;
@@ -13,7 +13,10 @@ the visitor pattern to monitor job exec
 progress and liveness, and adds
 struct Orchestrator<S: BELStorage + Debug> {
     bel: BuildEventLog<S>,
-    job_runs: Vec<Box<dyn JobRun>>,
+    not_started_jobs: Vec<NotStartedJobRun<SubProcessBackend>>,
+    running_jobs: Vec<RunningJobRun<SubProcessBackend>>,
+    completed_jobs: Vec<CompletedJobRun<SubProcessBackend>>,
+    failed_jobs: Vec<FailedJobRun<SubProcessBackend>>,
     config: OrchestratorConfig,
 }
 
@@ -21,7 +24,10 @@ impl Default for Orchestrator {
     fn default() -> Self {
         Self {
             bel: Default::default(),
-            job_runs: Default::default(),
+            not_started_jobs: Default::default(),
+            running_jobs: Default::default(),
+            completed_jobs: Default::default(),
+            failed_jobs: Default::default(),
             config: Default::default(),
         }
     }
@@ -31,7 +37,10 @@ impl Orchestrator {
     fn copy(&self) -> Self {
         Self {
             bel: self.bel.clone(),
-            job_runs: Default::default(),
+            not_started_jobs: Default::default(),
+            running_jobs: Default::default(),
+            completed_jobs: Default::default(),
+            failed_jobs: Default::default(),
             config: self.config.clone(),
         }
     }
@@ -39,27 +48,18 @@ impl Orchestrator {
 
 impl<S: BELStorage + Debug> Orchestrator<S> {
     fn with_config(self, config: OrchestratorConfig) -> Self {
-        Self {
-            bel: self.bel,
-            job_runs: self.job_runs,
-            config,
-        }
+        Self { config, ..self }
     }
 
     fn with_jobs(self, jobs: Vec<JobConfiguration>) -> Self {
         Self {
-            bel: self.bel,
-            job_runs: self.job_runs,
             config: self.config.with_jobs(jobs),
+            ..self
         }
     }
 
     fn with_bel(self, bel: BuildEventLog<S>) -> Self {
-        Self {
-            bel,
-            job_runs: self.job_runs,
-            config: self.config,
-        }
+        Self { bel, ..self }
     }
 }
 
@@ -110,30 +110,46 @@ impl Orchestrator {
     fn new(storage: S, config: OrchestratorConfig) -> Self {
         Self {
             bel: BuildEventLog::new(storage, Default::default()),
-            job_runs: Vec::new(),
+            not_started_jobs: Vec::new(),
+            running_jobs: Vec::new(),
+            completed_jobs: Vec::new(),
+            failed_jobs: Vec::new(),
             config,
         }
     }
 
     /** Continuously invoked function to watch job run status */
     fn poll_job_runs(&mut self) -> Result<(), Box<dyn Error>> {
-        // Visit existing jobs, remove completed
-        self.job_runs.retain_mut(|jr| {
-            // Append emitted events
-            let result = jr
-                .visit()
-                .expect("Job visit failed");
-            result.new_events
-                .iter()
-                .for_each(|event| {
-                    self.bel
-                        .append_event(&event)
-                        .expect("Failed to append event");
-                });
+        use crate::job_run::JobRunVisitResult;
 
-            // Retain job run if it doesn't yet have an exit code (still running)
-            result.status.code == JobRunStatusCode::JobRunRunning as i32
-        });
+        // First, start any not-started jobs
+        while let Some(job) = self.not_started_jobs.pop() {
+            let running = job.run()?;
+            self.running_jobs.push(running);
+        }
+
+        // Visit running jobs and transition them to terminal states
+        let mut still_running = Vec::new();
+        for mut job in self.running_jobs.drain(..) {
+            match job.visit()? {
+                JobRunVisitResult::StillRunning => {
+                    still_running.push(job);
+                }
+                JobRunVisitResult::Completed(completed) => {
+                    // Emit success event
+                    let event = completed.state.to_event(&completed.id());
+                    self.bel.append_event(&event)?;
+                    self.completed_jobs.push(completed);
+                }
+                JobRunVisitResult::Failed(failed) => {
+                    // Emit failure event
+                    let event = failed.state.to_event(&failed.id());
+                    self.bel.append_event(&event)?;
+                    self.failed_jobs.push(failed);
+                }
+            }
+        }
+        self.running_jobs = still_running;
 
         Ok(())
     }
@@ -152,18 +168,18 @@ impl Orchestrator {
             .collect();
         let grouped_wants = Orchestrator::<S>::group_wants(&self.config, &schedulable_wants);
 
-        if !grouped_wants.want_groups.is_empty() {
+        if !grouped_wants.unhandled_wants.is_empty() {
             // All wants must be mapped to jobs that can be handled
             // TODO we probably want to handle this gracefully in the near future
             Err(format!(
                 "Unable to map following wants: {:?}",
-                &grouped_wants.want_groups
+                &grouped_wants.unhandled_wants
             )
             .into())
         } else {
             for wg in grouped_wants.want_groups {
                 let job_run = wg.job.spawn(wg.wants)?;
-                self.job_runs.push(job_run);
+                self.not_started_jobs.push(job_run);
             }
 
             Ok(())
@@ -256,11 +272,15 @@ mod tests {
     #[test]
     fn test_empty_wants_noop() {
         let mut orchestrator = build_orchestrator();
-        assert!(orchestrator.job_runs.is_empty()); // Should init with no work to do
+        // Should init with no work to do
+        assert!(orchestrator.not_started_jobs.is_empty());
+        assert!(orchestrator.running_jobs.is_empty());
        orchestrator
             .poll_wants()
             .expect("shouldn't fail to poll empty wants");
-        assert!(orchestrator.job_runs.is_empty()); // Should still be empty since no work to do
+        // Should still be empty since no work to do
+        assert!(orchestrator.not_started_jobs.is_empty());
+        assert!(orchestrator.running_jobs.is_empty());
     }
 
     // Use case: Some schedulable wants with jobs that can be matched should launch those jobs