diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs index 3207080..22d3e9e 100644 --- a/databuild/orchestrator.rs +++ b/databuild/orchestrator.rs @@ -2,10 +2,7 @@ use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage}; use crate::build_state::BuildState; use crate::data_build_event::Event; use crate::job::JobConfiguration; -use crate::job_run::{ - CompletedJobRun, DepMissJobRun, FailedJobRun, NotStartedJobRun, RunningJobRun, - SubProcessBackend, -}; +use crate::job_run::{CompletedJobRun, DepMissJobRun, FailedJobRun, JobRunVisitResult, NotStartedJobRun, RunningJobRun, SubProcessBackend}; use crate::util::DatabuildError; use crate::{JobRunBufferEventV1, PartitionRef, WantDetail}; use std::collections::HashMap; @@ -151,27 +148,19 @@ impl Orchestrator { } } - fn job_runs_count(&self) -> usize { - self.not_started_jobs.len() - + self.running_jobs.len() - + self.completed_jobs.len() - + self.failed_jobs.len() - + self.dep_miss_jobs.len() + fn schedule_queued_jobs(&mut self) -> Result<(), DatabuildError> { + // TODO need to incorporate concurrency limit/pools here, probably? + while let Some(job) = self.not_started_jobs.pop() { + self.running_jobs.push(job.run()?); + } + + Ok(()) } /// Visits individual job runs, appending resulting events, and moving runs between run status - /// containers. + /// containers. Either jobs are still running, or they are moved to terminal states. fn poll_job_runs(&mut self) -> Result<(), DatabuildError> { - use crate::job_run::JobRunVisitResult; - - // Coherence check setup - let total_runs_count = self.job_runs_count(); - - // First, start any not-started jobs - while let Some(job) = self.not_started_jobs.pop() { - let running = job.run()?; - self.running_jobs.push(running); - } + self.schedule_queued_jobs()?; // Visit running jobs and transition them to terminal states let mut still_running = Vec::new(); @@ -187,11 +176,7 @@ impl Orchestrator { JobRunVisitResult::Completed(completed) => { // Emit success event println!("Completed job: {:?}", completed.id()); - let result = run_complete_to_events(&self.bel.state, &completed)?; - for event in result.events { - self.bel.append_event(&event)?; - } - // Move job to completed + self.bel.append_event(&completed.state.to_event(&completed.id()))?; self.completed_jobs.push(completed); } JobRunVisitResult::Failed(failed) => { @@ -206,20 +191,12 @@ impl Orchestrator { for event in self.bel.state.dep_miss_to_events(&dep_miss)? { self.bel.append_event(&event)?; } - // Record missing upstream status in want details self.dep_miss_jobs.push(dep_miss); } } } self.running_jobs = still_running; - // Panic because this should never happen - assert_eq!( - self.job_runs_count(), - total_runs_count, - "Detected job run count change during job run visit (should never happen)" - ); - Ok(()) } @@ -300,36 +277,6 @@ impl Orchestrator { } } -#[derive(Default, Clone, Debug)] -pub struct JobRunCompleteResult { - /// Events to append to the BEL from this job completing - pub events: Vec, -} - -/// Handle successful run completion: -/// - Adding run success event -/// - Updating status for partitions actually built by the job -fn run_complete_to_events( - bel_state: &BuildState, - completed: &CompletedJobRun, -) -> Result { - let mut events = vec![ - // Event marking completion of job - completed.state.to_event(&completed.id()), - ]; - // let job_detail = bel_state - // .get_job_run(&completed.job_run_id.to_string()) - // .ok_or(format!( - // "No job run found for id `{}`", - // completed.job_run_id - // ))?; - - Ok(JobRunCompleteResult { - // built_partitions: job_detail.building_partitions, - events, - }) -} - #[cfg(test)] mod tests { use crate::WantCreateEventV1;