From 55f51125c3b3c0a6d38304ed56bed2b6eef2479d Mon Sep 17 00:00:00 2001
From: Stuart Axelbrooke
Date: Fri, 21 Nov 2025 15:57:16 +0800
Subject: [PATCH] refactor job runs to have internal state objects

---
 databuild/job.rs          |   6 +-
 databuild/job_run.rs      | 314 +++++++++++++++++++++++---------------
 databuild/orchestrator.rs | 230 +++++++++++++++-------------
 3 files changed, 312 insertions(+), 238 deletions(-)

diff --git a/databuild/job.rs b/databuild/job.rs
index 2a70323..759f1a4 100644
--- a/databuild/job.rs
+++ b/databuild/job.rs
@@ -1,4 +1,4 @@
-use crate::job_run::{NotStartedJobRun, SubProcessBackend};
+use crate::job_run::{JobRun, SubProcessBackend};
 use crate::{JobConfig, PartitionRef, WantDetail};
 use regex::Regex;
 use crate::util::DatabuildError;
@@ -12,11 +12,11 @@ pub struct JobConfiguration {
 
 impl JobConfiguration {
     /** Launch job to build the partitions specified by the provided wants. */
-    pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<NotStartedJobRun<SubProcessBackend>, std::io::Error> {
+    pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<JobRun<SubProcessBackend>, std::io::Error> {
         let wanted_refs: Vec<PartitionRef> = wants.iter().flat_map(|want| want.partitions.clone()).collect();
         let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
 
-        Ok(NotStartedJobRun::spawn(self.entry_point.clone(), args))
+        Ok(JobRun::spawn(self.entry_point.clone(), args))
     }
 
     pub fn matches(&self, refs: &PartitionRef) -> bool {
diff --git a/databuild/job_run.rs b/databuild/job_run.rs
index 9a96146..08c8299 100644
--- a/databuild/job_run.rs
+++ b/databuild/job_run.rs
@@ -25,11 +25,6 @@ pub trait JobRunBackend: Sized {
     /// Create a new not-started job run
     fn create(entry_point: String, args: Vec<String>) -> Self::NotStartedState;
 
-    /// Convenience method to spawn a new job run (calls create and wraps in JobRun)
-    fn spawn(entry_point: String, args: Vec<String>) -> NotStartedJobRun<Self> {
-        NotStartedJobRun::spawn(entry_point, args)
-    }
-
     /// Transition from NotStarted to Running
     fn start(
         not_started: Self::NotStartedState,
@@ -59,103 +54,35 @@ pub enum PollResult<C, F, D> {
     DepMiss(D),
 }
 
-/// Generic JobRun that works with any backend, parameterized by state
-pub struct JobRun<B: JobRunBackend, S> {
+// ===== TYPE-SAFE STATE MACHINE PATTERN =====
+// Uses parameterized JobRunWithState wrapped in JobRun enum for storage
+
+/// New JobRun with embedded state enum
+/// Type-safe job run struct, parameterized by backend and state
+/// This struct can only perform operations valid for its current state type
+pub struct JobRunWithState<B: JobRunBackend, S> {
     pub job_run_id: Uuid,
     pub state: S,
     pub _backend: PhantomData<B>,
 }
 
-/// Type aliases for specific states
-pub type NotStartedJobRun<B> = JobRun<B, <B as JobRunBackend>::NotStartedState>;
-pub type RunningJobRun<B> = JobRun<B, <B as JobRunBackend>::RunningState>;
-pub type CompletedJobRun<B> = JobRun<B, <B as JobRunBackend>::CompletedState>;
-pub type FailedJobRun<B> = JobRun<B, <B as JobRunBackend>::FailedState>;
-pub type CanceledJobRun<B> = JobRun<B, <B as JobRunBackend>::CanceledState>;
-pub type DepMissJobRun<B> = JobRun<B, <B as JobRunBackend>::DepMissState>;
-
-// Methods available on all JobRun states
-impl<B: JobRunBackend, S> JobRun<B, S> {
-    pub fn id(&self) -> Uuid {
-        self.job_run_id
-    }
+/// Wrapper enum for storing job runs in a single collection
+/// This allows us to store jobs in different states together while maintaining type safety
+pub enum JobRun<B: JobRunBackend> {
+    NotStarted(JobRunWithState<B, B::NotStartedState>),
+    Running(JobRunWithState<B, B::RunningState>),
+    Completed(JobRunWithState<B, B::CompletedState>),
+    Failed(JobRunWithState<B, B::FailedState>),
+    Canceled(JobRunWithState<B, B::CanceledState>),
+    DepMiss(JobRunWithState<B, B::DepMissState>),
 }
 
-// Methods available only on NotStarted state
-impl<B: JobRunBackend> NotStartedJobRun<B> {
-    pub fn spawn(entry_point: String, args: Vec<String>) -> Self {
-        JobRun {
-            job_run_id: Uuid::new_v4(),
-            state: B::create(entry_point, args),
-            _backend: PhantomData,
-        }
-    }
-
-    pub fn run(self) -> Result<RunningJobRun<B>, DatabuildError> {
-        self.run_with_env(None)
-    }
-
-    pub fn run_with_env(
-        self,
-        env: Option<HashMap<String, String>>,
-    ) -> Result<RunningJobRun<B>, DatabuildError> {
-        let running_state = B::start(self.state, env)?;
-        Ok(JobRun {
-            job_run_id: self.job_run_id,
-            state: running_state,
-            _backend: PhantomData,
-        })
-    }
-}
-
-// Methods available only on Running state
-impl<B: JobRunBackend> RunningJobRun<B> {
-    pub fn visit(&mut self) -> Result<JobRunVisitResult<B>, DatabuildError> {
-        match B::poll(&mut self.state)? {
-            PollResult::StillRunning => Ok(JobRunVisitResult::StillRunning),
-            PollResult::Completed(completed_state) => {
-                let job_run_id = self.job_run_id;
-                Ok(JobRunVisitResult::Completed(JobRun {
-                    job_run_id,
-                    state: completed_state,
-                    _backend: PhantomData,
-                }))
-            }
-            PollResult::Failed(failed_state) => {
-                let job_run_id = self.job_run_id;
-                Ok(JobRunVisitResult::Failed(JobRun {
-                    job_run_id,
-                    state: failed_state,
-                    _backend: PhantomData,
-                }))
-            }
-            PollResult::DepMiss(state) => {
-                let job_run_id = self.job_run_id;
-                Ok(JobRunVisitResult::DepMiss(JobRun {
-                    job_run_id,
-                    state,
-                    _backend: PhantomData,
-                }))
-            }
-        }
-    }
-
-    pub fn cancel(self, source: EventSource) -> Result<CanceledJobRun<B>, DatabuildError> {
-        let canceled_state = B::cancel_job(self.state, source)?;
-        Ok(JobRun {
-            job_run_id: self.job_run_id,
-            state: canceled_state,
-            _backend: PhantomData,
-        })
-    }
-}
-
-/// Result of visiting a running job
-pub enum JobRunVisitResult<B: JobRunBackend> {
-    StillRunning,
-    Completed(CompletedJobRun<B>),
-    Failed(FailedJobRun<B>),
-    DepMiss(DepMissJobRun<B>),
+/// Result of visiting a running job - returns the typed states
+pub enum VisitResult<B: JobRunBackend> {
+    StillRunning(JobRunWithState<B, B::RunningState>),
+    Completed(JobRunWithState<B, B::CompletedState>),
+    Failed(JobRunWithState<B, B::FailedState>),
+    DepMiss(JobRunWithState<B, B::DepMissState>),
 }
 
 pub enum JobRunConfig {
@@ -368,40 +295,164 @@ pub struct JobRunPollResult {
     pub status: JobRunStatus,
 }
 
+// ===== Type-Safe State Transition Implementation =====
+
+// Factory and helper methods on the JobRun enum
+impl<B: JobRunBackend> JobRun<B> {
+    /// Create a new job run in the NotStarted state
+    pub fn spawn(entry_point: String, args: Vec<String>) -> Self {
+        JobRun::NotStarted(JobRunWithState {
+            job_run_id: Uuid::new_v4(),
+            state: B::create(entry_point, args),
+            _backend: PhantomData,
+        })
+    }
+
+    /// Get the job run ID regardless of state
+    pub fn job_run_id(&self) -> &Uuid {
+        match self {
+            JobRun::NotStarted(j) => &j.job_run_id,
+            JobRun::Running(j) => &j.job_run_id,
+            JobRun::Completed(j) => &j.job_run_id,
+            JobRun::Failed(j) => &j.job_run_id,
+            JobRun::Canceled(j) => &j.job_run_id,
+            JobRun::DepMiss(j) => &j.job_run_id,
+        }
+    }
+
+    /// Check if the job is in a terminal state
+    pub fn is_terminal(&self) -> bool {
+        matches!(
+            self,
+            JobRun::Completed(_) | JobRun::Failed(_) | JobRun::Canceled(_) | JobRun::DepMiss(_)
+        )
+    }
+}
+
+// Type-safe transition: NotStarted -> Running
+// This method can ONLY be called on NotStarted jobs - compile error otherwise!
+impl<B: JobRunBackend> JobRunWithState<B, B::NotStartedState> {
+    pub fn run(
+        self,
+        env: Option<HashMap<String, String>>,
+    ) -> Result<JobRunWithState<B, B::RunningState>, DatabuildError> {
+        let running = B::start(self.state, env)?;
+        Ok(JobRunWithState {
+            job_run_id: self.job_run_id,
+            state: running,
+            _backend: PhantomData,
+        })
+    }
+}
+
+// Type-safe transition: Running -> (Running | Completed | Failed | DepMiss)
+// This method can ONLY be called on Running jobs - compile error otherwise!
+impl<B: JobRunBackend> JobRunWithState<B, B::RunningState> {
+    pub fn visit(mut self) -> Result<VisitResult<B>, DatabuildError> {
+        match B::poll(&mut self.state)?
{ + PollResult::StillRunning => Ok(VisitResult::StillRunning(self)), + PollResult::Completed(completed) => Ok(VisitResult::Completed(JobRunWithState { + job_run_id: self.job_run_id, + state: completed, + _backend: PhantomData, + })), + PollResult::Failed(failed) => Ok(VisitResult::Failed(JobRunWithState { + job_run_id: self.job_run_id, + state: failed, + _backend: PhantomData, + })), + PollResult::DepMiss(dep_miss) => Ok(VisitResult::DepMiss(JobRunWithState { + job_run_id: self.job_run_id, + state: dep_miss, + _backend: PhantomData, + })), + } + } + + pub fn cancel( + self, + source: EventSource, + ) -> Result, DatabuildError> { + let canceled = B::cancel_job(self.state, source)?; + Ok(JobRunWithState { + job_run_id: self.job_run_id, + state: canceled, + _backend: PhantomData, + }) + } +} + +// Helper trait for converting states to events +pub trait ToEvent { + fn to_event(&self, job_run_id: &Uuid) -> Event; +} + +impl ToEvent for SubProcessCompleted { + fn to_event(&self, job_run_id: &Uuid) -> Event { + Event::JobRunSuccessV1(JobRunSuccessEventV1 { + job_run_id: job_run_id.to_string(), + }) + } +} + +impl ToEvent for SubProcessFailed { + fn to_event(&self, job_run_id: &Uuid) -> Event { + Event::JobRunFailureV1(JobRunFailureEventV1 { + job_run_id: job_run_id.to_string(), + reason: self.reason.clone(), + }) + } +} + +impl ToEvent for SubProcessDepMiss { + fn to_event(&self, job_run_id: &Uuid) -> Event { + Event::JobRunMissingDepsV1(JobRunMissingDepsEventV1 { + job_run_id: job_run_id.to_string(), + missing_deps: self.missing_deps.clone(), + read_deps: self.read_deps.clone(), + }) + } +} + mod tests { use crate::data_build_event::Event; use crate::data_deps::DATABUILD_MISSING_DEPS_JSON; - use crate::job_run::{JobRunBackend, JobRunVisitResult, SubProcessBackend}; + use crate::job_run::{JobRun, JobRunBackend, VisitResult, SubProcessBackend}; use crate::mock_job_run::MockJobRun; - use crate::{JobRunMissingDeps, ManuallyTriggeredEvent, MissingDeps}; + use crate::{JobRunMissingDeps, MissingDeps}; /// Happy path - run that succeeds should emit a JobRunSuccessEventV1 #[test] fn test_job_run_success_returns_job_run_success_event() { // Spawn a job run that will succeed (exit code 0) - let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]); + let job_run = JobRun::::spawn(MockJobRun::bin_path(), vec![]); // Start the job - this consumes the NotStarted and returns Running - let mut running_job = job_run.run().unwrap(); + let running_job = match job_run { + JobRun::NotStarted(not_started) => not_started.run(None).unwrap(), + _ => panic!("Expected NotStarted job"), + }; // Poll until we get completion + let mut current_job = running_job; loop { - match running_job.visit().unwrap() { - JobRunVisitResult::Completed(completed) => { + match current_job.visit().unwrap() { + VisitResult::Completed(completed) => { // Generate the event from the completed state - let event = completed.state.to_event(&completed.id()); + let event = completed.state.to_event(&completed.job_run_id); assert!(matches!(event, Event::JobRunSuccessV1(_))); break; } - JobRunVisitResult::Failed(failed) => { + VisitResult::Failed(failed) => { panic!("Job failed unexpectedly: {}", failed.state.reason); } - JobRunVisitResult::StillRunning => { + VisitResult::StillRunning(still_running) => { // Sleep briefly and poll again std::thread::sleep(std::time::Duration::from_millis(10)); + current_job = still_running; continue; } - JobRunVisitResult::DepMiss(dep_miss) => { + VisitResult::DepMiss(_dep_miss) => { panic!("Job dep miss 
unexpectedly"); } } @@ -412,30 +463,35 @@ mod tests { #[test] fn test_job_run_failure_returns_job_run_failure_event() { // Spawn a job run - let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]); + let job_run = JobRun::::spawn(MockJobRun::bin_path(), vec![]); // Start the job with an exit code that indicates failure (non-zero) let env = MockJobRun::new().exit_code(1).to_env(); - let mut running_job = job_run.run_with_env(Some(env)).unwrap(); + let running_job = match job_run { + JobRun::NotStarted(not_started) => not_started.run(Some(env)).unwrap(), + _ => panic!("Expected NotStarted job"), + }; // Poll until we get completion + let mut current_job = running_job; loop { - match running_job.visit().unwrap() { - JobRunVisitResult::Completed(_) => { + match current_job.visit().unwrap() { + VisitResult::Completed(_) => { panic!("Job succeeded unexpectedly"); } - JobRunVisitResult::Failed(failed) => { + VisitResult::Failed(failed) => { // Generate the event from the failed state - let event = failed.state.to_event(&failed.id()); + let event = failed.state.to_event(&failed.job_run_id); assert!(matches!(event, Event::JobRunFailureV1(_))); break; } - JobRunVisitResult::StillRunning => { + VisitResult::StillRunning(still_running) => { // Sleep briefly and poll again std::thread::sleep(std::time::Duration::from_millis(10)); + current_job = still_running; continue; } - JobRunVisitResult::DepMiss(dep_miss) => { + VisitResult::DepMiss(_dep_miss) => { panic!("Job dep miss unexpectedly"); } } @@ -455,14 +511,17 @@ mod tests { let temp_file = format!("/tmp/databuild_test_cancel_{}", Uuid::new_v4()); // Spawn a job run that will sleep for 1 second and write a file - let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]); + let job_run = JobRun::::spawn(MockJobRun::bin_path(), vec![]); let env = MockJobRun::new() .sleep_ms(1000) .output_file(&temp_file, &"completed".to_string()) .exit_code(0) .to_env(); - let running_job = job_run.run_with_env(Some(env)).unwrap(); + let running_job = match job_run { + JobRun::NotStarted(not_started) => not_started.run(Some(env)).unwrap(), + _ => panic!("Expected NotStarted job"), + }; // Give it a tiny bit of time to start std::thread::sleep(std::time::Duration::from_millis(10)); @@ -478,10 +537,10 @@ mod tests { .unwrap(); // Generate the cancel event from the canceled state - let cancel_event = canceled_job.state.to_event(&canceled_job.id()); + let cancel_event = canceled_job.state.to_event(&canceled_job.job_run_id); // Verify we got the cancel event - assert_eq!(cancel_event.job_run_id, canceled_job.id().to_string()); + assert_eq!(cancel_event.job_run_id, canceled_job.job_run_id.to_string()); assert!(cancel_event.source.is_some()); assert_eq!(cancel_event.comment, Some("Job was canceled".to_string())); @@ -499,7 +558,7 @@ mod tests { #[test] fn test_job_run_fail_on_missing_deps_should_emit_missing_deps_event() { // Spawn a job run that will sleep for 1 second and write a file - let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]); + let job_run = JobRun::::spawn(MockJobRun::bin_path(), vec![]); let expected_dep_miss = JobRunMissingDeps { version: "1".into(), @@ -515,24 +574,29 @@ mod tests { .stdout_msg(&dep_miss_line) .exit_code(1) .to_env(); - let mut running_job = job_run.run_with_env(Some(env)).unwrap(); + let running_job = match job_run { + JobRun::NotStarted(not_started) => not_started.run(Some(env)).unwrap(), + _ => panic!("Expected NotStarted job"), + }; // Poll until we get completion + let mut current_job = 
running_job; loop { - match running_job.visit().unwrap() { - JobRunVisitResult::Completed(_) => { + match current_job.visit().unwrap() { + VisitResult::Completed(_) => { panic!("Job succeeded unexpectedly"); } - JobRunVisitResult::Failed(failed) => { + VisitResult::Failed(_failed) => { panic!("Job failed unexpectedly"); } - JobRunVisitResult::StillRunning => { + VisitResult::StillRunning(still_running) => { // Sleep briefly and poll again std::thread::sleep(std::time::Duration::from_millis(10)); + current_job = still_running; continue; } - JobRunVisitResult::DepMiss(backend) => { - assert_eq!(backend.state.missing_deps, expected_dep_miss.missing_deps); + VisitResult::DepMiss(dep_miss) => { + assert_eq!(dep_miss.state.missing_deps, expected_dep_miss.missing_deps); break; } } diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs index 006f3d5..ef31745 100644 --- a/databuild/orchestrator.rs +++ b/databuild/orchestrator.rs @@ -1,10 +1,7 @@ use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage}; use crate::data_build_event::Event; use crate::job::JobConfiguration; -use crate::job_run::{ - CompletedJobRun, DepMissJobRun, FailedJobRun, JobRunVisitResult, NotStartedJobRun, - RunningJobRun, SubProcessBackend, -}; +use crate::job_run::SubProcessBackend; use crate::util::DatabuildError; use crate::{JobRunBufferEventV1, PartitionRef, WantDetail}; use std::collections::HashMap; @@ -22,24 +19,16 @@ JTBDs: */ struct Orchestrator { pub bel: BuildEventLog, - pub not_started_jobs: Vec>, - pub running_jobs: Vec>, - pub completed_jobs: Vec>, - pub failed_jobs: Vec>, - pub dep_miss_jobs: Vec>, pub config: OrchestratorConfig, + pub job_runs: Vec>, } impl Default for Orchestrator { fn default() -> Self { Self { bel: Default::default(), - not_started_jobs: Default::default(), - running_jobs: Default::default(), - completed_jobs: Default::default(), - failed_jobs: Default::default(), - dep_miss_jobs: Default::default(), config: Default::default(), + job_runs: Default::default(), } } } @@ -48,12 +37,8 @@ impl Orchestrator { fn copy(&self) -> Self { Self { bel: self.bel.clone(), - not_started_jobs: Default::default(), - running_jobs: Default::default(), - completed_jobs: Default::default(), - failed_jobs: Default::default(), - dep_miss_jobs: Default::default(), config: self.config.clone(), + job_runs: Default::default(), } } } @@ -112,11 +97,6 @@ struct WantGroup { wants: Vec, } -impl WantGroup { - pub fn spawn(&self) -> Result, std::io::Error> { - self.job.spawn(self.wants.clone()) - } -} #[derive(Debug, Clone)] struct GroupedWants { @@ -140,63 +120,68 @@ impl Orchestrator { fn new(storage: S, config: OrchestratorConfig) -> Self { Self { bel: BuildEventLog::new(storage, Default::default()), - not_started_jobs: Vec::new(), - running_jobs: Vec::new(), - completed_jobs: Vec::new(), - failed_jobs: Vec::new(), - dep_miss_jobs: Vec::new(), config, + job_runs: Vec::new(), } } fn schedule_queued_jobs(&mut self) -> Result<(), DatabuildError> { - // TODO need to incorporate concurrency limit/pools here, probably? - while let Some(job) = self.not_started_jobs.pop() { - self.running_jobs.push(job.run()?); - } + use crate::job_run::JobRun; + let mut new_jobs = Vec::new(); + for job in self.job_runs.drain(..) 
{ + let transitioned = match job { + JobRun::NotStarted(not_started) => JobRun::Running(not_started.run(None)?), + other => other, // Pass through all other states unchanged + }; + new_jobs.push(transitioned); + } + self.job_runs = new_jobs; Ok(()) } /// Visits individual job runs, appending resulting events, and moving runs between run status /// containers. Either jobs are still running, or they are moved to terminal states. fn poll_job_runs(&mut self) -> Result<(), DatabuildError> { + use crate::job_run::{JobRun, VisitResult}; + self.schedule_queued_jobs()?; - // Visit running jobs and transition them to terminal states - let mut still_running = Vec::new(); - // TODO make sure that failure in the middle can't mess up build state - likely need to - // refactor here (e.g. turn state changes into data, commit them after all have been - // calculated and validated) - for mut job in self.running_jobs.drain(..) { - match job.visit()? { - JobRunVisitResult::StillRunning => { - println!("Still running job: {:?}", job.id()); - still_running.push(job); + // Visit all running jobs using type-safe transitions + let mut new_jobs = Vec::new(); + for job in self.job_runs.drain(..) { + let transitioned = match job { + JobRun::Running(running) => { + match running.visit()? { + VisitResult::StillRunning(still_running) => { + println!("Still running job: {:?}", still_running.job_run_id); + JobRun::Running(still_running) + } + VisitResult::Completed(completed) => { + println!("Completed job: {:?}", completed.job_run_id); + let event = completed.state.to_event(&completed.job_run_id); + self.bel.append_event(&event)?; + JobRun::Completed(completed) + } + VisitResult::Failed(failed) => { + println!("Failed job: {:?}", failed.job_run_id); + let event = failed.state.to_event(&failed.job_run_id); + self.bel.append_event(&event)?; + JobRun::Failed(failed) + } + VisitResult::DepMiss(dep_miss) => { + println!("Dep miss job: {:?}", dep_miss.job_run_id); + let event = dep_miss.state.to_event(&dep_miss.job_run_id); + self.bel.append_event(&event)?; + JobRun::DepMiss(dep_miss) + } + } } - JobRunVisitResult::Completed(completed) => { - // Emit success event - println!("Completed job: {:?}", completed.id()); - self.bel - .append_event(&completed.state.to_event(&completed.id()))?; - self.completed_jobs.push(completed); - } - JobRunVisitResult::Failed(failed) => { - // Emit failure event - println!("Failed job: {:?}", failed.id()); - let event: Event = failed.state.to_event(&failed.id()); - self.bel.append_event(&event)?; - self.failed_jobs.push(failed); - } - JobRunVisitResult::DepMiss(dep_miss) => { - println!("Dep miss job: {:?}", dep_miss.job_run_id); - let event = dep_miss.state.to_event(&dep_miss.id()); - self.bel.append_event(&event)?; - self.dep_miss_jobs.push(dep_miss); - } - } + other => other, // Pass through all non-running states unchanged + }; + new_jobs.push(transitioned); } - self.running_jobs = still_running; + self.job_runs = new_jobs; Ok(()) } @@ -243,12 +228,17 @@ impl Orchestrator { } fn queue_job(&mut self, wg: WantGroup) -> Result<(), DatabuildError> { - // Spawn job run (not started, but need only be `.run`'d) - let job_run = wg.spawn()?; + use crate::job_run::JobRun; + + // Compute args from wants the same way JobConfiguration::spawn() does + let wanted_refs: Vec = + wg.wants.iter().flat_map(|want| want.partitions.clone()).collect(); + let args: Vec = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect(); + let job_run = JobRun::spawn(wg.job.entry_point.clone(), args); // Create job run 
buffer event let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 { - job_run_id: job_run.job_run_id.into(), + job_run_id: job_run.job_run_id().to_string(), job_label: wg.job.label, building_partitions: wg .wants @@ -259,7 +249,7 @@ impl Orchestrator { want_attributed_partitions: wg.wants.iter().map(|w| w.into()).collect(), }); self.bel.append_event(&job_buffer_event)?; - self.not_started_jobs.push(job_run); + self.job_runs.push(job_run); Ok(()) } @@ -270,6 +260,36 @@ impl Orchestrator { Ok(()) } + // Helper methods for tests to count jobs by state + #[cfg(test)] + fn count_running_jobs(&self) -> usize { + use crate::job_run::JobRun; + self.job_runs.iter().filter(|j| matches!(j, JobRun::Running(_))).count() + } + + #[cfg(test)] + fn count_terminal_jobs(&self) -> usize { + self.job_runs.iter().filter(|j| j.is_terminal()).count() + } + + #[cfg(test)] + fn count_not_started_jobs(&self) -> usize { + use crate::job_run::JobRun; + self.job_runs.iter().filter(|j| matches!(j, JobRun::NotStarted(_))).count() + } + + #[cfg(test)] + fn count_dep_miss_jobs(&self) -> usize { + use crate::job_run::JobRun; + self.job_runs.iter().filter(|j| matches!(j, JobRun::DepMiss(_))).count() + } + + #[cfg(test)] + fn count_completed_jobs(&self) -> usize { + use crate::job_run::JobRun; + self.job_runs.iter().filter(|j| matches!(j, JobRun::Completed(_))).count() + } + /** Entrypoint for running jobs */ pub fn join(&mut self) -> Result<(), DatabuildError> { loop { @@ -376,14 +396,14 @@ mod tests { fn test_empty_wants_noop() { let mut orchestrator = build_orchestrator(); // Should init with no work to do - assert!(orchestrator.not_started_jobs.is_empty()); - assert!(orchestrator.running_jobs.is_empty()); + assert_eq!(orchestrator.count_not_started_jobs(), 0); + assert_eq!(orchestrator.count_running_jobs(), 0); orchestrator .poll_wants() .expect("shouldn't fail to poll empty wants"); // Should still be empty since no work to do - assert!(orchestrator.not_started_jobs.is_empty()); - assert!(orchestrator.running_jobs.is_empty()); + assert_eq!(orchestrator.count_not_started_jobs(), 0); + assert_eq!(orchestrator.count_running_jobs(), 0); } // Use case: Some schedulable wants with jobs that can be matched should launch those jobs @@ -400,7 +420,7 @@ mod tests { for e in events { orchestrator.bel.append_event(&e).expect("append"); } - assert_eq!(orchestrator.not_started_jobs.len(), 0); + assert_eq!(orchestrator.count_not_started_jobs(), 0); assert_eq!(orchestrator.bel.state.count_job_runs(), 0); // When @@ -410,19 +430,13 @@ mod tests { .expect("shouldn't fail to poll wants"); // Should schedule alpha job - assert_eq!(orchestrator.not_started_jobs.len(), 1); - assert_eq!( - orchestrator - .not_started_jobs - .iter() - .take(1) - .last() - .unwrap() - .state - .args, - vec!["data/alpha"], - "should have scheduled alpha job" - ); + assert_eq!(orchestrator.count_not_started_jobs(), 1); + // Verify the job has the right args by checking the first NotStarted job + use crate::job_run::JobRun; + let not_started_job = orchestrator.job_runs.iter().find(|j| matches!(j, JobRun::NotStarted(_))).unwrap(); + if let JobRun::NotStarted(job) = not_started_job { + assert_eq!(job.state.args, vec!["data/alpha"], "should have scheduled alpha job"); + } assert_eq!(orchestrator.bel.state.count_job_runs(), 1); } @@ -442,7 +456,7 @@ mod tests { .expect("shouldn't fail to poll wants"); // Should not have scheduled any jobs - assert_eq!(orchestrator.not_started_jobs.len(), 0); + assert_eq!(orchestrator.count_not_started_jobs(), 0); } } @@ 
-565,32 +579,29 @@ mod tests { orchestrator .poll_wants() .expect("stage unscheduled jobs based on wants failed"); - assert_eq!(orchestrator.not_started_jobs.len(), 1); - // poll job runs should start job run - orchestrator.poll_job_runs().expect("should start run"); - assert_eq!(orchestrator.running_jobs.len(), 1); + assert_eq!(orchestrator.count_not_started_jobs(), 1); + // step should start job run + orchestrator.step().expect("should start run"); + assert_eq!(orchestrator.count_running_jobs(), 1); assert_eq!(orchestrator.bel.state.count_job_runs(), 1); thread::sleep(Duration::from_millis(1)); // Should still be running after 1ms orchestrator - .poll_job_runs() + .step() .expect("should still be running"); - assert_eq!(orchestrator.running_jobs.len(), 1); + assert_eq!(orchestrator.count_running_jobs(), 1); assert_eq!(orchestrator.bel.state.count_job_runs(), 1); println!("STATE: {:?}", orchestrator.bel.state); // Wait for it to complete thread::sleep(Duration::from_millis(10)); orchestrator - .poll_job_runs() + .step() .expect("should be able to poll existing job run"); // Job run should have succeeded - assert!(orchestrator.not_started_jobs.is_empty()); - assert!(orchestrator.failed_jobs.is_empty()); - assert!(orchestrator.dep_miss_jobs.is_empty()); - assert!(orchestrator.running_jobs.is_empty()); - assert_eq!(orchestrator.completed_jobs.len(), 1); + assert_eq!(orchestrator.count_not_started_jobs(), 0); + assert_eq!(orchestrator.count_completed_jobs(), 1); // Build state should show partition as live assert_eq!( @@ -615,7 +626,7 @@ mod tests { for _i in 0..max_steps { thread::sleep(Duration::from_millis(50)); - if orchestrator.running_jobs.is_empty() { + if orchestrator.count_running_jobs() == 0 { return Ok(()); } orchestrator @@ -702,7 +713,7 @@ echo 'Beta succeeded' // Step 1: Should schedule beta job (want -> not_started_jobs) orchestrator.step().expect("step 1"); assert_eq!( - orchestrator.not_started_jobs.len(), + orchestrator.count_not_started_jobs(), 1, "beta job should be queued" ); @@ -710,7 +721,7 @@ echo 'Beta succeeded' // Step 2: Should start beta job (not_started_jobs -> running_jobs) orchestrator.step().expect("step 2"); assert_eq!( - orchestrator.running_jobs.len(), + orchestrator.count_running_jobs(), 1, "beta job should be running" ); @@ -719,7 +730,7 @@ echo 'Beta succeeded' wait_for_jobs_to_complete(&mut orchestrator, 10).expect("beta job should complete"); // (Beta should now be in dep_miss state, and a want for alpha should be created) assert_eq!( - orchestrator.dep_miss_jobs.len(), + orchestrator.count_dep_miss_jobs(), 1, "beta should have dep miss" ); @@ -728,7 +739,7 @@ echo 'Beta succeeded' // (dep miss handler created the alpha want, which will be picked up by poll_wants) orchestrator.step().expect("step 4"); assert_eq!( - orchestrator.running_jobs.len(), + orchestrator.count_running_jobs(), 1, "alpha job should be running" ); @@ -736,7 +747,7 @@ echo 'Beta succeeded' // Step 6: Alpha completes successfully wait_for_jobs_to_complete(&mut orchestrator, 10).expect("alpha job should complete"); assert_eq!( - orchestrator.completed_jobs.len(), + orchestrator.count_completed_jobs(), 1, "alpha should complete" ); @@ -753,23 +764,22 @@ echo 'Beta succeeded' // Step 7: Beta is rescheduled and started (want -> running_jobs) orchestrator.step().expect("step 7"); - assert_eq!(orchestrator.running_jobs.len(), 1, "beta should be running"); + assert_eq!(orchestrator.count_running_jobs(), 1, "beta should be running"); // Step 8: Beta completes successfully 
wait_for_jobs_to_complete(&mut orchestrator, 10).expect("beta job should complete"); // Then: Verify both partitions are live and both jobs completed assert_eq!( - orchestrator.completed_jobs.len(), + orchestrator.count_completed_jobs(), 2, "both jobs should complete" ); assert_eq!( - orchestrator.dep_miss_jobs.len(), + orchestrator.count_dep_miss_jobs(), 1, "should have one dep miss" ); - assert!(orchestrator.failed_jobs.is_empty(), "no jobs should fail"); assert_eq!( orchestrator
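
For review, the pattern this patch applies to job runs can be seen in a minimal, self-contained Rust sketch. This is illustrative only: the names Backend, WithState, Stored, and Demo are stand-ins invented for the example (the patch's real types are JobRunBackend, JobRunWithState, JobRun, and VisitResult), the toy backend just counts polls instead of driving a subprocess, and the two-outcome visit() collapses the patch's Completed/Failed/DepMiss arms into a single Done state.

use std::marker::PhantomData;

/// Backend with one associated type per lifecycle state (stand-in for JobRunBackend).
trait Backend: Sized {
    type NotStarted;
    type Running;
    type Done;

    fn create(cmd: String) -> Self::NotStarted;
    fn start(ns: Self::NotStarted) -> Self::Running;
    fn poll(r: Self::Running) -> Result<Self::Done, Self::Running>;
}

/// Typestate wrapper: operations are only defined for the state it currently
/// holds (stand-in for JobRunWithState).
struct WithState<B: Backend, S> {
    id: u64,
    state: S,
    _backend: PhantomData<B>,
}

/// Wrapper enum so runs in different states can live in one Vec (stand-in for JobRun).
enum Stored<B: Backend> {
    NotStarted(WithState<B, B::NotStarted>),
    Running(WithState<B, B::Running>),
    Done(WithState<B, B::Done>),
}

impl<B: Backend> WithState<B, B::NotStarted> {
    /// NotStarted -> Running; calling this on any other state is a compile error.
    fn run(self) -> WithState<B, B::Running> {
        WithState { id: self.id, state: B::start(self.state), _backend: PhantomData }
    }
}

impl<B: Backend> WithState<B, B::Running> {
    /// Running -> Running | Done, mirroring visit()/VisitResult in the patch.
    fn visit(self) -> Stored<B> {
        match B::poll(self.state) {
            Ok(done) => Stored::Done(WithState { id: self.id, state: done, _backend: PhantomData }),
            Err(still) => Stored::Running(WithState { id: self.id, state: still, _backend: PhantomData }),
        }
    }
}

/// Toy backend: "completes" after two polls instead of running a subprocess.
struct Demo;
impl Backend for Demo {
    type NotStarted = String;
    type Running = (String, u8);
    type Done = String;

    fn create(cmd: String) -> String { cmd }
    fn start(cmd: String) -> (String, u8) { (cmd, 0) }
    fn poll((cmd, ticks): (String, u8)) -> Result<String, (String, u8)> {
        if ticks >= 2 { Ok(format!("{cmd}: ok")) } else { Err((cmd, ticks + 1)) }
    }
}

fn main() {
    // One collection, drained and rebuilt each tick, as the orchestrator's
    // poll_job_runs() does with its job_runs vector.
    let mut runs: Vec<Stored<Demo>> = vec![Stored::NotStarted(WithState {
        id: 1,
        state: Demo::create("build data/alpha".into()),
        _backend: PhantomData,
    })];

    for _ in 0..4 {
        runs = runs
            .into_iter()
            .map(|run| match run {
                Stored::NotStarted(ns) => Stored::Running(ns.run()),
                Stored::Running(r) => r.visit(),
                done => done, // terminal states pass through unchanged
            })
            .collect();
    }

    for run in &runs {
        if let Stored::Done(d) = run {
            println!("run {} finished: {}", d.id, d.state);
        }
    }
}

The split mirrors the tradeoff in the diff: the parameterized struct makes an invalid call, such as running an already completed job, a compile error instead of a runtime check, while the wrapper enum is what lets the orchestrator keep a single job_runs vector and drain/match it each tick rather than maintaining one vector per state.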