minor refactor on poll_job_runs

This commit is contained in:
Stuart Axelbrooke 2025-11-19 17:18:18 -08:00
parent 9bdd435089
commit 66ba40e2db

View file

@ -2,10 +2,7 @@ use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage};
use crate::build_state::BuildState;
use crate::data_build_event::Event;
use crate::job::JobConfiguration;
use crate::job_run::{
CompletedJobRun, DepMissJobRun, FailedJobRun, NotStartedJobRun, RunningJobRun,
SubProcessBackend,
};
use crate::job_run::{CompletedJobRun, DepMissJobRun, FailedJobRun, JobRunVisitResult, NotStartedJobRun, RunningJobRun, SubProcessBackend};
use crate::util::DatabuildError;
use crate::{JobRunBufferEventV1, PartitionRef, WantDetail};
use std::collections::HashMap;
@ -151,27 +148,19 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
}
}
fn job_runs_count(&self) -> usize {
self.not_started_jobs.len()
+ self.running_jobs.len()
+ self.completed_jobs.len()
+ self.failed_jobs.len()
+ self.dep_miss_jobs.len()
/// Drains the not-started queue, launching each job and moving it into the
/// running set. Stops early (propagating the error) if a launch fails.
// TODO need to incorporate concurrency limit/pools here, probably?
fn schedule_queued_jobs(&mut self) -> Result<(), DatabuildError> {
    loop {
        // Take jobs one at a time from the back of the queue, same as `pop`.
        match self.not_started_jobs.pop() {
            Some(queued) => {
                let launched = queued.run()?;
                self.running_jobs.push(launched);
            }
            None => break,
        }
    }
    Ok(())
}
/// Visits individual job runs, appending resulting events, and moving runs between run status
/// containers.
/// containers. Either jobs are still running, or they are moved to terminal states.
fn poll_job_runs(&mut self) -> Result<(), DatabuildError> {
use crate::job_run::JobRunVisitResult;
// Coherence check setup
let total_runs_count = self.job_runs_count();
// First, start any not-started jobs
while let Some(job) = self.not_started_jobs.pop() {
let running = job.run()?;
self.running_jobs.push(running);
}
self.schedule_queued_jobs()?;
// Visit running jobs and transition them to terminal states
let mut still_running = Vec::new();
@ -187,11 +176,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
JobRunVisitResult::Completed(completed) => {
// Emit success event
println!("Completed job: {:?}", completed.id());
let result = run_complete_to_events(&self.bel.state, &completed)?;
for event in result.events {
self.bel.append_event(&event)?;
}
// Move job to completed
self.bel.append_event(&completed.state.to_event(&completed.id()))?;
self.completed_jobs.push(completed);
}
JobRunVisitResult::Failed(failed) => {
@ -206,20 +191,12 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
for event in self.bel.state.dep_miss_to_events(&dep_miss)? {
self.bel.append_event(&event)?;
}
// Record missing upstream status in want details
self.dep_miss_jobs.push(dep_miss);
}
}
}
self.running_jobs = still_running;
// Panic because this should never happen
assert_eq!(
self.job_runs_count(),
total_runs_count,
"Detected job run count change during job run visit (should never happen)"
);
Ok(())
}
@ -300,36 +277,6 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
}
}
/// Result of translating a successful job-run completion into build events.
#[derive(Default, Clone, Debug)]
pub struct JobRunCompleteResult {
    /// Events to append to the BEL from this job completing
    pub events: Vec<Event>,
}
/// Handle successful run completion:
/// - Adding run success event
/// - Updating status for partitions actually built by the job
///
/// # Errors
/// Currently never fails; the `Result` return is kept for the planned
/// lookup of the job-run detail in `bel_state`.
fn run_complete_to_events(
    bel_state: &BuildState,
    completed: &CompletedJobRun<SubProcessBackend>,
) -> Result<JobRunCompleteResult, DatabuildError> {
    // NOTE(review): `bel_state` is not yet consulted — the partition-status
    // lookup it was intended for is not implemented here. Kept in the
    // signature so callers don't change when that lands.
    let _ = bel_state;
    // Only event so far: the one marking completion of the job.
    // (Was `let mut events` with dead commented-out lookup code; no
    // mutation ever happened, so the binding is immutable now.)
    let events = vec![completed.state.to_event(&completed.id())];
    Ok(JobRunCompleteResult { events })
}
#[cfg(test)]
mod tests {
use crate::WantCreateEventV1;