From 8e8ff33ef87b2b6330326a7ce561f6a6f5510ccb Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Sat, 22 Nov 2025 16:26:28 +0800 Subject: [PATCH] refactor to replace results with panics where they represent system inconsistency errors --- databuild/build_event_log.rs | 2 +- databuild/build_state.rs | 178 +++++++++++++++++------------------ 2 files changed, 88 insertions(+), 92 deletions(-) diff --git a/databuild/build_event_log.rs b/databuild/build_event_log.rs index b3edb80..b46384a 100644 --- a/databuild/build_event_log.rs +++ b/databuild/build_event_log.rs @@ -164,7 +164,7 @@ impl BuildEventLog { } pub fn append_event(&mut self, event: &Event) -> Result { - let events = self.state.handle_event(&event)?; + let events = self.state.handle_event(&event); let idx = self.storage.append_event(event)?; // Recursion here might be dangerous, but in theory the event propagation always terminates for event in events { diff --git a/databuild/build_state.rs b/databuild/build_state.rs index fff0444..3f0c6b0 100644 --- a/databuild/build_state.rs +++ b/databuild/build_state.rs @@ -1,7 +1,7 @@ use crate::data_build_event::Event; use crate::data_deps::{WantTimestamps, missing_deps_to_want_events}; use crate::partition_state::Partition; -use crate::util::{DatabuildError, current_timestamp}; +use crate::util::current_timestamp; use crate::want_state::{IdleState as WantIdleState, Want, WantWithState}; use crate::{ JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1, @@ -550,7 +550,7 @@ impl BuildState { /// Handles reacting to events, updating state, and erroring if its an invalid state transition /// Event handlers can return vecs of events that will then be appended to the BEL - pub fn handle_event(&mut self, event: &Event) -> Result, DatabuildError> { + pub fn handle_event(&mut self, event: &Event) -> Vec { match event { // JobRun events Event::JobRunBufferV1(e) => self.handle_job_run_buffer(e), @@ -573,7 +573,7 @@ impl BuildState { fn handle_want_create( &mut self, event: &WantCreateEventV1, - ) -> Result, DatabuildError> { + ) -> Vec { // Use From impl to create want in Idle state let want_idle: WantWithState = event.clone().into(); self.wants @@ -584,13 +584,13 @@ impl BuildState { self.add_want_to_partition(pref, &event.want_id); } - Ok(vec![]) + vec![] } fn handle_want_cancel( &mut self, event: &WantCancelEventV1, - ) -> Result, DatabuildError> { + ) -> Vec { // TODO actually cancel in-progress job runs that no longer have a sponsoring want // Type-safe transition (API layer should prevent canceling terminal wants) @@ -623,91 +623,93 @@ impl BuildState { }; self.wants.insert(event.want_id.clone(), canceled); - Ok(vec![]) + vec![] } fn handle_job_run_buffer( &mut self, event: &JobRunBufferEventV1, - ) -> Result, DatabuildError> { - // No job run should exist + ) -> Vec { + // No job run should exist - if it does, that's a BUG in the orchestrator if self.job_runs.get(&event.job_run_id).is_some() { - Err(format!("Job run ID collision on job run ID {}", event.job_run_id).into()) - } else { - // Create job run to be inserted - let job_run: JobRunDetail = event.clone().into(); - - // Transition wants to Building - // Valid states when job buffer event arrives: - // - Idle: First job starting for this want (normal case) - // - Building: Another job already started for this want (multiple jobs can service same want) - // Invalid states (panic - indicates orchestrator bug): - // - UpstreamBuilding: Not schedulable, waiting for dependencies - // - Successful/Failed/UpstreamFailed/Canceled: Terminal states, not schedulable - for wap in &job_run.servicing_wants { - let want = self.wants.remove(&wap.want_id).expect(&format!( - "BUG: Want {} must exist when job buffer event received", - wap.want_id - )); - - let transitioned = match want { - Want::Idle(idle) => { - // First job starting for this want - Want::Building(idle.start_building(current_timestamp())) - } - Want::Building(building) => { - // Another job already started, stay in Building (no-op) - Want::Building(building) - } - _ => { - panic!( - "BUG: Want {} in invalid state {:?} for job buffer. Only Idle or Building wants should be scheduled.", - wap.want_id, want - ); - } - }; - - self.wants.insert(wap.want_id.clone(), transitioned); - } - - // Transition partitions to Building state - self.transition_partitions_to_building(&job_run.building_partitions, &event.job_run_id); - - self.job_runs - .insert(event.job_run_id.clone(), job_run.clone()); - println!("Inserted job run: {:?}", job_run); - Ok(vec![]) + panic!( + "BUG: Job run ID collision on job run ID {}. Orchestrator should generate unique IDs.", + event.job_run_id + ); } + + // Create job run to be inserted + let job_run: JobRunDetail = event.clone().into(); + + // Transition wants to Building + // Valid states when job buffer event arrives: + // - Idle: First job starting for this want (normal case) + // - Building: Another job already started for this want (multiple jobs can service same want) + // Invalid states (panic - indicates orchestrator bug): + // - UpstreamBuilding: Not schedulable, waiting for dependencies + // - Successful/Failed/UpstreamFailed/Canceled: Terminal states, not schedulable + for wap in &job_run.servicing_wants { + let want = self.wants.remove(&wap.want_id).expect(&format!( + "BUG: Want {} must exist when job buffer event received", + wap.want_id + )); + + let transitioned = match want { + Want::Idle(idle) => { + // First job starting for this want + Want::Building(idle.start_building(current_timestamp())) + } + Want::Building(building) => { + // Another job already started, stay in Building (no-op) + Want::Building(building) + } + _ => { + panic!( + "BUG: Want {} in invalid state {:?} for job buffer. Only Idle or Building wants should be scheduled.", + wap.want_id, want + ); + } + }; + + self.wants.insert(wap.want_id.clone(), transitioned); + } + + // Transition partitions to Building state + self.transition_partitions_to_building(&job_run.building_partitions, &event.job_run_id); + + self.job_runs + .insert(event.job_run_id.clone(), job_run.clone()); + println!("Inserted job run: {:?}", job_run); + vec![] } fn update_job_run_status( &mut self, job_run_id: &str, status: JobRunStatusCode, - ) -> Result<(), DatabuildError> { - if let Some(job_run) = self.job_runs.get_mut(job_run_id) { - job_run.last_heartbeat_at = Some(current_timestamp()); - job_run.status = Some(status.into()); - Ok(()) - } else { - Err(format!("Job run ID {} not found", job_run_id).into()) - } + ) { + let job_run = self.job_runs.get_mut(job_run_id).expect(&format!( + "BUG: Job run ID {} must exist to update status", + job_run_id + )); + job_run.last_heartbeat_at = Some(current_timestamp()); + job_run.status = Some(status.into()); } fn handle_job_run_heartbeat( &mut self, event: &JobRunHeartbeatEventV1, - ) -> Result, DatabuildError> { - self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunRunning)?; - Ok(vec![]) + ) -> Vec { + self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunRunning); + vec![] } fn handle_job_run_success( &mut self, event: &JobRunSuccessEventV1, - ) -> Result, DatabuildError> { + ) -> Vec { println!("Job run success event: {:?}", event); - self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunSucceeded)?; + self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunSucceeded); let job_run = self.get_job_run(&event.job_run_id).unwrap(); // Clone building_partitions before we use it multiple times @@ -722,14 +724,14 @@ impl BuildState { // UpstreamBuilding → Idle/Building (for downstream wants waiting on newly successful wants) self.unblock_downstream_wants(&newly_successful_wants, &event.job_run_id, current_timestamp()); - Ok(vec![]) + vec![] } fn handle_job_run_failure( &mut self, event: &JobRunFailureEventV1, - ) -> Result, DatabuildError> { - self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunFailed)?; + ) -> Vec { + self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunFailed); let job_run = self.get_job_run(&event.job_run_id).unwrap(); // Clone building_partitions before we use it multiple times @@ -744,31 +746,31 @@ impl BuildState { // UpstreamBuilding → UpstreamFailed (for downstream wants waiting on newly failed wants) self.cascade_failures_to_downstream_wants(&newly_failed_wants, current_timestamp()); - Ok(vec![]) + vec![] } fn handle_job_run_cancel( &mut self, - event: &JobRunCancelEventV1, - ) -> Result, DatabuildError> { + _event: &JobRunCancelEventV1, + ) -> Vec { todo!("should update already inserted job run, partition status, want status") } pub fn handle_job_run_dep_miss( &mut self, event: &JobRunMissingDepsEventV1, - ) -> Result, DatabuildError> { - let job_run_detail = self.get_job_run(&event.job_run_id).ok_or(format!( - "Unable to find job run with id `{}`", + ) -> Vec { + let job_run_detail = self.get_job_run(&event.job_run_id).expect(&format!( + "BUG: Unable to find job run with id `{}`", event.job_run_id - ))?; + )); // Infer data/SLA timestamps from upstream want let want_timestamps: WantTimestamps = job_run_detail .servicing_wants .iter() .flat_map(|wap| self.get_want(&wap.want_id).map(|w| w.into())) .reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b)) - .ok_or(format!("No servicing wants found"))?; + .expect("BUG: No servicing wants found"); // Transition partitions back to Missing since this job can't build them yet self.reset_partitions_to_missing(&job_run_detail.building_partitions); @@ -796,20 +798,20 @@ impl BuildState { &partition_to_want_map, ); - Ok(want_events) + want_events } fn handle_taint_create( &mut self, - event: &TaintCreateEventV1, - ) -> Result, DatabuildError> { + _event: &TaintCreateEventV1, + ) -> Vec { todo!("...?") } fn handle_taint_delete( &mut self, - event: &TaintCancelEventV1, - ) -> Result, DatabuildError> { + _event: &TaintCancelEventV1, + ) -> Vec { todo!("...?") } @@ -1077,9 +1079,7 @@ mod tests { e.partitions = vec!["mypart".into()]; let mut state = BuildState::default(); - state - .handle_event(&e.clone().into()) - .expect("want create failed"); + state.handle_event(&e.clone().into()); let want = state.get_want("1234").unwrap(); let mut expected: WantDetail = e.into(); // Into will set this field as current timestamp @@ -1094,16 +1094,12 @@ mod tests { e.partitions = vec!["mypart".into()]; let mut state = BuildState::default(); - state - .handle_event(&e.clone().into()) - .expect("want create failed"); + state.handle_event(&e.clone().into()); // Should be able to cancel let mut e = WantCancelEventV1::default(); e.want_id = "1234".to_string(); - state - .handle_event(&e.clone().into()) - .expect("want cancel failed"); + state.handle_event(&e.clone().into()); let want = state.get_want("1234").unwrap(); assert_eq!(