refactor to replace results with panics where they represent system inconsistency errors
Stuart Axelbrooke 2025-11-22 16:26:28 +08:00
parent 01d50dde1b
commit 8e8ff33ef8
2 changed files with 88 additions and 92 deletions
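
For readers skimming the diff, the pattern being applied is: lookups and transitions that can only fail when the build event log is internally inconsistent stop returning a Result and panic instead, so the bug surfaces at the point of violation rather than being threaded through `?`. A minimal sketch of the before/after shape, using a hypothetical Registry type rather than this repository's actual BuildState API:

use std::collections::HashMap;

// Hypothetical stand-in for the state type touched in this commit;
// the names here are illustrative, not the crate's actual API.
struct Registry {
    job_runs: HashMap<String, String>,
}

impl Registry {
    // Before: a Result, even though a missing ID can only mean the
    // orchestrator itself is broken, not something the caller can recover from.
    fn status_result(&self, job_run_id: &str) -> Result<&String, String> {
        self.job_runs
            .get(job_run_id)
            .ok_or_else(|| format!("Job run ID {} not found", job_run_id))
    }

    // After: treat the missing ID as an internal invariant violation and panic,
    // so the caller no longer has to propagate an error it could never handle.
    fn status_panic(&self, job_run_id: &str) -> &String {
        self.job_runs
            .get(job_run_id)
            .unwrap_or_else(|| panic!("BUG: job run {} must exist", job_run_id))
    }
}

fn main() {
    let mut registry = Registry { job_runs: HashMap::new() };
    registry.job_runs.insert("jr-1".into(), "running".into());
    assert_eq!(registry.status_result("jr-1").unwrap(), "running");
    assert_eq!(registry.status_panic("jr-1"), "running");
}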

@@ -164,7 +164,7 @@ impl<S: BELStorage + Debug> BuildEventLog<S> {
     }
 
     pub fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError> {
-        let events = self.state.handle_event(&event)?;
+        let events = self.state.handle_event(&event);
         let idx = self.storage.append_event(event)?;
         // Recursion here might be dangerous, but in theory the event propagation always terminates
         for event in events {

@@ -1,7 +1,7 @@
 use crate::data_build_event::Event;
 use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
 use crate::partition_state::Partition;
-use crate::util::{DatabuildError, current_timestamp};
+use crate::util::current_timestamp;
 use crate::want_state::{IdleState as WantIdleState, Want, WantWithState};
 use crate::{
     JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1,
@@ -550,7 +550,7 @@ impl BuildState {
     /// Handles reacting to events, updating state, and erroring if its an invalid state transition
     /// Event handlers can return vecs of events that will then be appended to the BEL
-    pub fn handle_event(&mut self, event: &Event) -> Result<Vec<Event>, DatabuildError> {
+    pub fn handle_event(&mut self, event: &Event) -> Vec<Event> {
         match event {
             // JobRun events
             Event::JobRunBufferV1(e) => self.handle_job_run_buffer(e),
@@ -573,7 +573,7 @@ impl BuildState {
     fn handle_want_create(
         &mut self,
         event: &WantCreateEventV1,
-    ) -> Result<Vec<Event>, DatabuildError> {
+    ) -> Vec<Event> {
         // Use From impl to create want in Idle state
         let want_idle: WantWithState<WantIdleState> = event.clone().into();
         self.wants
@@ -584,13 +584,13 @@ impl BuildState {
             self.add_want_to_partition(pref, &event.want_id);
         }
 
-        Ok(vec![])
+        vec![]
     }
 
     fn handle_want_cancel(
         &mut self,
         event: &WantCancelEventV1,
-    ) -> Result<Vec<Event>, DatabuildError> {
+    ) -> Vec<Event> {
         // TODO actually cancel in-progress job runs that no longer have a sponsoring want
         // Type-safe transition (API layer should prevent canceling terminal wants)
@@ -623,91 +623,93 @@ impl BuildState {
         };
         self.wants.insert(event.want_id.clone(), canceled);
 
-        Ok(vec![])
+        vec![]
     }
 
     fn handle_job_run_buffer(
         &mut self,
         event: &JobRunBufferEventV1,
-    ) -> Result<Vec<Event>, DatabuildError> {
-        // No job run should exist
+    ) -> Vec<Event> {
+        // No job run should exist - if it does, that's a BUG in the orchestrator
         if self.job_runs.get(&event.job_run_id).is_some() {
-            Err(format!("Job run ID collision on job run ID {}", event.job_run_id).into())
-        } else {
-            // Create job run to be inserted
-            let job_run: JobRunDetail = event.clone().into();
-
-            // Transition wants to Building
-            // Valid states when job buffer event arrives:
-            // - Idle: First job starting for this want (normal case)
-            // - Building: Another job already started for this want (multiple jobs can service same want)
-            // Invalid states (panic - indicates orchestrator bug):
-            // - UpstreamBuilding: Not schedulable, waiting for dependencies
-            // - Successful/Failed/UpstreamFailed/Canceled: Terminal states, not schedulable
-            for wap in &job_run.servicing_wants {
-                let want = self.wants.remove(&wap.want_id).expect(&format!(
-                    "BUG: Want {} must exist when job buffer event received",
-                    wap.want_id
-                ));
-                let transitioned = match want {
-                    Want::Idle(idle) => {
-                        // First job starting for this want
-                        Want::Building(idle.start_building(current_timestamp()))
-                    }
-                    Want::Building(building) => {
-                        // Another job already started, stay in Building (no-op)
-                        Want::Building(building)
-                    }
-                    _ => {
-                        panic!(
-                            "BUG: Want {} in invalid state {:?} for job buffer. Only Idle or Building wants should be scheduled.",
-                            wap.want_id, want
-                        );
-                    }
-                };
-                self.wants.insert(wap.want_id.clone(), transitioned);
-            }
-
-            // Transition partitions to Building state
-            self.transition_partitions_to_building(&job_run.building_partitions, &event.job_run_id);
-
-            self.job_runs
-                .insert(event.job_run_id.clone(), job_run.clone());
-            println!("Inserted job run: {:?}", job_run);
-            Ok(vec![])
+            panic!(
+                "BUG: Job run ID collision on job run ID {}. Orchestrator should generate unique IDs.",
+                event.job_run_id
+            );
         }
+
+        // Create job run to be inserted
+        let job_run: JobRunDetail = event.clone().into();
+
+        // Transition wants to Building
+        // Valid states when job buffer event arrives:
+        // - Idle: First job starting for this want (normal case)
+        // - Building: Another job already started for this want (multiple jobs can service same want)
+        // Invalid states (panic - indicates orchestrator bug):
+        // - UpstreamBuilding: Not schedulable, waiting for dependencies
+        // - Successful/Failed/UpstreamFailed/Canceled: Terminal states, not schedulable
+        for wap in &job_run.servicing_wants {
+            let want = self.wants.remove(&wap.want_id).expect(&format!(
+                "BUG: Want {} must exist when job buffer event received",
+                wap.want_id
+            ));
+            let transitioned = match want {
+                Want::Idle(idle) => {
+                    // First job starting for this want
+                    Want::Building(idle.start_building(current_timestamp()))
+                }
+                Want::Building(building) => {
+                    // Another job already started, stay in Building (no-op)
+                    Want::Building(building)
+                }
+                _ => {
+                    panic!(
+                        "BUG: Want {} in invalid state {:?} for job buffer. Only Idle or Building wants should be scheduled.",
+                        wap.want_id, want
+                    );
+                }
+            };
+            self.wants.insert(wap.want_id.clone(), transitioned);
+        }
+
+        // Transition partitions to Building state
+        self.transition_partitions_to_building(&job_run.building_partitions, &event.job_run_id);
+
+        self.job_runs
+            .insert(event.job_run_id.clone(), job_run.clone());
+        println!("Inserted job run: {:?}", job_run);
+        vec![]
     }
 
     fn update_job_run_status(
         &mut self,
         job_run_id: &str,
         status: JobRunStatusCode,
-    ) -> Result<(), DatabuildError> {
-        if let Some(job_run) = self.job_runs.get_mut(job_run_id) {
-            job_run.last_heartbeat_at = Some(current_timestamp());
-            job_run.status = Some(status.into());
-            Ok(())
-        } else {
-            Err(format!("Job run ID {} not found", job_run_id).into())
-        }
+    ) {
+        let job_run = self.job_runs.get_mut(job_run_id).expect(&format!(
+            "BUG: Job run ID {} must exist to update status",
+            job_run_id
+        ));
+        job_run.last_heartbeat_at = Some(current_timestamp());
+        job_run.status = Some(status.into());
     }
 
     fn handle_job_run_heartbeat(
         &mut self,
         event: &JobRunHeartbeatEventV1,
-    ) -> Result<Vec<Event>, DatabuildError> {
-        self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunRunning)?;
-        Ok(vec![])
+    ) -> Vec<Event> {
+        self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunRunning);
+        vec![]
     }
 
     fn handle_job_run_success(
         &mut self,
         event: &JobRunSuccessEventV1,
-    ) -> Result<Vec<Event>, DatabuildError> {
+    ) -> Vec<Event> {
         println!("Job run success event: {:?}", event);
-        self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunSucceeded)?;
+        self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunSucceeded);
         let job_run = self.get_job_run(&event.job_run_id).unwrap();
 
         // Clone building_partitions before we use it multiple times
@@ -722,14 +724,14 @@ impl BuildState {
         // UpstreamBuilding → Idle/Building (for downstream wants waiting on newly successful wants)
         self.unblock_downstream_wants(&newly_successful_wants, &event.job_run_id, current_timestamp());
 
-        Ok(vec![])
+        vec![]
     }
 
     fn handle_job_run_failure(
         &mut self,
         event: &JobRunFailureEventV1,
-    ) -> Result<Vec<Event>, DatabuildError> {
-        self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunFailed)?;
+    ) -> Vec<Event> {
+        self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunFailed);
         let job_run = self.get_job_run(&event.job_run_id).unwrap();
 
         // Clone building_partitions before we use it multiple times
@@ -744,31 +746,31 @@ impl BuildState {
         // UpstreamBuilding → UpstreamFailed (for downstream wants waiting on newly failed wants)
         self.cascade_failures_to_downstream_wants(&newly_failed_wants, current_timestamp());
 
-        Ok(vec![])
+        vec![]
     }
 
     fn handle_job_run_cancel(
         &mut self,
-        event: &JobRunCancelEventV1,
-    ) -> Result<Vec<Event>, DatabuildError> {
+        _event: &JobRunCancelEventV1,
+    ) -> Vec<Event> {
         todo!("should update already inserted job run, partition status, want status")
     }
 
     pub fn handle_job_run_dep_miss(
         &mut self,
         event: &JobRunMissingDepsEventV1,
-    ) -> Result<Vec<Event>, DatabuildError> {
-        let job_run_detail = self.get_job_run(&event.job_run_id).ok_or(format!(
-            "Unable to find job run with id `{}`",
+    ) -> Vec<Event> {
+        let job_run_detail = self.get_job_run(&event.job_run_id).expect(&format!(
+            "BUG: Unable to find job run with id `{}`",
             event.job_run_id
-        ))?;
+        ));
 
         // Infer data/SLA timestamps from upstream want
         let want_timestamps: WantTimestamps = job_run_detail
             .servicing_wants
             .iter()
             .flat_map(|wap| self.get_want(&wap.want_id).map(|w| w.into()))
             .reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b))
-            .ok_or(format!("No servicing wants found"))?;
+            .expect("BUG: No servicing wants found");
 
         // Transition partitions back to Missing since this job can't build them yet
         self.reset_partitions_to_missing(&job_run_detail.building_partitions);
@@ -796,20 +798,20 @@ impl BuildState {
             &partition_to_want_map,
         );
 
-        Ok(want_events)
+        want_events
     }
 
     fn handle_taint_create(
         &mut self,
-        event: &TaintCreateEventV1,
-    ) -> Result<Vec<Event>, DatabuildError> {
+        _event: &TaintCreateEventV1,
+    ) -> Vec<Event> {
         todo!("...?")
     }
 
     fn handle_taint_delete(
         &mut self,
-        event: &TaintCancelEventV1,
-    ) -> Result<Vec<Event>, DatabuildError> {
+        _event: &TaintCancelEventV1,
+    ) -> Vec<Event> {
         todo!("...?")
     }
@@ -1077,9 +1079,7 @@ mod tests {
         e.partitions = vec!["mypart".into()];
 
         let mut state = BuildState::default();
-        state
-            .handle_event(&e.clone().into())
-            .expect("want create failed");
+        state.handle_event(&e.clone().into());
         let want = state.get_want("1234").unwrap();
         let mut expected: WantDetail = e.into();
         // Into will set this field as current timestamp
@@ -1094,16 +1094,12 @@ mod tests {
         e.partitions = vec!["mypart".into()];
 
         let mut state = BuildState::default();
-        state
-            .handle_event(&e.clone().into())
-            .expect("want create failed");
+        state.handle_event(&e.clone().into());
 
         // Should be able to cancel
         let mut e = WantCancelEventV1::default();
         e.want_id = "1234".to_string();
-        state
-            .handle_event(&e.clone().into())
-            .expect("want cancel failed");
+        state.handle_event(&e.clone().into());
 
         let want = state.get_want("1234").unwrap();
         assert_eq!(