From 01d50dde1bfd1eb50ebdb33ebed3a45a180ee755 Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Sat, 22 Nov 2025 16:15:03 +0800 Subject: [PATCH] factor out specific state transition helpers in build_state.rs --- databuild/build_state.rs | 850 ++++++++++++++++++++++----------------- 1 file changed, 483 insertions(+), 367 deletions(-) diff --git a/databuild/build_state.rs b/databuild/build_state.rs index 2cd9966..fff0444 100644 --- a/databuild/build_state.rs +++ b/databuild/build_state.rs @@ -80,6 +80,474 @@ impl BuildState { } } + /// Transition partitions from Missing to Building state + /// Used when a job run starts building partitions + fn transition_partitions_to_building( + &mut self, + partition_refs: &[PartitionRef], + job_run_id: &str, + ) { + for pref in partition_refs { + if let Some(partition) = self.partitions.remove(&pref.r#ref) { + // Partition exists - transition based on current state + let transitioned = match partition { + // Valid: Missing -> Building + Partition::Missing(missing) => { + Partition::Building(missing.start_building(job_run_id.to_string())) + } + // Invalid state: partition should not already be Building, Live, Failed, or Tainted + _ => { + panic!( + "BUG: Invalid state - partition {} cannot start building from state {:?}", + pref.r#ref, partition + ) + } + }; + self.partitions.insert(pref.r#ref.clone(), transitioned); + } else { + // Partition doesn't exist yet - create in Missing then transition to Building + let missing = Partition::new_missing(pref.clone()); + if let Partition::Missing(m) = missing { + let building = m.start_building(job_run_id.to_string()); + self.partitions + .insert(pref.r#ref.clone(), Partition::Building(building)); + } + } + } + } + + /// Transition partitions from Building to Live state + /// Used when a job run successfully completes + fn transition_partitions_to_live( + &mut self, + partition_refs: &[PartitionRef], + job_run_id: &str, + timestamp: u64, + ) { + for pref in partition_refs { + let partition = self.partitions.remove(&pref.r#ref).expect(&format!( + "BUG: Partition {} must exist and be in Building state before completion", + pref.r#ref + )); + + // ONLY valid transition: Building -> Live + let transitioned = match partition { + Partition::Building(building) => { + Partition::Live(building.complete(job_run_id.to_string(), timestamp)) + } + // All other states are invalid + _ => { + panic!( + "BUG: Invalid state - partition {} must be Building to transition to Live, found {:?}", + pref.r#ref, partition + ) + } + }; + self.partitions.insert(pref.r#ref.clone(), transitioned); + } + } + + /// Transition partitions from Building to Failed state + /// Used when a job run fails + fn transition_partitions_to_failed( + &mut self, + partition_refs: &[PartitionRef], + job_run_id: &str, + timestamp: u64, + ) { + for pref in partition_refs { + let partition = self.partitions.remove(&pref.r#ref).expect(&format!( + "BUG: Partition {} must exist and be in Building state before failure", + pref.r#ref + )); + + // ONLY valid transition: Building -> Failed + let transitioned = match partition { + Partition::Building(building) => { + Partition::Failed(building.fail(job_run_id.to_string(), timestamp)) + } + // All other states are invalid + _ => { + panic!( + "BUG: Invalid state - partition {} must be Building to transition to Failed, found {:?}", + pref.r#ref, partition + ) + } + }; + self.partitions.insert(pref.r#ref.clone(), transitioned); + } + } + + /// Reset partitions from Building back to Missing state + /// Used when a job run encounters missing dependencies and cannot proceed + fn reset_partitions_to_missing( + &mut self, + partition_refs: &[PartitionRef], + ) { + for pref in partition_refs { + let partition = self.partitions.remove(&pref.r#ref).expect(&format!( + "BUG: Partition {} must exist and be in Building state during dep_miss", + pref.r#ref + )); + + // Only valid transition: Building -> Missing + let transitioned = match partition { + Partition::Building(building) => Partition::Missing(building.reset_to_missing()), + // All other states are invalid + _ => { + panic!( + "BUG: Invalid state - partition {} must be Building during dep_miss, found {:?}", + pref.r#ref, partition + ) + } + }; + self.partitions.insert(pref.r#ref.clone(), transitioned); + } + } + + /// Complete wants when all their partitions become Live + /// Transitions Building → Successful, returns list of newly successful want IDs + fn complete_successful_wants( + &mut self, + newly_live_partitions: &[PartitionRef], + job_run_id: &str, + timestamp: u64, + ) -> Vec { + let mut newly_successful_wants: Vec = Vec::new(); + + for pref in newly_live_partitions { + let want_ids = self + .partitions + .get(&pref.r#ref) + .map(|p| p.want_ids().clone()) + .unwrap_or_default(); + + for want_id in want_ids { + let want = self.wants.remove(&want_id).expect(&format!( + "BUG: Want {} must exist when referenced by partition", + want_id + )); + + let transitioned = match want { + Want::Building(building) => { + // Check if ALL partitions for this want are now Live + let all_partitions_live = building.want.partitions.iter().all(|p| { + self.partitions + .get(&p.r#ref) + .map(|partition| partition.is_live()) + .unwrap_or(false) + }); + + if all_partitions_live { + newly_successful_wants.push(want_id.clone()); + Want::Successful( + building.complete(job_run_id.to_string(), timestamp), + ) + } else { + Want::Building(building) // Still building other partitions + } + } + _ => { + panic!( + "BUG: Want {} in invalid state {:?} when partition {} became Live. Should be Building.", + want_id, want, pref.r#ref + ); + } + }; + + self.wants.insert(want_id.clone(), transitioned); + } + } + + newly_successful_wants + } + + /// Fail wants when their partitions fail + /// Transitions Building → Failed, and adds to already-failed wants + /// Returns list of newly failed want IDs for downstream cascade + fn fail_directly_affected_wants( + &mut self, + failed_partitions: &[PartitionRef], + ) -> Vec { + let mut newly_failed_wants: Vec = Vec::new(); + + for pref in failed_partitions { + let want_ids = self + .partitions + .get(&pref.r#ref) + .map(|p| p.want_ids().clone()) + .unwrap_or_default(); + + for want_id in want_ids { + let want = self.wants.remove(&want_id).expect(&format!( + "BUG: Want {} must exist when referenced by partition", + want_id + )); + + let transitioned = match want { + Want::Building(building) => { + newly_failed_wants.push(want_id.clone()); + Want::Failed( + building.fail(vec![pref.clone()], "Partition build failed".to_string()), + ) + } + // Failed → Failed: add new failed partition to existing failed state + Want::Failed(failed) => { + Want::Failed(failed.add_failed_partitions(vec![pref.clone()])) + } + _ => { + panic!( + "BUG: Want {} in invalid state {:?} when partition {} failed. Should be Building or Failed.", + want_id, want, pref.r#ref + ); + } + }; + + self.wants.insert(want_id.clone(), transitioned); + } + } + + newly_failed_wants + } + + /// Unblock downstream wants when their upstream dependencies succeed + /// Transitions UpstreamBuilding → Idle (when ready) or Building (when partitions already building) + fn unblock_downstream_wants( + &mut self, + newly_successful_wants: &[String], + job_run_id: &str, + timestamp: u64, + ) { + // Find downstream wants that are waiting for any of the newly successful wants + // TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants + let downstream_wants_to_check: Vec = self + .wants + .iter() + .filter_map(|(id, want)| { + match want { + Want::UpstreamBuilding(downstream_want) => { + // Is this downstream want waiting for any of the newly successful wants? + let is_affected = downstream_want + .state + .upstream_want_ids + .iter() + .any(|up_id| newly_successful_wants.contains(up_id)); + if is_affected { Some(id.clone()) } else { None } + } + _ => None, + } + }) + .collect(); + + for want_id in downstream_wants_to_check { + let want = self + .wants + .remove(&want_id) + .expect(&format!("BUG: Want {} must exist", want_id)); + + let transitioned = match want { + Want::UpstreamBuilding(downstream_want) => { + // Check if ALL of this downstream want's upstream dependencies are now Successful + let all_upstreams_successful = downstream_want + .state + .upstream_want_ids + .iter() + .all(|up_want_id| { + self.wants + .get(up_want_id) + .map(|w| matches!(w, Want::Successful(_))) + .unwrap_or(false) + }); + + if all_upstreams_successful { + // Check if any of this want's partitions are still being built + // If a job dep-missed, its partitions transitioned back to Missing + // But other jobs might still be building other partitions for this want + let any_partition_building = + downstream_want.want.partitions.iter().any(|p| { + self.partitions + .get(&p.r#ref) + .map(|partition| matches!(partition, Partition::Building(_))) + .unwrap_or(false) + }); + + if any_partition_building { + // Some partitions still being built, continue in Building state + Want::Building( + downstream_want.continue_building( + job_run_id.to_string(), + timestamp, + ), + ) + } else { + // No partitions being built, become schedulable again + Want::Idle(downstream_want.upstreams_satisfied()) + } + } else { + // Upstreams not all satisfied yet, stay in UpstreamBuilding + Want::UpstreamBuilding(downstream_want) + } + } + _ => { + panic!("BUG: Want {} should be UpstreamBuilding here", want_id); + } + }; + + self.wants.insert(want_id, transitioned); + } + } + + /// Cascade failures to downstream wants when their upstream dependencies fail + /// Transitions UpstreamBuilding → UpstreamFailed + fn cascade_failures_to_downstream_wants( + &mut self, + newly_failed_wants: &[String], + timestamp: u64, + ) { + // Find downstream wants that are waiting for any of the newly failed wants + // TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants + let downstream_wants_to_fail: Vec = self + .wants + .iter() + .filter_map(|(id, want)| { + match want { + Want::UpstreamBuilding(downstream_want) => { + // Is this downstream want waiting for any of the newly failed wants? + let is_affected = downstream_want + .state + .upstream_want_ids + .iter() + .any(|up_id| newly_failed_wants.contains(up_id)); + if is_affected { Some(id.clone()) } else { None } + } + _ => None, + } + }) + .collect(); + + for want_id in downstream_wants_to_fail { + let want = self + .wants + .remove(&want_id) + .expect(&format!("BUG: Want {} must exist", want_id)); + + let transitioned = match want { + Want::UpstreamBuilding(downstream_want) => Want::UpstreamFailed( + downstream_want + .upstream_failed(newly_failed_wants.to_vec(), timestamp), + ), + _ => { + panic!("BUG: Want {} should be UpstreamBuilding here", want_id); + } + }; + + self.wants.insert(want_id, transitioned); + } + } + + /// Build a mapping from partition references to the want IDs that will build them + /// Used to track which upstream wants a downstream want depends on after a dep miss + fn build_partition_to_want_mapping( + &self, + want_events: &[Event], + ) -> std::collections::HashMap { + let mut partition_to_want_map: std::collections::HashMap = + std::collections::HashMap::new(); + + for event_item in want_events { + if let Event::WantCreateV1(want_create) = event_item { + for pref in &want_create.partitions { + partition_to_want_map.insert(pref.r#ref.clone(), want_create.want_id.clone()); + } + } + } + + partition_to_want_map + } + + /// Collect upstream want IDs that a servicing want now depends on based on dep misses + /// Returns a deduplicated, sorted list of upstream want IDs + fn collect_upstream_want_ids( + &self, + servicing_want: &crate::WantAttributedPartitions, + missing_deps: &[crate::MissingDeps], + partition_to_want_map: &std::collections::HashMap, + ) -> Vec { + let mut new_upstream_want_ids = Vec::new(); + + for missing_dep in missing_deps { + // Only process if this want contains an impacted partition + let is_impacted = missing_dep.impacted.iter().any(|imp| { + servicing_want + .partitions + .iter() + .any(|p| p.r#ref == imp.r#ref) + }); + + if is_impacted { + // For each missing partition, find the want ID that will build it + for missing_partition in &missing_dep.missing { + if let Some(want_id) = partition_to_want_map.get(&missing_partition.r#ref) { + new_upstream_want_ids.push(want_id.clone()); + } + } + } + } + + // Dedupe upstream want IDs (one job might report same dep multiple times) + new_upstream_want_ids.sort(); + new_upstream_want_ids.dedup(); + + new_upstream_want_ids + } + + /// Transition wants to UpstreamBuilding when they have missing dependencies + /// Handles Building → UpstreamBuilding and UpstreamBuilding → UpstreamBuilding (add upstreams) + fn transition_wants_to_upstream_building( + &mut self, + servicing_wants: &[crate::WantAttributedPartitions], + missing_deps: &[crate::MissingDeps], + partition_to_want_map: &std::collections::HashMap, + ) { + // For each want serviced by this job run, check if it was impacted by missing deps + for servicing_want in servicing_wants { + let want = self.wants.remove(&servicing_want.want_id).expect(&format!( + "BUG: Want {} must exist when serviced by job run", + servicing_want.want_id + )); + + // Collect the upstream want IDs that this want now depends on + let new_upstream_want_ids = + self.collect_upstream_want_ids(servicing_want, missing_deps, partition_to_want_map); + + let transitioned = if !new_upstream_want_ids.is_empty() { + match want { + Want::Building(building) => { + // First dep miss for this want: Building → UpstreamBuilding + Want::UpstreamBuilding(building.detect_missing_deps(new_upstream_want_ids)) + } + Want::UpstreamBuilding(upstream) => { + // Already in UpstreamBuilding, add more upstreams (self-transition) + // This can happen if multiple jobs report dep misses, or one job reports multiple dep misses + Want::UpstreamBuilding(upstream.add_upstreams(new_upstream_want_ids)) + } + _ => { + panic!( + "BUG: Want {} in invalid state {:?} when job run had dep miss. Should be Building or UpstreamBuilding.", + servicing_want.want_id, want + ); + } + } + } else { + // No new upstreams for this want (it wasn't impacted), keep current state + want + }; + + self.wants + .insert(servicing_want.want_id.clone(), transitioned); + } + } + /// Handles reacting to events, updating state, and erroring if its an invalid state transition /// Event handlers can return vecs of events that will then be appended to the BEL pub fn handle_event(&mut self, event: &Event) -> Result, DatabuildError> { @@ -203,33 +671,7 @@ impl BuildState { } // Transition partitions to Building state - for pref in &job_run.building_partitions { - if let Some(partition) = self.partitions.remove(&pref.r#ref) { - // Partition exists - transition based on current state - let transitioned = - match partition { - // Valid: Missing -> Building - Partition::Missing(missing) => Partition::Building( - missing.start_building(event.job_run_id.clone()), - ), - // Invalid state: partition should not already be Building, Live, Failed, or Tainted - _ => return Err(format!( - "Invalid state: partition {} cannot start building from state {:?}", - pref.r#ref, partition - ) - .into()), - }; - self.partitions.insert(pref.r#ref.clone(), transitioned); - } else { - // Partition doesn't exist yet - create in Missing then transition to Building - let missing = Partition::new_missing(pref.clone()); - if let Partition::Missing(m) = missing { - let building = m.start_building(event.job_run_id.clone()); - self.partitions - .insert(pref.r#ref.clone(), Partition::Building(building)); - } - } - } + self.transition_partitions_to_building(&job_run.building_partitions, &event.job_run_id); self.job_runs .insert(event.job_run_id.clone(), job_run.clone()); @@ -272,158 +714,13 @@ impl BuildState { let newly_live_partitions: Vec = job_run.building_partitions.clone(); // Update partitions being built by this job (strict type-safe transitions) - for pref in &newly_live_partitions { - let partition = self.partitions.remove(&pref.r#ref).ok_or_else(|| { - format!( - "Partition {} must exist and be in Building state before completion", - pref.r#ref - ) - })?; - - // ONLY valid transition: Building -> Live - let transitioned = match partition { - Partition::Building(building) => { - Partition::Live(building.complete( - event.job_run_id.clone(), - current_timestamp() - )) - } - // All other states are invalid - _ => { - return Err(format!( - "Invalid state: partition {} must be Building to transition to Live, found {:?}", - pref.r#ref, partition - ).into()) - } - }; - self.partitions.insert(pref.r#ref.clone(), transitioned); - } + self.transition_partitions_to_live(&newly_live_partitions, &event.job_run_id, current_timestamp()); // Building → Successful (when all partitions Live) - let mut newly_successful_wants: Vec = Vec::new(); - - for pref in &newly_live_partitions { - let want_ids = self - .partitions - .get(&pref.r#ref) - .map(|p| p.want_ids().clone()) - .unwrap_or_default(); - - for want_id in want_ids { - let want = self.wants.remove(&want_id).expect(&format!( - "BUG: Want {} must exist when referenced by partition", - want_id - )); - - let transitioned = match want { - Want::Building(building) => { - // Check if ALL partitions for this want are now Live - let all_partitions_live = building.want.partitions.iter().all(|p| { - self.partitions - .get(&p.r#ref) - .map(|partition| partition.is_live()) - .unwrap_or(false) - }); - - if all_partitions_live { - newly_successful_wants.push(want_id.clone()); - Want::Successful( - building.complete(event.job_run_id.clone(), current_timestamp()), - ) - } else { - Want::Building(building) // Still building other partitions - } - } - _ => { - panic!( - "BUG: Want {} in invalid state {:?} when partition {} became Live. Should be Building.", - want_id, want, pref.r#ref - ); - } - }; - - self.wants.insert(want_id.clone(), transitioned); - } - } + let newly_successful_wants = self.complete_successful_wants(&newly_live_partitions, &event.job_run_id, current_timestamp()); // UpstreamBuilding → Idle/Building (for downstream wants waiting on newly successful wants) - // Only check wants that are waiting for the newly successful wants - // TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants - let downstream_wants_to_check: Vec = self - .wants - .iter() - .filter_map(|(id, want)| { - match want { - Want::UpstreamBuilding(downstream_want) => { - // Is this downstream want waiting for any of the newly successful wants? - let is_affected = downstream_want - .state - .upstream_want_ids - .iter() - .any(|up_id| newly_successful_wants.contains(up_id)); - if is_affected { Some(id.clone()) } else { None } - } - _ => None, - } - }) - .collect(); - - for want_id in downstream_wants_to_check { - let want = self - .wants - .remove(&want_id) - .expect(&format!("BUG: Want {} must exist", want_id)); - - let transitioned = match want { - Want::UpstreamBuilding(downstream_want) => { - // Check if ALL of this downstream want's upstream dependencies are now Successful - let all_upstreams_successful = downstream_want - .state - .upstream_want_ids - .iter() - .all(|up_want_id| { - self.wants - .get(up_want_id) - .map(|w| matches!(w, Want::Successful(_))) - .unwrap_or(false) - }); - - if all_upstreams_successful { - // Check if any of this want's partitions are still being built - // If a job dep-missed, its partitions transitioned back to Missing - // But other jobs might still be building other partitions for this want - let any_partition_building = - downstream_want.want.partitions.iter().any(|p| { - self.partitions - .get(&p.r#ref) - .map(|partition| matches!(partition, Partition::Building(_))) - .unwrap_or(false) - }); - - if any_partition_building { - // Some partitions still being built, continue in Building state - Want::Building( - downstream_want.continue_building( - event.job_run_id.clone(), - current_timestamp(), - ), - ) - } else { - // No partitions being built, become schedulable again - Want::Idle(downstream_want.upstreams_satisfied()) - } - } else { - // Upstreams not all satisfied yet, stay in UpstreamBuilding - Want::UpstreamBuilding(downstream_want) - } - } - _ => { - panic!("BUG: Want {} should be UpstreamBuilding here", want_id); - } - }; - - self.wants.insert(want_id, transitioned); - } + self.unblock_downstream_wants(&newly_successful_wants, &event.job_run_id, current_timestamp()); Ok(vec![]) } @@ -439,111 +736,13 @@ impl BuildState { let failed_partitions: Vec = job_run.building_partitions.clone(); // Transition partitions using strict type-safe methods - for pref in &failed_partitions { - let partition = self.partitions.remove(&pref.r#ref).ok_or_else(|| { - format!( - "Partition {} must exist and be in Building state before failure", - pref.r#ref - ) - })?; - - // ONLY valid transition: Building -> Failed - let transitioned = match partition { - Partition::Building(building) => { - Partition::Failed(building.fail( - event.job_run_id.clone(), - current_timestamp() - )) - } - // All other states are invalid - _ => { - return Err(format!( - "Invalid state: partition {} must be Building to transition to Failed, found {:?}", - pref.r#ref, partition - ).into()) - } - }; - self.partitions.insert(pref.r#ref.clone(), transitioned); - } + self.transition_partitions_to_failed(&failed_partitions, &event.job_run_id, current_timestamp()); // Building → Failed (for wants directly building failed partitions) - let mut newly_failed_wants: Vec = Vec::new(); - - for pref in &failed_partitions { - let want_ids = self - .partitions - .get(&pref.r#ref) - .map(|p| p.want_ids().clone()) - .unwrap_or_default(); - - for want_id in want_ids { - let want = self.wants.remove(&want_id).expect(&format!( - "BUG: Want {} must exist when referenced by partition", - want_id - )); - - let transitioned = match want { - Want::Building(building) => { - newly_failed_wants.push(want_id.clone()); - Want::Failed( - building.fail(vec![pref.clone()], "Partition build failed".to_string()), - ) - } - // Failed → Failed: add new failed partition to existing failed state - Want::Failed(failed) => { - Want::Failed(failed.add_failed_partitions(vec![pref.clone()])) - } - _ => { - panic!( - "BUG: Want {} in invalid state {:?} when partition {} failed. Should be Building or Failed.", - want_id, want, pref.r#ref - ); - } - }; - - self.wants.insert(want_id.clone(), transitioned); - } - } + let newly_failed_wants = self.fail_directly_affected_wants(&failed_partitions); // UpstreamBuilding → UpstreamFailed (for downstream wants waiting on newly failed wants) - // TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants - let downstream_wants_to_fail: Vec = self - .wants - .iter() - .filter_map(|(id, want)| { - match want { - Want::UpstreamBuilding(downstream_want) => { - // Is this downstream want waiting for any of the newly failed wants? - let is_affected = downstream_want - .state - .upstream_want_ids - .iter() - .any(|up_id| newly_failed_wants.contains(up_id)); - if is_affected { Some(id.clone()) } else { None } - } - _ => None, - } - }) - .collect(); - - for want_id in downstream_wants_to_fail { - let want = self - .wants - .remove(&want_id) - .expect(&format!("BUG: Want {} must exist", want_id)); - - let transitioned = match want { - Want::UpstreamBuilding(downstream_want) => Want::UpstreamFailed( - downstream_want - .upstream_failed(newly_failed_wants.clone(), current_timestamp()), - ), - _ => { - panic!("BUG: Want {} should be UpstreamBuilding here", want_id); - } - }; - - self.wants.insert(want_id, transitioned); - } + self.cascade_failures_to_downstream_wants(&newly_failed_wants, current_timestamp()); Ok(vec![]) } @@ -572,28 +771,7 @@ impl BuildState { .ok_or(format!("No servicing wants found"))?; // Transition partitions back to Missing since this job can't build them yet - for pref in &job_run_detail.building_partitions { - let partition = self.partitions.remove(&pref.r#ref).ok_or_else(|| { - format!( - "Partition {} must exist and be in Building state during dep_miss", - pref.r#ref - ) - })?; - - // Only valid transition: Building -> Missing - let transitioned = match partition { - Partition::Building(building) => Partition::Missing(building.reset_to_missing()), - // All other states are invalid - _ => { - return Err(format!( - "Invalid state: partition {} must be Building during dep_miss, found {:?}", - pref.r#ref, partition - ) - .into()); - } - }; - self.partitions.insert(pref.r#ref.clone(), transitioned); - } + self.reset_partitions_to_missing(&job_run_detail.building_partitions); // Create wants from dep misses let want_events = missing_deps_to_want_events( @@ -609,76 +787,14 @@ impl BuildState { // 2. Transition the current want to UpstreamBuilding, tracking which upstream wants it's waiting for // Build a map: partition_ref -> want_id that will build it - // This lets us track which upstream wants the current want depends on - let mut partition_to_want_map: std::collections::HashMap = - std::collections::HashMap::new(); - for event_item in &want_events { - if let Event::WantCreateV1(want_create) = event_item { - for pref in &want_create.partitions { - partition_to_want_map.insert(pref.r#ref.clone(), want_create.want_id.clone()); - } - } - } + let partition_to_want_map = self.build_partition_to_want_mapping(&want_events); - // For each want serviced by this job run, check if it was impacted by missing deps - for servicing_want in &job_run_detail.servicing_wants { - let want = self.wants.remove(&servicing_want.want_id).expect(&format!( - "BUG: Want {} must exist when serviced by job run", - servicing_want.want_id - )); - - // Collect the upstream want IDs that this want now depends on - let mut new_upstream_want_ids = Vec::new(); - - for missing_dep in &event.missing_deps { - // Only process if this want contains an impacted partition - let is_impacted = missing_dep.impacted.iter().any(|imp| { - servicing_want - .partitions - .iter() - .any(|p| p.r#ref == imp.r#ref) - }); - - if is_impacted { - // For each missing partition, find the want ID that will build it - for missing_partition in &missing_dep.missing { - if let Some(want_id) = partition_to_want_map.get(&missing_partition.r#ref) { - new_upstream_want_ids.push(want_id.clone()); - } - } - } - } - - // Dedupe upstream want IDs (one job might report same dep multiple times) - new_upstream_want_ids.sort(); - new_upstream_want_ids.dedup(); - - let transitioned = if !new_upstream_want_ids.is_empty() { - match want { - Want::Building(building) => { - // First dep miss for this want: Building → UpstreamBuilding - Want::UpstreamBuilding(building.detect_missing_deps(new_upstream_want_ids)) - } - Want::UpstreamBuilding(upstream) => { - // Already in UpstreamBuilding, add more upstreams (self-transition) - // This can happen if multiple jobs report dep misses, or one job reports multiple dep misses - Want::UpstreamBuilding(upstream.add_upstreams(new_upstream_want_ids)) - } - _ => { - panic!( - "BUG: Want {} in invalid state {:?} when job run had dep miss. Should be Building or UpstreamBuilding.", - servicing_want.want_id, want - ); - } - } - } else { - // No new upstreams for this want (it wasn't impacted), keep current state - want - }; - - self.wants - .insert(servicing_want.want_id.clone(), transitioned); - } + // Transition servicing wants to UpstreamBuilding when they have missing dependencies + self.transition_wants_to_upstream_building( + &job_run_detail.servicing_wants, + &event.missing_deps, + &partition_to_want_map, + ); Ok(want_events) }