diff --git a/databuild/build_state.rs b/databuild/build_state.rs deleted file mode 100644 index 3e5d7ae..0000000 --- a/databuild/build_state.rs +++ /dev/null @@ -1,2340 +0,0 @@ -use crate::data_build_event::Event; -use crate::data_deps::{WantTimestamps, missing_deps_to_want_events}; -use crate::event_source::Source as EventSourceVariant; -use crate::job_run_state::{JobRun, JobRunWithState, QueuedState as JobQueuedState}; -use crate::partition_state::{ - BuildingPartitionRef, BuildingState, FailedPartitionRef, LivePartitionRef, Partition, - PartitionWithState, TaintedPartitionRef, UpForRetryPartitionRef, UpstreamBuildingPartitionRef, - UpstreamFailedPartitionRef, -}; -use crate::util::current_timestamp; -use crate::want_state::{ - FailedWantId, IdleState as WantIdleState, NewState as WantNewState, SuccessfulWantId, Want, - WantWithState, -}; -use crate::{ - JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1, - JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1, - ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, - ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, - PartitionRef, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantCancelEventV1, - WantCreateEventV1, WantDetail, -}; -use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; -use tracing; -use uuid::Uuid; - -/** -Design Notes - -The build state struct is the heart of the service and orchestrator, adapting build events to -higher level questions about build state. One temptation is to implement the build state as a set -of hierarchically defined reducers, to achieve information hiding and factor system capabilities and -state tracking simply. Unfortunately, to update state based on an event, you need a mutable borrow -of some part of the build state (that the reducer controls, for instance), and an immutable borrow -of the whole state for read/query purposes. The whole state needs to be available to handle state -updates like "this is the list of currently active job runs" in response to a job run event. Put -simply, this isn't possible without introducing some locking of the whole state and mutable state -subset, since they would conflict (the mutable subset would have already been borrowed, so can't -be borrowed immutably as part of the whole state borrow). You might also define a "query" phase -in which reducers query the state based on the received event, but that just increases complexity. - -Instead, databuild opts for an entity-component system (ECS) that just provides the whole build -state mutably to all state update functionality, trusting that we know how to use it responsibly. -This means no boxing or "query phase", and means we can have all state updates happen as map lookups -and updates, which is exceptionally fast. The states of the different entities are managed by state -machines, in a pseudo-colored-petri-net style (only pseudo because we haven't formalized it). It is -critical that these state machines, their states, and their transitions are type-safe. -*/ - -/// Tracks all application state, defines valid state transitions, and manages cross-state machine -/// state transitions (e.g. 
job run success resulting in partition going from Building to Live) -#[derive(Debug, Clone, Default)] -pub struct BuildState { - // Core entity storage - wants: BTreeMap, - taints: BTreeMap, - job_runs: BTreeMap, - - // UUID-based partition indexing - partitions_by_uuid: BTreeMap, - canonical_partitions: BTreeMap, // partition ref → current UUID - - // Inverted indexes - wants_for_partition: BTreeMap>, // partition ref → want_ids - downstream_waiting: BTreeMap>, // upstream ref → partition UUIDs waiting for it -} - -impl BuildState { - /// Reconstruct BuildState from a sequence of events (for read path in web server) - /// This allows the web server to rebuild state from BEL storage without holding a lock - pub fn from_events(events: &[crate::DataBuildEvent]) -> Self { - let mut state = BuildState::default(); - for event in events { - if let Some(ref inner_event) = event.event { - // handle_event returns Vec for cascading events, but we ignore them - // since we're replaying from a complete event log - state.handle_event(inner_event); - } - } - state - } - - pub fn count_job_runs(&self) -> usize { - self.job_runs.len() - } - - // ===== New UUID-based partition access methods ===== - - /// Get the canonical partition for a ref (the current/active partition instance) - pub fn get_canonical_partition(&self, partition_ref: &str) -> Option<&Partition> { - self.canonical_partitions - .get(partition_ref) - .and_then(|uuid| self.partitions_by_uuid.get(uuid)) - } - - /// Get the canonical partition UUID for a ref - pub fn get_canonical_partition_uuid(&self, partition_ref: &str) -> Option { - self.canonical_partitions.get(partition_ref).copied() - } - - /// Get a partition by its UUID - pub fn get_partition_by_uuid(&self, uuid: Uuid) -> Option<&Partition> { - self.partitions_by_uuid.get(&uuid) - } - - /// Take the canonical partition for a ref (removes from partitions_by_uuid for state transition) - /// The canonical_partitions mapping is NOT removed - caller must update it if creating a new partition - fn take_canonical_partition(&mut self, partition_ref: &str) -> Option { - self.canonical_partitions - .get(partition_ref) - .copied() - .and_then(|uuid| self.partitions_by_uuid.remove(&uuid)) - } - - /// Get want IDs for a partition ref (from inverted index) - pub fn get_wants_for_partition(&self, partition_ref: &str) -> &[String] { - self.wants_for_partition - .get(partition_ref) - .map(|v| v.as_slice()) - .unwrap_or(&[]) - } - - /// Register a want in the wants_for_partition inverted index - fn register_want_for_partitions(&mut self, want_id: &str, partition_refs: &[PartitionRef]) { - for pref in partition_refs { - let want_ids = self - .wants_for_partition - .entry(pref.r#ref.clone()) - .or_insert_with(Vec::new); - if !want_ids.contains(&want_id.to_string()) { - want_ids.push(want_id.to_string()); - } - } - } - - /// Create a new partition in Building state and update indexes - fn create_partition_building(&mut self, job_run_id: &str, partition_ref: PartitionRef) -> Uuid { - let partition = - PartitionWithState::::new(job_run_id.to_string(), partition_ref.clone()); - let uuid = partition.uuid; - - // Update indexes - self.partitions_by_uuid - .insert(uuid, Partition::Building(partition)); - self.canonical_partitions - .insert(partition_ref.r#ref.clone(), uuid); - - tracing::info!( - partition = %partition_ref.r#ref, - uuid = %uuid, - job_run_id = %job_run_id, - "Partition: Created in Building state" - ); - - uuid - } - - /// Update a partition in the indexes (after state transition) - fn 
update_partition(&mut self, partition: Partition) { - let uuid = partition.uuid(); - - // Update in UUID map - self.partitions_by_uuid.insert(uuid, partition); - } - - /// Handle creation of a derivative want (created due to job dep miss) - /// - /// When a job reports missing dependencies, it returns WantCreateV1 events for those missing partitions. - /// Those events get appended to the BEL and eventually processed by handle_want_create(). - /// - /// This function is called when we detect a derivative want (has source.job_triggered) and transitions - /// the impacted wants to UpstreamBuilding state, tracking the derivative want ID as an upstream dependency. - /// - /// KEY INSIGHT: We must use the actual want_id from the WantCreateV1 event, not synthetic UUIDs generated - /// during event processing. This ensures replay works correctly - the same want IDs are used both during - /// original execution and during replay from the BEL. - fn handle_derivative_want_creation( - &mut self, - derivative_want_id: &str, - derivative_want_partitions: &[PartitionRef], - source_job_run_id: &str, - ) { - // Look up the job run that triggered this derivative want - // This job run must be in DepMiss state because it reported missing dependencies - let job_run = self.job_runs.get(source_job_run_id).expect(&format!( - "BUG: Job run {} must exist when derivative want created", - source_job_run_id - )); - - // Extract the missing deps from the DepMiss job run - let missing_deps = match job_run { - JobRun::DepMiss(dep_miss) => dep_miss.get_missing_deps(), - _ => { - panic!( - "BUG: Job run {} must be in DepMiss state when derivative want created, found {:?}", - source_job_run_id, job_run - ); - } - }; - - // Find which MissingDeps entry corresponds to this derivative want - // The derivative want was created for a specific set of missing partitions, - // and we need to find which downstream partitions are impacted by those missing partitions - for md in missing_deps { - // Check if this derivative want's partitions match the missing partitions in this entry - // We need exact match because one dep miss event can create multiple derivative wants - let partitions_match = md.missing.iter().all(|missing_ref| { - derivative_want_partitions - .iter() - .any(|p| p.r#ref == missing_ref.r#ref) - }) && derivative_want_partitions.len() == md.missing.len(); - - if partitions_match { - // Now we know which partitions are impacted by this missing dependency - let impacted_partition_refs: Vec = - md.impacted.iter().map(|p| p.r#ref.clone()).collect(); - - tracing::debug!( - derivative_want_id = %derivative_want_id, - source_job_run_id = %source_job_run_id, - missing_partitions = ?derivative_want_partitions.iter().map(|p| &p.r#ref).collect::>(), - impacted_partitions = ?impacted_partition_refs, - "Processing derivative want creation" - ); - - // Find all wants that include these impacted partitions - // These are the wants that need to wait for the derivative want to complete - let mut impacted_want_ids: std::collections::HashSet = - std::collections::HashSet::new(); - for partition_ref in &impacted_partition_refs { - for want_id in self.get_wants_for_partition(partition_ref) { - impacted_want_ids.insert(want_id.clone()); - } - } - - // Transition each impacted want to UpstreamBuilding, tracking this derivative want as an upstream - for want_id in impacted_want_ids { - let want = self.wants.remove(&want_id).expect(&format!( - "BUG: Want {} must exist when processing derivative want", - want_id - )); - - let transitioned = 
match want { - Want::Building(building) => { - // First dep miss for this want: Building → UpstreamBuilding - tracing::info!( - want_id = %want_id, - derivative_want_id = %derivative_want_id, - "Want: Building → UpstreamBuilding (first missing dep detected)" - ); - Want::UpstreamBuilding( - building.detect_missing_deps(vec![derivative_want_id.to_string()]), - ) - } - Want::UpstreamBuilding(upstream) => { - // Additional dep miss: UpstreamBuilding → UpstreamBuilding (add another upstream) - // This can happen if multiple jobs report dep misses for different upstreams - tracing::info!( - want_id = %want_id, - derivative_want_id = %derivative_want_id, - "Want: UpstreamBuilding → UpstreamBuilding (additional upstream added)" - ); - Want::UpstreamBuilding( - upstream.add_upstreams(vec![derivative_want_id.to_string()]), - ) - } - _ => { - panic!( - "BUG: Want {} in invalid state {:?} when processing derivative want. Should be Building or UpstreamBuilding.", - want_id, want - ); - } - }; - - self.wants.insert(want_id, transitioned); - } - } - } - } - - /// Create partitions in Building state - /// Used when a job run starts building partitions. - /// Note: Partitions no longer have a Missing state - they start directly as Building. - fn transition_partitions_to_building( - &mut self, - partition_refs: &[BuildingPartitionRef], - job_run_id: &str, - ) { - for building_ref in partition_refs { - if let Some(partition) = self.get_canonical_partition(&building_ref.0.r#ref).cloned() { - // Partition already exists - this is an error unless we're retrying from UpForRetry - match partition { - Partition::UpForRetry(_) => { - // Valid: UpForRetry -> Building (retry after deps satisfied) - // Old partition stays in partitions_by_uuid as historical record - // Create new Building partition with fresh UUID - let uuid = - self.create_partition_building(job_run_id, building_ref.0.clone()); - tracing::info!( - partition = %building_ref.0.r#ref, - job_run_id = %job_run_id, - uuid = %uuid, - "Partition: UpForRetry → Building (retry)" - ); - } - _ => { - panic!( - "BUG: Invalid state - partition {} cannot start building from state {:?}", - building_ref.0.r#ref, partition - ) - } - } - } else { - // Partition doesn't exist yet - create directly in Building state - let uuid = self.create_partition_building(job_run_id, building_ref.0.clone()); - tracing::info!( - partition = %building_ref.0.r#ref, - job_run_id = %job_run_id, - uuid = %uuid, - "Partition: (new) → Building" - ); - } - } - } - - /// Transition partitions from Building to Live state - /// Used when a job run successfully completes - fn transition_partitions_to_live( - &mut self, - partition_refs: &[LivePartitionRef], - job_run_id: &str, - timestamp: u64, - ) { - for pref in partition_refs { - let partition = self - .take_canonical_partition(&pref.0.r#ref) - .expect(&format!( - "BUG: Partition {} must exist and be in Building state before completion", - pref.0.r#ref - )); - - // ONLY valid transition: Building -> Live - let transitioned = match partition { - Partition::Building(building) => { - tracing::info!( - partition = %pref.0.r#ref, - job_run_id = %job_run_id, - "Partition: Building → Live" - ); - Partition::Live(building.complete(timestamp)) - } - // All other states are invalid - _ => { - panic!( - "BUG: Invalid state - partition {} must be Building to transition to Live, found {:?}", - pref.0.r#ref, partition - ) - } - }; - self.update_partition(transitioned); - } - } - - /// Transition partitions from Building to Failed state - /// Used when a 
job run fails - fn transition_partitions_to_failed( - &mut self, - partition_refs: &[FailedPartitionRef], - job_run_id: &str, - timestamp: u64, - ) { - for pref in partition_refs { - let partition = self - .take_canonical_partition(&pref.0.r#ref) - .expect(&format!( - "BUG: Partition {} must exist and be in Building state before failure", - pref.0.r#ref - )); - - // ONLY valid transition: Building -> Failed - let transitioned = match partition { - Partition::Building(building) => { - tracing::info!( - partition = %pref.0.r#ref, - job_run_id = %job_run_id, - "Partition: Building → Failed" - ); - Partition::Failed(building.fail(timestamp)) - } - // All other states are invalid - _ => { - panic!( - "BUG: Invalid state - partition {} must be Building to transition to Failed, found {:?}", - pref.0.r#ref, partition - ) - } - }; - self.update_partition(transitioned); - } - } - - /// Transition partitions from Building to UpstreamBuilding state - /// Used when a job run encounters missing dependencies and cannot proceed. - /// The partition waits for its upstream deps to be built before becoming UpForRetry. - fn transition_partitions_to_upstream_building( - &mut self, - partition_refs: &[BuildingPartitionRef], - missing_deps: Vec, - ) { - for building_ref in partition_refs { - let partition = self - .take_canonical_partition(&building_ref.0.r#ref) - .expect(&format!( - "BUG: Partition {} must exist and be in Building state during dep_miss", - building_ref.0.r#ref - )); - - // Only valid transition: Building -> UpstreamBuilding - let transitioned = match partition { - Partition::Building(building) => { - let partition_uuid = building.uuid; - tracing::info!( - partition = %building_ref.0.r#ref, - uuid = %partition_uuid, - missing_deps = ?missing_deps.iter().map(|p| &p.r#ref).collect::>(), - "Partition: Building → UpstreamBuilding (dep miss)" - ); - - // Update downstream_waiting index: for each missing dep, record that this partition is waiting - for missing_dep in &missing_deps { - self.downstream_waiting - .entry(missing_dep.r#ref.clone()) - .or_default() - .push(partition_uuid); - } - - Partition::UpstreamBuilding(building.dep_miss(missing_deps.clone())) - } - // All other states are invalid - _ => { - panic!( - "BUG: Invalid state - partition {} must be Building during dep_miss, found {:?}", - building_ref.0.r#ref, partition - ) - } - }; - self.update_partition(transitioned); - } - } - - /// Transition partitions from UpstreamBuilding to UpForRetry when their upstream deps become Live. - /// This should be called when partitions become Live to check if any downstream partitions can now retry. - /// Uses the `downstream_waiting` index for O(1) lookup of affected partitions. 
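// --- Illustrative sketch (not part of the original file) ---
// unblock_downstream_partitions (below) and cascade_failures_to_downstream_partitions
// both lean on the same trick: an inverted index from an upstream partition ref to the
// partitions blocked on it, so resolving one upstream only touches the affected entries
// instead of scanning every partition. The real code keys on partition UUIDs and
// PartitionRef values; the types below are simplified stand-ins under that assumption.
mod downstream_waiting_sketch {
    use std::collections::BTreeMap;

    #[derive(Default)]
    pub struct WaitIndex {
        /// upstream ref -> ids of partitions still waiting on it
        downstream_waiting: BTreeMap<String, Vec<u64>>,
        /// waiting partition id -> upstream refs it is still missing
        missing_deps: BTreeMap<u64, Vec<String>>,
    }

    impl WaitIndex {
        pub fn wait_on(&mut self, partition_id: u64, upstream_ref: &str) {
            self.downstream_waiting
                .entry(upstream_ref.to_string())
                .or_default()
                .push(partition_id);
            self.missing_deps
                .entry(partition_id)
                .or_default()
                .push(upstream_ref.to_string());
        }

        /// Called when `upstream_ref` becomes Live; returns the partitions whose last
        /// missing dep was just satisfied (UpstreamBuilding -> UpForRetry in the real code).
        pub fn upstream_live(&mut self, upstream_ref: &str) -> Vec<u64> {
            let waiting = self.downstream_waiting.remove(upstream_ref).unwrap_or_default();
            let mut unblocked = Vec::new();
            for id in waiting {
                if let Some(deps) = self.missing_deps.get_mut(&id) {
                    deps.retain(|d| d != upstream_ref);
                    if deps.is_empty() {
                        self.missing_deps.remove(&id);
                        unblocked.push(id);
                    }
                }
            }
            unblocked
        }
    }
}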
- fn unblock_downstream_partitions(&mut self, newly_live_partition_refs: &[LivePartitionRef]) { - // Collect UUIDs of partitions that might be unblocked using the inverted index - let mut uuids_to_check: Vec = Vec::new(); - for live_ref in newly_live_partition_refs { - if let Some(waiting_uuids) = self.downstream_waiting.get(&live_ref.0.r#ref) { - uuids_to_check.extend(waiting_uuids.iter().cloned()); - } - } - - // Deduplicate UUIDs (a partition might be waiting for multiple deps that all became live) - uuids_to_check.sort(); - uuids_to_check.dedup(); - - for uuid in uuids_to_check { - // Get partition by UUID - it might have been transitioned already or no longer exist - let Some(partition) = self.partitions_by_uuid.get(&uuid).cloned() else { - continue; - }; - - let partition_ref = partition.partition_ref().r#ref.clone(); - - // Only process UpstreamBuilding partitions - if let Partition::UpstreamBuilding(mut upstream_building) = partition { - // Remove satisfied deps from missing_deps - for live_ref in newly_live_partition_refs { - upstream_building - .state - .missing_deps - .retain(|d| d.r#ref != live_ref.0.r#ref); - // Also remove from downstream_waiting index - if let Some(waiting) = self.downstream_waiting.get_mut(&live_ref.0.r#ref) { - waiting.retain(|u| *u != uuid); - } - } - - let transitioned = if upstream_building.state.missing_deps.is_empty() { - // All deps satisfied, transition to UpForRetry - tracing::info!( - partition = %partition_ref, - uuid = %uuid, - "Partition: UpstreamBuilding → UpForRetry (all upstreams satisfied)" - ); - Partition::UpForRetry(upstream_building.upstreams_satisfied()) - } else { - // Still waiting for more deps - tracing::debug!( - partition = %partition_ref, - uuid = %uuid, - remaining_deps = ?upstream_building.state.missing_deps.iter().map(|d| &d.r#ref).collect::>(), - "Partition remains in UpstreamBuilding (still waiting for deps)" - ); - Partition::UpstreamBuilding(upstream_building) - }; - - self.update_partition(transitioned); - } - } - } - - /// Cascade failures to downstream partitions when their upstream dependencies fail. - /// Transitions UpstreamBuilding → UpstreamFailed for partitions waiting on failed upstreams. - /// Uses the `downstream_waiting` index for O(1) lookup of affected partitions. 
- fn cascade_failures_to_downstream_partitions( - &mut self, - failed_partition_refs: &[FailedPartitionRef], - ) { - // Collect UUIDs of partitions that are waiting for the failed partitions - let mut uuids_to_fail: Vec = Vec::new(); - for failed_ref in failed_partition_refs { - if let Some(waiting_uuids) = self.downstream_waiting.get(&failed_ref.0.r#ref) { - uuids_to_fail.extend(waiting_uuids.iter().cloned()); - } - } - - // Deduplicate UUIDs - uuids_to_fail.sort(); - uuids_to_fail.dedup(); - - for uuid in uuids_to_fail { - // Get partition by UUID - let Some(partition) = self.partitions_by_uuid.get(&uuid).cloned() else { - continue; - }; - - let partition_ref = partition.partition_ref().r#ref.clone(); - - // Only process UpstreamBuilding partitions - if let Partition::UpstreamBuilding(upstream_building) = partition { - // Collect which upstream refs failed - let failed_upstream_refs: Vec = failed_partition_refs - .iter() - .filter(|f| { - upstream_building - .state - .missing_deps - .iter() - .any(|d| d.r#ref == f.0.r#ref) - }) - .map(|f| f.0.clone()) - .collect(); - - if !failed_upstream_refs.is_empty() { - tracing::info!( - partition = %partition_ref, - uuid = %uuid, - failed_upstreams = ?failed_upstream_refs.iter().map(|p| &p.r#ref).collect::>(), - "Partition: UpstreamBuilding → UpstreamFailed (upstream failed)" - ); - - // Remove from downstream_waiting index for all deps - for dep in &upstream_building.state.missing_deps { - if let Some(waiting) = self.downstream_waiting.get_mut(&dep.r#ref) { - waiting.retain(|u| *u != uuid); - } - } - - // Transition to UpstreamFailed - let transitioned = Partition::UpstreamFailed( - upstream_building - .upstream_failed(failed_upstream_refs, current_timestamp()), - ); - self.update_partition(transitioned); - } - } - } - } - - /// Complete wants when all their partitions become Live - /// Transitions Building → Successful, returns list of newly successful want IDs - fn complete_successful_wants( - &mut self, - newly_live_partitions: &[LivePartitionRef], - job_run_id: &str, - timestamp: u64, - ) -> Vec { - let mut newly_successful_wants: Vec = Vec::new(); - - for pref in newly_live_partitions { - let want_ids: Vec = self.get_wants_for_partition(&pref.0.r#ref).to_vec(); - - for want_id in want_ids { - let want = self.wants.remove(&want_id).expect(&format!( - "BUG: Want {} must exist when referenced by partition", - want_id - )); - - let transitioned = match want { - Want::Building(building) => { - // Check if ALL partitions for this want are now Live - let all_partitions_live = building.want.partitions.iter().all(|p| { - self.get_canonical_partition(&p.r#ref) - .map(|partition| partition.is_live()) - .unwrap_or(false) - }); - - if all_partitions_live { - let successful_want = - building.complete(job_run_id.to_string(), timestamp); - tracing::info!( - want_id = %want_id, - job_run_id = %job_run_id, - "Want: Building → Successful" - ); - newly_successful_wants.push(successful_want.get_id()); - Want::Successful(successful_want) - } else { - Want::Building(building) // Still building other partitions - } - } - _ => { - panic!( - "BUG: Want {} in invalid state {:?} when partition {} became Live. 
Should be Building.", - want_id, want, pref.0.r#ref - ); - } - }; - - self.wants.insert(want_id.clone(), transitioned); - } - } - - newly_successful_wants - } - - /// Fail wants when their partitions fail - /// Transitions Building → Failed, and adds to already-failed wants - /// Returns list of newly failed want IDs for downstream cascade - fn fail_directly_affected_wants( - &mut self, - failed_partitions: &[FailedPartitionRef], - ) -> Vec { - let mut newly_failed_wants: Vec = Vec::new(); - - for pref in failed_partitions { - let want_ids: Vec = self.get_wants_for_partition(&pref.0.r#ref).to_vec(); - - for want_id in want_ids { - let want = self.wants.remove(&want_id).expect(&format!( - "BUG: Want {} must exist when referenced by partition", - want_id - )); - - let transitioned = match want { - Want::Building(building) => { - let failed = building - .fail(vec![pref.0.clone()], "Partition build failed".to_string()); - newly_failed_wants.push(failed.get_id()); - Want::Failed(failed) - } - // Failed → Failed: add new failed partition to existing failed state - Want::Failed(failed) => { - Want::Failed(failed.add_failed_partitions(vec![pref.clone()])) - } - _ => { - panic!( - "BUG: Want {} in invalid state {:?} when partition {} failed. Should be Building or Failed.", - want_id, want, pref.0.r#ref - ); - } - }; - - self.wants.insert(want_id.clone(), transitioned); - } - } - - newly_failed_wants - } - - /// Unblock downstream wants when their upstream dependencies succeed - /// Transitions UpstreamBuilding → Idle (when ready) or Building (when partitions already building) - fn unblock_downstream_wants( - &mut self, - newly_successful_wants: &[SuccessfulWantId], - job_run_id: &str, - timestamp: u64, - ) { - tracing::debug!( - newly_successful_wants = ?newly_successful_wants - .iter() - .map(|w| &w.0) - .collect::>(), - "Checking downstream wants for unblocking" - ); - // Find downstream wants that are waiting for any of the newly successful wants - // TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants - let downstream_wants_to_check: Vec = self - .wants - .iter() - .filter_map(|(id, want)| { - match want { - Want::UpstreamBuilding(downstream_want) => { - // Is this downstream want waiting for any of the newly successful wants? 
- let is_affected = - downstream_want.state.upstream_want_ids.iter().any(|up_id| { - newly_successful_wants.iter().any(|swid| &swid.0 == up_id) - }); - if is_affected { Some(id.clone()) } else { None } - } - _ => None, - } - }) - .collect(); - tracing::debug!( - downstream_wants_to_check = ?downstream_wants_to_check, - "Found downstream wants affected by upstream completion" - ); - - for want_id in downstream_wants_to_check { - let want = self - .wants - .remove(&want_id) - .expect(&format!("BUG: Want {} must exist", want_id)); - - let transitioned = match want { - Want::UpstreamBuilding(downstream_want) => { - tracing::debug!( - want_id = %want_id, - upstreams = ?downstream_want.state.upstream_want_ids, - "Checking if all upstreams are satisfied" - ); - // Check if ALL of this downstream want's upstream dependencies are now Successful - let all_upstreams_successful = downstream_want - .state - .upstream_want_ids - .iter() - .all(|up_want_id| { - self.wants - .get(up_want_id) - .map(|w| matches!(w, Want::Successful(_))) - .unwrap_or(false) - }); - tracing::debug!( - want_id = %want_id, - all_upstreams_successful = %all_upstreams_successful, - "Upstream satisfaction check complete" - ); - - if all_upstreams_successful { - // Check if any of this want's partitions are still being built - // If a job dep-missed, its partitions transitioned back to Missing - // But other jobs might still be building other partitions for this want - let any_partition_building = - downstream_want.want.partitions.iter().any(|p| { - self.get_canonical_partition(&p.r#ref) - .map(|partition| matches!(partition, Partition::Building(_))) - .unwrap_or(false) - }); - tracing::debug!( - want_id = %want_id, - any_partition_building = %any_partition_building, - "Partition building status check" - ); - - if any_partition_building { - // Some partitions still being built, continue in Building state - tracing::info!( - want_id = %want_id, - job_run_id = %job_run_id, - "Want: UpstreamBuilding → Building (upstreams satisfied, partitions building)" - ); - Want::Building( - downstream_want - .continue_building(job_run_id.to_string(), timestamp), - ) - } else { - // No partitions being built, become schedulable again - tracing::info!( - want_id = %want_id, - "Want: UpstreamBuilding → Idle (upstreams satisfied, ready to schedule)" - ); - Want::Idle(downstream_want.upstreams_satisfied()) - } - } else { - // Upstreams not all satisfied yet, stay in UpstreamBuilding - tracing::debug!( - want_id = %want_id, - "Want remains in UpstreamBuilding state (upstreams not yet satisfied)" - ); - Want::UpstreamBuilding(downstream_want) - } - } - _ => { - panic!("BUG: Want {} should be UpstreamBuilding here", want_id); - } - }; - - self.wants.insert(want_id, transitioned); - } - } - - /// Cascade failures to downstream wants when their upstream dependencies fail - /// Transitions UpstreamBuilding → UpstreamFailed - fn cascade_failures_to_downstream_wants( - &mut self, - newly_failed_wants: &[FailedWantId], - timestamp: u64, - ) { - // Find downstream wants that are waiting for any of the newly failed wants - // TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants - let downstream_wants_to_fail: Vec = self - .wants - .iter() - .filter_map(|(id, want)| { - match want { - Want::UpstreamBuilding(downstream_want) => { - // Is this downstream want waiting for any of the newly failed wants? 
- let is_affected = - downstream_want.state.upstream_want_ids.iter().any(|up_id| { - newly_failed_wants.iter().any(|fwid| &fwid.0 == up_id) - }); - if is_affected { Some(id.clone()) } else { None } - } - _ => None, - } - }) - .collect(); - - for want_id in downstream_wants_to_fail { - let want = self - .wants - .remove(&want_id) - .expect(&format!("BUG: Want {} must exist", want_id)); - - let transitioned = match want { - Want::UpstreamBuilding(downstream_want) => Want::UpstreamFailed( - downstream_want.upstream_failed( - newly_failed_wants - .iter() - .map(|fwid| fwid.0.clone()) - .collect(), - timestamp, - ), - ), - _ => { - panic!("BUG: Want {} should be UpstreamBuilding here", want_id); - } - }; - - self.wants.insert(want_id, transitioned); - } - } - - /// Handles reacting to events, updating state, and erroring if its an invalid state transition - /// Event handlers can return vecs of events that will then be appended to the BEL - pub fn handle_event(&mut self, event: &Event) -> Vec { - match event { - // JobRun events - Event::JobRunBufferV1(e) => self.handle_job_run_buffer(e), - Event::JobRunHeartbeatV1(e) => self.handle_job_run_heartbeat(e), - Event::JobRunFailureV1(e) => self.handle_job_run_failure(e), - Event::JobRunCancelV1(e) => self.handle_job_run_cancel(e), - Event::JobRunSuccessV1(e) => self.handle_job_run_success(e), - Event::JobRunMissingDepsV1(e) => self.handle_job_run_dep_miss(e), - // Want events - Event::WantCreateV1(e) => self.handle_want_create(e), - Event::WantCancelV1(e) => self.handle_want_cancel(e), - // Taint events - Event::TaintCreateV1(e) => self.handle_taint_create(e), - Event::TaintCancelV1(e) => self.handle_taint_delete(e), - // Ruh roh! - _ => panic!("Unhandled event type! {:?}", event), - } - } - - fn handle_want_create(&mut self, event: &WantCreateEventV1) -> Vec { - // Create want in New state from event - let want_new: WantWithState = event.clone().into(); - - // Log creation with derivative vs user-created distinction - let is_derivative = if let Some(source) = &event.source { - if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source { - tracing::info!( - want_id = %event.want_id, - partitions = ?event.partitions.iter().map(|p| &p.r#ref).collect::>(), - source_job_run_id = %job_triggered.job_run_id, - "Want created (derivative - auto-created due to missing dependency)" - ); - true - } else { - false - } - } else { - tracing::info!( - want_id = %event.want_id, - partitions = ?event.partitions.iter().map(|p| &p.r#ref).collect::>(), - "Want created (user-requested)" - ); - false - }; - - // Register this want with all its partitions (via inverted index) - self.register_want_for_partitions(&event.want_id, &event.partitions); - - // Sense canonical partition states and determine initial want state - // Priority order: Failed > UpstreamFailed > AllLive > Building > UpstreamBuilding > UpForRetry > Idle - let mut failed_partitions: Vec = Vec::new(); - let mut upstream_failed_wants: Vec = Vec::new(); - let mut all_live = true; - let mut any_building = false; - let mut any_upstream_building = false; - let mut any_up_for_retry = false; - let mut building_started_at: Option = None; - - for pref in &event.partitions { - match self.get_canonical_partition(&pref.r#ref) { - Some(partition) => match partition { - Partition::Failed(_f) => { - failed_partitions.push(pref.clone()); - all_live = false; - } - Partition::UpstreamFailed(uf) => { - // Track which upstream refs failed - for failed_ref in &uf.state.failed_upstream_refs { - 
upstream_failed_wants.push(failed_ref.r#ref.clone()); - } - all_live = false; - } - Partition::Live(_) => { - // Contributes to all_live check - } - Partition::Building(_b) => { - any_building = true; - all_live = false; - // Track when building started (use earliest if multiple) - if building_started_at.is_none() { - building_started_at = Some(current_timestamp()); - } - } - Partition::UpstreamBuilding(_) => { - any_upstream_building = true; - all_live = false; - } - Partition::UpForRetry(_) => { - any_up_for_retry = true; - all_live = false; - } - Partition::Tainted(_) => { - // Tainted partitions need rebuild - all_live = false; - } - }, - None => { - // Partition doesn't exist - needs to be built - all_live = false; - } - } - } - - // Transition from New to appropriate state based on sensing - let final_want: Want = if !failed_partitions.is_empty() { - tracing::info!( - want_id = %event.want_id, - failed_partitions = ?failed_partitions.iter().map(|p| &p.r#ref).collect::>(), - "Want: New → Failed (partition already failed)" - ); - Want::Failed( - want_new.to_failed(failed_partitions, "Partition already failed".to_string()), - ) - } else if !upstream_failed_wants.is_empty() { - tracing::info!( - want_id = %event.want_id, - upstream_failed = ?upstream_failed_wants, - "Want: New → UpstreamFailed (upstream already failed)" - ); - Want::UpstreamFailed(want_new.to_upstream_failed(upstream_failed_wants)) - } else if all_live && !event.partitions.is_empty() { - tracing::info!( - want_id = %event.want_id, - "Want: New → Successful (all partitions already live)" - ); - Want::Successful(want_new.to_successful(current_timestamp())) - } else if any_building { - tracing::info!( - want_id = %event.want_id, - "Want: New → Building (partitions being built)" - ); - Want::Building( - want_new.to_building(building_started_at.unwrap_or_else(current_timestamp)), - ) - } else if any_upstream_building { - // For upstream building, we need the upstream want IDs - // For now, transition to Idle and let derivative want handling take care of it - tracing::info!( - want_id = %event.want_id, - "Want: New → Idle (upstream building - will be updated by derivative want handling)" - ); - Want::Idle(want_new.to_idle()) - } else { - // Partitions don't exist, or are UpForRetry - want is schedulable - tracing::info!( - want_id = %event.want_id, - "Want: New → Idle (ready to schedule)" - ); - Want::Idle(want_new.to_idle()) - }; - - self.wants.insert(event.want_id.clone(), final_want); - - // If this is a derivative want (triggered by a job's dep miss), transition impacted wants to UpstreamBuilding - if is_derivative { - if let Some(source) = &event.source { - if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source { - self.handle_derivative_want_creation( - &event.want_id, - &event.partitions, - &job_triggered.job_run_id, - ); - } - } - } - - vec![] - } - - fn handle_want_cancel(&mut self, event: &WantCancelEventV1) -> Vec { - // TODO actually cancel in-progress job runs that no longer have a sponsoring want - - // Type-safe transition (API layer should prevent canceling terminal wants) - let want = self.wants.remove(&event.want_id).expect(&format!( - "BUG: Want {} must exist when cancel event received", - event.want_id - )); - - let canceled = match want { - Want::New(new_want) => { - Want::Canceled(new_want.cancel(event.source.clone(), event.comment.clone())) - } - Want::Idle(idle) => { - Want::Canceled(idle.cancel(event.source.clone(), event.comment.clone())) - } - Want::Building(building) => 
Want::Canceled(building.cancel( - event.source.clone(), - current_timestamp(), - event.comment.clone(), - )), - Want::UpstreamBuilding(upstream) => Want::Canceled(upstream.cancel( - event.source.clone(), - current_timestamp(), - event.comment.clone(), - )), - // Terminal states: panic because API should have prevented this - Want::Successful(_) | Want::Failed(_) | Want::UpstreamFailed(_) | Want::Canceled(_) => { - panic!( - "BUG: Received WantCancelEvent for want {} in terminal state {:?}. API layer should prevent this.", - event.want_id, want - ); - } - }; - self.wants.insert(event.want_id.clone(), canceled); - - vec![] - } - - fn handle_job_run_buffer(&mut self, event: &JobRunBufferEventV1) -> Vec { - // No job run should exist - if it does, that's a BUG in the orchestrator - if self.job_runs.get(&event.job_run_id).is_some() { - panic!( - "BUG: Job run ID collision on job run ID {}. Orchestrator should generate unique IDs.", - event.job_run_id - ); - } - - // Create job run in Queued state - let queued: JobRunWithState = event.clone().into(); - - // Transition wants to Building - // Valid states when job buffer event arrives: - // - Idle: First job starting for this want (normal case) - // - Building: Another job already started for this want (multiple jobs can service same want) - // Invalid states (panic - indicates orchestrator bug): - // - UpstreamBuilding: Not schedulable, waiting for dependencies - // - Successful/Failed/UpstreamFailed/Canceled: Terminal states, not schedulable - for wap in &queued.info.servicing_wants { - let want = self.wants.remove(&wap.want_id).expect(&format!( - "BUG: Want {} must exist when job buffer event received", - wap.want_id - )); - - let transitioned = match want { - Want::New(new_want) => { - // Want was just created and hasn't fully sensed yet - transition to Building - // This can happen if want creation and job buffer happen in quick succession - tracing::info!( - want_id = %wap.want_id, - job_run_id = %event.job_run_id, - "Want: New → Building (job scheduled before sensing completed)" - ); - Want::Building(new_want.to_building(current_timestamp())) - } - Want::Idle(idle) => { - // First job starting for this want - tracing::info!( - want_id = %wap.want_id, - job_run_id = %event.job_run_id, - "Want: Idle → Building (job scheduled)" - ); - Want::Building(idle.start_building(current_timestamp())) - } - Want::Building(building) => { - // Another job already started, stay in Building (no-op) - Want::Building(building) - } - _ => { - panic!( - "BUG: Want {} in invalid state {:?} for job buffer. 
Only New, Idle or Building wants should be scheduled.", - wap.want_id, want - ); - } - }; - - self.wants.insert(wap.want_id.clone(), transitioned); - } - - // Get building partition refs from queued job - job is source of truth for building partitions - let building_refs: Vec = queued - .info - .building_partitions - .iter() - .map(|p| BuildingPartitionRef(p.clone())) - .collect(); - - // Transition partitions to Building state - self.transition_partitions_to_building(&building_refs, &event.job_run_id); - - self.job_runs - .insert(event.job_run_id.clone(), JobRun::Queued(queued)); - vec![] - } - - fn handle_job_run_heartbeat(&mut self, event: &JobRunHeartbeatEventV1) -> Vec { - let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!( - "BUG: Job run {} must exist when heartbeat received", - event.job_run_id - )); - - let running = match job_run { - // First heartbeat: Queued -> Running - JobRun::Queued(queued) => { - tracing::info!( - job_run_id = %event.job_run_id, - "JobRun: Queued → Running" - ); - queued.start_running(current_timestamp()) - } - // Subsequent heartbeat: update timestamp - JobRun::Running(running) => running.heartbeat(current_timestamp()), - _ => { - panic!( - "BUG: Heartbeat received for job run {} in invalid state {:?}", - event.job_run_id, job_run - ); - } - }; - - self.job_runs - .insert(event.job_run_id.clone(), JobRun::Running(running)); - vec![] - } - - fn handle_job_run_success(&mut self, event: &JobRunSuccessEventV1) -> Vec { - let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!( - "BUG: Job run {} must exist when success event received", - event.job_run_id - )); - - let succeeded = match job_run { - JobRun::Running(running) => { - tracing::info!( - job_run_id = %event.job_run_id, - "JobRun: Running → Succeeded" - ); - running.succeed(current_timestamp()) - } - _ => { - panic!( - "BUG: Success event received for job run {} in invalid state {:?}. 
Job must be Running to succeed.", - event.job_run_id, job_run - ); - } - }; - - // Job run success is SOURCE of truth that partitions are live - let newly_live_partitions = succeeded.get_completed_partitions(); - - // Update partitions being built by this job (strict type-safe transitions) - self.transition_partitions_to_live( - &newly_live_partitions, - &event.job_run_id, - current_timestamp(), - ); - - // UpstreamBuilding → UpForRetry (for downstream partitions waiting on newly live partitions) - self.unblock_downstream_partitions(&newly_live_partitions); - - // Building → Successful (when all partitions Live) - let newly_successful_wants: Vec = self.complete_successful_wants( - &newly_live_partitions, - &event.job_run_id, - current_timestamp(), - ); - - // UpstreamBuilding → Idle/Building (for downstream wants waiting on newly successful wants) - self.unblock_downstream_wants( - &newly_successful_wants, - &event.job_run_id, - current_timestamp(), - ); - - self.job_runs - .insert(event.job_run_id.clone(), JobRun::Succeeded(succeeded)); - vec![] - } - - fn handle_job_run_failure(&mut self, event: &JobRunFailureEventV1) -> Vec { - let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!( - "BUG: Job run {} must exist when failure event received", - event.job_run_id - )); - - let failed = match job_run { - JobRun::Running(running) => { - tracing::info!( - job_run_id = %event.job_run_id, - reason = %event.reason, - "JobRun: Running → Failed" - ); - running.fail(current_timestamp(), event.reason.clone()) - } - _ => { - panic!( - "BUG: Failure event received for job run {} in invalid state {:?}. Job must be Running to fail.", - event.job_run_id, job_run - ); - } - }; - - // Job run failure is SOURCE of truth that partitions failed - let failed_partitions = failed.get_failed_partitions(); - - // Transition partitions using strict type-safe methods - self.transition_partitions_to_failed( - &failed_partitions, - &event.job_run_id, - current_timestamp(), - ); - - // UpstreamBuilding → UpstreamFailed (for downstream partitions waiting on failed upstreams) - self.cascade_failures_to_downstream_partitions(&failed_partitions); - - // Building → Failed (for wants directly building failed partitions) - let newly_failed_wants: Vec = - self.fail_directly_affected_wants(&failed_partitions); - - // UpstreamBuilding → UpstreamFailed (for downstream wants waiting on newly failed wants) - self.cascade_failures_to_downstream_wants(&newly_failed_wants, current_timestamp()); - - self.job_runs - .insert(event.job_run_id.clone(), JobRun::Failed(failed)); - vec![] - } - - fn handle_job_run_cancel(&mut self, event: &JobRunCancelEventV1) -> Vec { - let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!( - "BUG: Job run {} must exist when cancel event received", - event.job_run_id - )); - - let canceled = match job_run { - JobRun::Queued(queued) => queued.cancel( - current_timestamp(), - event.source.clone(), - event.comment.clone().unwrap_or_default(), - ), - JobRun::Running(running) => running.cancel( - current_timestamp(), - event.source.clone(), - event.comment.clone().unwrap_or_default(), - ), - _ => { - panic!( - "BUG: Cancel event received for job run {} in invalid state {:?}", - event.job_run_id, job_run - ); - } - }; - - // Canceled job means building partitions should be removed (they never completed). - // In the new model without Missing state, partitions are only created when jobs - // start building them, and removed if the job is canceled before completion. 
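// --- Illustrative sketch (not part of the original file) ---
// The partition store is split across two maps: `partitions_by_uuid` keeps every
// partition instance ever created (historical attempts included), while
// `canonical_partitions` points a partition ref at its *current* instance. A retry
// mints a fresh UUID and repoints the canonical entry, leaving the old instance as
// history; a cancel (handled just below) drops both the pointer and the instance,
// since the partition never completed. Simplified stand-in types and a toy counter id.
mod canonical_index_sketch {
    use std::collections::BTreeMap;

    #[derive(Default)]
    pub struct Store {
        next_id: u64,
        instances: BTreeMap<u64, String>, // id -> state label, standing in for Partition
        canonical: BTreeMap<String, u64>, // partition ref -> current instance id
    }

    impl Store {
        pub fn create_building(&mut self, partition_ref: &str) -> u64 {
            self.next_id += 1;
            let id = self.next_id;
            self.instances.insert(id, "Building".to_string());
            // On a retry this repoints the ref; the previous instance stays as history.
            self.canonical.insert(partition_ref.to_string(), id);
            id
        }

        pub fn cancel(&mut self, partition_ref: &str) {
            // Job canceled before completion: remove both the pointer and the instance.
            if let Some(id) = self.canonical.remove(partition_ref) {
                self.instances.remove(&id);
            }
        }

        pub fn current(&self, partition_ref: &str) -> Option<&String> {
            self.canonical.get(partition_ref).and_then(|id| self.instances.get(id))
        }
    }
}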
- let building_refs_to_reset = canceled.get_building_partitions_to_reset(); - for building_ref in &building_refs_to_reset { - // Remove from UUID map and canonical map - if let Some(uuid) = self.canonical_partitions.remove(&building_ref.0.r#ref) { - self.partitions_by_uuid.remove(&uuid); - } - tracing::info!( - partition = %building_ref.0.r#ref, - "Partition removed (job canceled)" - ); - } - - self.job_runs - .insert(event.job_run_id.clone(), JobRun::Canceled(canceled)); - vec![] - } - - pub fn handle_job_run_dep_miss(&mut self, event: &JobRunMissingDepsEventV1) -> Vec { - let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!( - "BUG: Job run {} must exist when dep miss event received", - event.job_run_id - )); - - let dep_miss = match job_run { - JobRun::Running(running) => { - tracing::info!( - job_run_id = %event.job_run_id, - missing_deps = ?event.missing_deps.iter() - .flat_map(|md| md.missing.iter().map(|p| &p.r#ref)) - .collect::>(), - "JobRun: Running → DepMiss (missing dependencies detected)" - ); - running.dep_miss( - current_timestamp(), - event.missing_deps.clone(), - event.read_deps.clone(), - ) - } - _ => { - panic!( - "BUG: DepMiss event received for job run {} in invalid state {:?}. Job must be Running to hit dep miss.", - event.job_run_id, job_run - ); - } - }; - - // Infer data/SLA timestamps from servicing wants - let want_timestamps: WantTimestamps = dep_miss - .info - .servicing_wants - .iter() - .flat_map(|wap| self.get_want(&wap.want_id).map(|w| w.into())) - .reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b)) - .expect("BUG: No servicing wants found"); - - // Collect all missing deps into a flat list of partition refs - let all_missing_deps: Vec = event - .missing_deps - .iter() - .flat_map(|md| md.missing.clone()) - .collect(); - - // Transition partitions from Building to UpstreamBuilding since this job can't build them yet - let building_refs_to_reset = dep_miss.get_building_partitions_to_reset(); - self.transition_partitions_to_upstream_building(&building_refs_to_reset, all_missing_deps); - - // Generate WantCreateV1 events for the missing dependencies - // These events will be returned and appended to the BEL by BuildEventLog.append_event() - let want_events = missing_deps_to_want_events( - dep_miss.get_missing_deps().to_vec(), - &event.job_run_id, - want_timestamps, - ); - - // Store the job run in DepMiss state so we can access the missing_deps later - // When the derivative WantCreateV1 events get processed by handle_want_create(), - // they will look up this job run and use handle_derivative_want_creation() to - // transition impacted wants to UpstreamBuilding with the correct want IDs. - // - // KEY: We do NOT transition wants here because the want_events have randomly generated UUIDs - // that won't match during replay. Instead, we transition wants when processing the actual - // WantCreateV1 events that get written to and read from the BEL. 
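// --- Illustrative sketch (not part of the original file) ---
// Why wants are only transitioned when the logged WantCreateV1 is processed: a handler
// may *emit* follow-up events carrying fresh IDs, but it must not mutate state with
// those IDs directly. In live execution the follow-ups are appended to the BEL and
// applied when they come back through it; during replay the handler's output is ignored
// because the log already contains those events. Either way the state machine only ever
// sees IDs that exist in the log, so replay reproduces the same state. Hypothetical
// minimal event and state types below.
mod replay_sketch {
    #[derive(Clone)]
    pub enum Ev {
        DepMiss { missing_ref: String },
        WantCreate { want_id: String },
    }

    #[derive(Default, Debug, PartialEq)]
    pub struct State {
        pub wants: Vec<String>,
    }

    impl State {
        /// Mutates state only from the event itself; returns follow-up events.
        pub fn handle(&mut self, ev: &Ev) -> Vec<Ev> {
            match ev {
                Ev::DepMiss { missing_ref } => vec![Ev::WantCreate {
                    // The real system uses a random UUID here; that is safe only because
                    // the event carrying it gets appended to the log.
                    want_id: format!("derivative-want-for-{missing_ref}"),
                }],
                Ev::WantCreate { want_id } => {
                    self.wants.push(want_id.clone());
                    vec![]
                }
            }
        }
    }

    /// Live path: follow-ups are appended to the log and applied from it.
    pub fn live(seed: Vec<Ev>) -> (State, Vec<Ev>) {
        let mut log = seed;
        let mut state = State::default();
        let mut i = 0;
        while i < log.len() {
            let ev = log[i].clone();
            let follow_ups = state.handle(&ev);
            log.extend(follow_ups);
            i += 1;
        }
        (state, log)
    }

    /// Replay path: handler output is ignored; the log is already complete.
    pub fn replay(log: &[Ev]) -> State {
        let mut state = State::default();
        for ev in log {
            let _ = state.handle(ev);
        }
        state
    }
    // Property: for any seed, if (s, log) = live(seed) then replay(&log) == s.
}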
- self.job_runs - .insert(event.job_run_id.clone(), JobRun::DepMiss(dep_miss)); - - // Return derivative want events to be appended to the BEL - want_events - } - - fn handle_taint_create(&mut self, event: &TaintCreateEventV1) -> Vec { - // Store the taint detail - let taint_detail = TaintDetail { - taint_id: event.taint_id.clone(), - root_taint_id: event.root_taint_id.clone(), - parent_taint_id: event.parent_taint_id.clone(), - partitions: event.partitions.clone(), - source: event.source.clone(), - comment: event.comment.clone(), - }; - self.taints.insert(event.taint_id.clone(), taint_detail); - - // Transition affected partitions to Tainted state - for pref in &event.partitions { - if let Some(partition) = self.take_canonical_partition(&pref.r#ref) { - let transitioned = match partition { - Partition::Live(live) => { - tracing::info!( - partition = %pref.r#ref, - taint_id = %event.taint_id, - "Partition: Live → Tainted" - ); - Partition::Tainted(live.taint(event.taint_id.clone(), current_timestamp())) - } - Partition::Tainted(tainted) => { - // Add additional taint to already-tainted partition - tracing::info!( - partition = %pref.r#ref, - taint_id = %event.taint_id, - "Partition: Tainted → Tainted (adding taint)" - ); - Partition::Tainted(tainted.add_taint(event.taint_id.clone())) - } - other => { - // For non-Live/Tainted partitions (Building, UpstreamBuilding, etc.), - // we can't taint them - log a warning and skip - tracing::warn!( - partition = %pref.r#ref, - taint_id = %event.taint_id, - state = ?other, - "Cannot taint partition in non-Live state, skipping" - ); - other - } - }; - self.update_partition(transitioned); - } else { - // Partition doesn't exist yet - this is fine, taint will apply when it's built - tracing::debug!( - partition = %pref.r#ref, - taint_id = %event.taint_id, - "Taint targeting non-existent partition, will apply when built" - ); - } - } - - vec![] - } - - fn handle_taint_delete(&mut self, event: &TaintCancelEventV1) -> Vec { - // Remove the taint from our tracking - if let Some(taint) = self.taints.remove(&event.taint_id) { - tracing::info!( - taint_id = %event.taint_id, - partitions = ?taint.partitions.iter().map(|p| &p.r#ref).collect::>(), - "Taint canceled/deleted" - ); - - // Note: We do NOT automatically un-taint partitions when a taint is canceled. - // Once tainted, partitions remain tainted until they are rebuilt. - // The taint_ids on the partition are historical records of why it was tainted. 
- } else { - tracing::warn!( - taint_id = %event.taint_id, - "Attempted to cancel non-existent taint" - ); - } - - vec![] - } - - fn with_wants(self, wants: BTreeMap) -> Self { - Self { wants, ..self } - } - - #[cfg(test)] - fn with_partitions(self, old_partitions: BTreeMap) -> Self { - use crate::partition_state::PartitionWithState; - - let mut canonical_partitions: BTreeMap = BTreeMap::new(); - let mut partitions_by_uuid: BTreeMap = BTreeMap::new(); - - // Convert PartitionDetail to Live partitions for testing - for (key, detail) in old_partitions { - let partition_ref = detail.r#ref.clone().unwrap_or_default(); - // Create a deterministic UUID for test data - let uuid = - crate::partition_state::derive_partition_uuid("test_job_run", &partition_ref.r#ref); - let live_partition = Partition::Live(PartitionWithState { - uuid, - partition_ref, - state: crate::partition_state::LiveState { - built_at: 0, - built_by: "test_job_run".to_string(), - }, - }); - - canonical_partitions.insert(key, uuid); - partitions_by_uuid.insert(uuid, live_partition); - } - - Self { - canonical_partitions, - partitions_by_uuid, - ..self - } - } - - pub fn get_want(&self, want_id: &str) -> Option { - self.wants.get(want_id).map(|w| w.to_detail()) - } - pub fn get_taint(&self, taint_id: &str) -> Option { - self.taints.get(taint_id).cloned() - } - pub fn get_partition(&self, partition_id: &str) -> Option { - self.get_canonical_partition(partition_id) - .map(|p| p.to_detail()) - } - pub fn get_job_run(&self, job_run_id: &str) -> Option { - self.job_runs.get(job_run_id).map(|jr| jr.to_detail()) - } - - pub fn list_wants(&self, request: &ListWantsRequest) -> ListWantsResponse { - let page = request.page.unwrap_or(0); - let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE); - - let start = page * page_size; - - // Paginate first, then convert only the needed wants to WantDetail - let data: Vec = self - .wants - .values() - .skip(start as usize) - .take(page_size as usize) - .map(|w| w.to_detail()) - .collect(); - - ListWantsResponse { - data, - match_count: self.wants.len() as u64, - page, - page_size, - } - } - - pub fn list_taints(&self, request: &ListTaintsRequest) -> ListTaintsResponse { - let page = request.page.unwrap_or(0); - let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE); - ListTaintsResponse { - data: list_state_items(&self.taints, page, page_size), - match_count: self.wants.len() as u64, - page, - page_size, - } - } - - pub fn list_partitions(&self, request: &ListPartitionsRequest) -> ListPartitionsResponse { - let page = request.page.unwrap_or(0); - let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE); - // Convert canonical partitions to PartitionDetail for API - let partition_details: BTreeMap = self - .canonical_partitions - .iter() - .filter_map(|(k, uuid)| { - self.partitions_by_uuid - .get(uuid) - .map(|p| (k.clone(), p.to_detail())) - }) - .collect(); - ListPartitionsResponse { - data: list_state_items(&partition_details, page, page_size), - match_count: self.canonical_partitions.len() as u64, - page, - page_size, - } - } - - pub fn list_job_runs(&self, request: &ListJobRunsRequest) -> ListJobRunsResponse { - let page = request.page.unwrap_or(0); - let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE); - - let start = page * page_size; - let data: Vec = self - .job_runs - .values() - .skip(start as usize) - .take(page_size as usize) - .map(|jr| jr.to_detail()) - .collect(); - - ListJobRunsResponse { - data, - match_count: 
self.job_runs.len() as u64, - page, - page_size, - } - } - - /** - Wants are schedulable when their upstreams are ready and target partitions are not already building - */ - pub fn want_schedulability(&self, want: &WantDetail) -> WantSchedulability { - // Check upstream partition statuses (dependencies) - let mut live: Vec = Vec::new(); - let mut tainted: Vec = Vec::new(); - let mut not_ready: Vec = Vec::new(); // Partitions that don't exist or aren't Live - - for upstream_ref in &want.upstreams { - match self.get_canonical_partition(&upstream_ref.r#ref) { - Some(partition) => { - match partition { - Partition::Live(p) => live.push(p.get_ref()), - Partition::Tainted(p) => tainted.push(p.get_ref()), - // All other states (Building, UpstreamBuilding, UpForRetry, Failed, UpstreamFailed) mean upstream is not ready - _ => not_ready.push(upstream_ref.clone()), - } - } - None => { - // Partition doesn't exist yet - it's not ready - not_ready.push(upstream_ref.clone()); - } - } - } - - // Check target partition statuses (what this want is trying to build) - // If any target partition is already Building, this want should wait - let mut building: Vec = Vec::new(); - for target_ref in &want.partitions { - if let Some(partition) = self.get_canonical_partition(&target_ref.r#ref) { - if let Partition::Building(p) = partition { - building.push(p.get_ref()); - } - } - } - - WantSchedulability { - want: want.clone(), - status: WantUpstreamStatus { - live, - tainted, - not_ready, - building, - }, - } - } - - pub fn wants_schedulability(&self) -> WantsSchedulability { - WantsSchedulability( - self.wants - .values() - // Use type-safe is_schedulable() - only Idle wants are schedulable - .filter(|w| w.is_schedulable()) - .map(|w| self.want_schedulability(&w.to_detail())) - .collect(), - ) - } -} - -/// The status of partitions required by a want to build (sensed from dep miss job run) -#[derive(Debug, Clone, Serialize, Deserialize, Default)] -pub struct WantUpstreamStatus { - pub live: Vec, - pub tainted: Vec, - /// Upstream partitions that are not ready (don't exist, or are in Building/UpstreamBuilding/UpForRetry/Failed/UpstreamFailed states) - pub not_ready: Vec, - /// Target partitions that are currently being built by another job - pub building: Vec, -} - -#[derive(Debug, Clone, Serialize, Deserialize, Default)] -pub struct WantSchedulability { - pub want: WantDetail, - pub status: WantUpstreamStatus, -} - -#[derive(Debug, Clone, Serialize, Deserialize, Default)] -pub struct WantsSchedulability(pub Vec); - -impl WantsSchedulability { - pub fn schedulable_wants(self) -> Vec { - self.0 - .iter() - .filter_map(|ws| match ws.is_schedulable() { - false => None, - true => Some(ws.want.clone()), - }) - .collect() - } -} - -impl WantSchedulability { - pub fn is_schedulable(&self) -> bool { - // Want is schedulable if: - // - No not-ready upstream dependencies (must all be Live or Tainted) - // - No tainted upstream dependencies - // - No target partitions currently being built by another job - self.status.not_ready.is_empty() - && self.status.tainted.is_empty() - && self.status.building.is_empty() - } -} - -fn list_state_items(map: &BTreeMap, page: u64, page_size: u64) -> Vec { - // TODO when we add filtering, can we add it generically via some trait or filter object that can be provided? 
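// --- Illustrative sketch (not part of the original file) ---
// All of the list_* endpoints above share the same paging convention: skip
// `page * page_size` items, take `page_size`, and report the total match_count
// separately so clients can derive the page count. A standalone generic version of
// that slicing step (hypothetical helper, not the module's own API) looks like this.
mod paging_sketch {
    pub fn page_slice<T: Clone>(items: &[T], page: u64, page_size: u64) -> Vec<T> {
        items
            .iter()
            .skip((page * page_size) as usize)
            .take(page_size as usize)
            .cloned()
            .collect()
    }
}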
- let start = page * page_size; - let end = start + page_size; - map.values() - .skip(start as usize) - .take(end as usize) - .cloned() - .collect() -} - -mod consts { - pub const DEFAULT_PAGE_SIZE: u64 = 100; -} - -#[cfg(test)] -mod tests { - mod schedulable_wants { - use crate::build_state::BuildState; - use crate::want_state::{IdleState as WantIdleState, Want, WantInfo, WantWithState}; - use crate::{PartitionDetail, PartitionRef, PartitionStatus, WantDetail, WantStatus}; - use std::collections::BTreeMap; - - impl WantDetail { - fn with_partitions(self, partitions: Vec) -> Self { - Self { partitions, ..self } - } - fn with_upstreams(self, upstreams: Vec) -> Self { - Self { upstreams, ..self } - } - fn with_status(self, status: Option) -> Self { - Self { status, ..self } - } - } - - impl PartitionDetail { - fn with_status(self, status: Option) -> Self { - Self { status, ..self } - } - fn with_ref(self, r#ref: Option) -> Self { - Self { r#ref, ..self } - } - } - - #[test] - fn test_empty_wants_noop() { - assert_eq!(BuildState::default().wants_schedulability().0.len(), 0); - } - - // A want with satisfied upstreams (incl "none") should be schedulable - #[test] - fn test_simple_want_with_live_upstream_is_schedulable() { - // Given... - let test_partition = "test_partition"; - let state = BuildState::default() - .with_wants(BTreeMap::from([( - "foo".to_string(), - Want::Idle(WantWithState { - want: WantInfo { - partitions: vec![test_partition.into()], - ..Default::default() - }, - state: WantIdleState {}, - }), - )])) - .with_partitions(BTreeMap::from([( - test_partition.to_string(), - PartitionDetail::default().with_ref(Some(test_partition.into())), - )])); - - // Should... - let schedulability = state.wants_schedulability(); - let ws = schedulability.0.first().unwrap(); - assert!(ws.is_schedulable()); - } - } - - mod sqlite_build_state { - mod want { - use crate::build_state::BuildState; - use crate::data_build_event::Event; - use crate::{MissingDeps, WantCancelEventV1, WantCreateEventV1, WantDetail}; - - #[test] - fn test_should_create_want() { - let mut e = WantCreateEventV1::default(); - e.want_id = "1234".to_string(); - e.partitions = vec!["mypart".into()]; - - let mut state = BuildState::default(); - state.handle_event(&e.clone().into()); - let want = state.get_want("1234").unwrap(); - let mut expected: WantDetail = e.into(); - // Into will set this field as current timestamp - expected.last_updated_timestamp = want.last_updated_timestamp; - assert_eq!(want, expected); - } - - #[test] - fn test_should_cancel_want() { - let mut e = WantCreateEventV1::default(); - e.want_id = "1234".to_string(); - e.partitions = vec!["mypart".into()]; - - let mut state = BuildState::default(); - state.handle_event(&e.clone().into()); - - // Should be able to cancel - let mut e = WantCancelEventV1::default(); - e.want_id = "1234".to_string(); - state.handle_event(&e.clone().into()); - let want = state.get_want("1234").unwrap(); - - assert_eq!( - want.status, - Some(crate::WantStatusCode::WantCanceled.into()) - ); - } - - #[test] - fn test_multihop_dependency_replay() { - use crate::data_build_event::Event; - use crate::{ - JobRunBufferEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, - JobRunSuccessEventV1, MissingDeps, PartitionRef, WantAttributedPartitions, - WantCreateEventV1, - }; - - let mut state = BuildState::default(); - let mut events = vec![]; - - // 1. 
Create want for data/beta - let beta_want_id = "beta-want".to_string(); - let mut create_beta = WantCreateEventV1::default(); - create_beta.want_id = beta_want_id.clone(); - create_beta.partitions = vec![PartitionRef { - r#ref: "data/beta".to_string(), - }]; - events.push(Event::WantCreateV1(create_beta)); - - // 2. Queue beta job (first attempt) - let beta_job_1_id = "beta-job-1".to_string(); - let mut buffer_beta_1 = JobRunBufferEventV1::default(); - buffer_beta_1.job_run_id = beta_job_1_id.clone(); - buffer_beta_1.job_label = "//job_beta".to_string(); - buffer_beta_1.want_attributed_partitions = vec![WantAttributedPartitions { - want_id: beta_want_id.clone(), - partitions: vec![PartitionRef { - r#ref: "data/beta".to_string(), - }], - }]; - buffer_beta_1.building_partitions = vec![PartitionRef { - r#ref: "data/beta".to_string(), - }]; - events.push(Event::JobRunBufferV1(buffer_beta_1)); - - // 3. Beta job starts running - let mut heartbeat_beta_1 = JobRunHeartbeatEventV1::default(); - heartbeat_beta_1.job_run_id = beta_job_1_id.clone(); - events.push(Event::JobRunHeartbeatV1(heartbeat_beta_1)); - - // 4. Beta job reports missing dependency on data/alpha - let mut dep_miss_beta_1 = JobRunMissingDepsEventV1::default(); - dep_miss_beta_1.job_run_id = beta_job_1_id.clone(); - dep_miss_beta_1.missing_deps = vec![MissingDeps { - impacted: vec![PartitionRef { - r#ref: "data/beta".to_string(), - }], - missing: vec![PartitionRef { - r#ref: "data/alpha".to_string(), - }], - }]; - events.push(Event::JobRunMissingDepsV1(dep_miss_beta_1)); - - // 5. Create derivative want for data/alpha - let alpha_want_id = "alpha-want".to_string(); - let mut create_alpha = WantCreateEventV1::default(); - create_alpha.want_id = alpha_want_id.clone(); - create_alpha.partitions = vec![PartitionRef { - r#ref: "data/alpha".to_string(), - }]; - events.push(Event::WantCreateV1(create_alpha)); - - // 6. Queue alpha job - let alpha_job_id = "alpha-job".to_string(); - let mut buffer_alpha = JobRunBufferEventV1::default(); - buffer_alpha.job_run_id = alpha_job_id.clone(); - buffer_alpha.job_label = "//job_alpha".to_string(); - buffer_alpha.want_attributed_partitions = vec![WantAttributedPartitions { - want_id: alpha_want_id.clone(), - partitions: vec![PartitionRef { - r#ref: "data/alpha".to_string(), - }], - }]; - buffer_alpha.building_partitions = vec![PartitionRef { - r#ref: "data/alpha".to_string(), - }]; - events.push(Event::JobRunBufferV1(buffer_alpha)); - - // 7. Alpha job starts running - let mut heartbeat_alpha = JobRunHeartbeatEventV1::default(); - heartbeat_alpha.job_run_id = alpha_job_id.clone(); - events.push(Event::JobRunHeartbeatV1(heartbeat_alpha)); - - // 8. Alpha job succeeds - let mut success_alpha = JobRunSuccessEventV1::default(); - success_alpha.job_run_id = alpha_job_id.clone(); - events.push(Event::JobRunSuccessV1(success_alpha)); - - // 9. Queue beta job again (second attempt) - THIS IS THE CRITICAL MOMENT - let beta_job_2_id = "beta-job-2".to_string(); - let mut buffer_beta_2 = JobRunBufferEventV1::default(); - buffer_beta_2.job_run_id = beta_job_2_id.clone(); - buffer_beta_2.job_label = "//job_beta".to_string(); - buffer_beta_2.want_attributed_partitions = vec![WantAttributedPartitions { - want_id: beta_want_id.clone(), - partitions: vec![PartitionRef { - r#ref: "data/beta".to_string(), - }], - }]; - buffer_beta_2.building_partitions = vec![PartitionRef { - r#ref: "data/beta".to_string(), - }]; - events.push(Event::JobRunBufferV1(buffer_beta_2)); - - // 10. 
Beta job starts running - let mut heartbeat_beta_2 = JobRunHeartbeatEventV1::default(); - heartbeat_beta_2.job_run_id = beta_job_2_id.clone(); - events.push(Event::JobRunHeartbeatV1(heartbeat_beta_2)); - - // 11. Beta job succeeds - let mut success_beta_2 = JobRunSuccessEventV1::default(); - success_beta_2.job_run_id = beta_job_2_id.clone(); - events.push(Event::JobRunSuccessV1(success_beta_2)); - - // Process all events - this simulates replay - for event in &events { - state.handle_event(event); - } - - // Verify final state - let beta_want = state.get_want(&beta_want_id).unwrap(); - assert_eq!( - beta_want.status, - Some(crate::WantStatusCode::WantSuccessful.into()), - "Beta want should be successful after multi-hop dependency resolution" - ); - - let alpha_want = state.get_want(&alpha_want_id).unwrap(); - assert_eq!( - alpha_want.status, - Some(crate::WantStatusCode::WantSuccessful.into()), - "Alpha want should be successful" - ); - } - - /// Test that multiple concurrent wants for the same partition all transition correctly. - /// This was the original bug that motivated the UUID refactor. - #[test] - fn test_concurrent_wants_same_partition() { - use crate::data_build_event::Event; - use crate::{ - JobRunBufferEventV1, JobRunHeartbeatEventV1, PartitionRef, - WantAttributedPartitions, WantCreateEventV1, - }; - - let mut state = BuildState::default(); - - // 1. Create Want 1 for data/beta - let want_1_id = "want-1".to_string(); - let mut create_want_1 = WantCreateEventV1::default(); - create_want_1.want_id = want_1_id.clone(); - create_want_1.partitions = vec![PartitionRef { - r#ref: "data/beta".to_string(), - }]; - state.handle_event(&Event::WantCreateV1(create_want_1)); - - // Want 1 should be Idle (no partition exists yet) - let want_1 = state.get_want(&want_1_id).unwrap(); - assert_eq!( - want_1.status, - Some(crate::WantStatusCode::WantIdle.into()), - "Want 1 should be Idle initially" - ); - - // 2. Create Want 2 for the same partition - let want_2_id = "want-2".to_string(); - let mut create_want_2 = WantCreateEventV1::default(); - create_want_2.want_id = want_2_id.clone(); - create_want_2.partitions = vec![PartitionRef { - r#ref: "data/beta".to_string(), - }]; - state.handle_event(&Event::WantCreateV1(create_want_2)); - - // Want 2 should also be Idle - let want_2 = state.get_want(&want_2_id).unwrap(); - assert_eq!( - want_2.status, - Some(crate::WantStatusCode::WantIdle.into()), - "Want 2 should be Idle initially" - ); - - // Verify inverted index has both wants - let wants_for_beta = state.get_wants_for_partition("data/beta"); - assert!( - wants_for_beta.contains(&want_1_id), - "wants_for_partition should contain want-1" - ); - assert!( - wants_for_beta.contains(&want_2_id), - "wants_for_partition should contain want-2" - ); - - // 3. 
Job buffers for data/beta - both wants should transition to Building - let job_run_id = "job-1".to_string(); - let mut buffer_event = JobRunBufferEventV1::default(); - buffer_event.job_run_id = job_run_id.clone(); - buffer_event.job_label = "//job_beta".to_string(); - buffer_event.want_attributed_partitions = vec![ - WantAttributedPartitions { - want_id: want_1_id.clone(), - partitions: vec![PartitionRef { - r#ref: "data/beta".to_string(), - }], - }, - WantAttributedPartitions { - want_id: want_2_id.clone(), - partitions: vec![PartitionRef { - r#ref: "data/beta".to_string(), - }], - }, - ]; - buffer_event.building_partitions = vec![PartitionRef { - r#ref: "data/beta".to_string(), - }]; - state.handle_event(&Event::JobRunBufferV1(buffer_event)); - - // Start the job - let mut heartbeat = JobRunHeartbeatEventV1::default(); - heartbeat.job_run_id = job_run_id.clone(); - state.handle_event(&Event::JobRunHeartbeatV1(heartbeat)); - - // Both wants should now be Building - let want_1 = state.get_want(&want_1_id).unwrap(); - assert_eq!( - want_1.status, - Some(crate::WantStatusCode::WantBuilding.into()), - "Want 1 should be Building after job starts" - ); - - let want_2 = state.get_want(&want_2_id).unwrap(); - assert_eq!( - want_2.status, - Some(crate::WantStatusCode::WantBuilding.into()), - "Want 2 should be Building after job starts" - ); - - // Partition should exist and be Building - let partition = state.get_partition("data/beta").unwrap(); - assert_eq!( - partition.status, - Some(crate::PartitionStatusCode::PartitionBuilding.into()), - "Partition should be Building" - ); - } - } - - mod partition_lifecycle { - use crate::build_state::BuildState; - use crate::data_build_event::Event; - use crate::{ - JobRunBufferEventV1, JobRunFailureEventV1, JobRunHeartbeatEventV1, - JobRunMissingDepsEventV1, JobRunSuccessEventV1, MissingDeps, PartitionRef, - WantAttributedPartitions, WantCreateEventV1, - }; - - /// Test that upstream failure cascades to downstream partitions. - /// When an upstream partition fails, downstream partitions in UpstreamBuilding - /// should transition to UpstreamFailed. - #[test] - fn test_upstream_failure_cascades_to_downstream() { - let mut state = BuildState::default(); - - // 1. Create want for data/beta - let beta_want_id = "beta-want".to_string(); - let mut create_beta = WantCreateEventV1::default(); - create_beta.want_id = beta_want_id.clone(); - create_beta.partitions = vec![PartitionRef { - r#ref: "data/beta".to_string(), - }]; - state.handle_event(&Event::WantCreateV1(create_beta)); - - // 2. Job buffers for beta - let beta_job_id = "beta-job".to_string(); - let mut buffer_beta = JobRunBufferEventV1::default(); - buffer_beta.job_run_id = beta_job_id.clone(); - buffer_beta.job_label = "//job_beta".to_string(); - buffer_beta.want_attributed_partitions = vec![WantAttributedPartitions { - want_id: beta_want_id.clone(), - partitions: vec![PartitionRef { - r#ref: "data/beta".to_string(), - }], - }]; - buffer_beta.building_partitions = vec![PartitionRef { - r#ref: "data/beta".to_string(), - }]; - state.handle_event(&Event::JobRunBufferV1(buffer_beta)); - - // Start beta job - let mut heartbeat_beta = JobRunHeartbeatEventV1::default(); - heartbeat_beta.job_run_id = beta_job_id.clone(); - state.handle_event(&Event::JobRunHeartbeatV1(heartbeat_beta)); - - // 3. 
Beta job dep misses on data/alpha - // This returns derivative want events that we MUST process - let mut dep_miss = JobRunMissingDepsEventV1::default(); - dep_miss.job_run_id = beta_job_id.clone(); - dep_miss.missing_deps = vec![MissingDeps { - impacted: vec![PartitionRef { - r#ref: "data/beta".to_string(), - }], - missing: vec![PartitionRef { - r#ref: "data/alpha".to_string(), - }], - }]; - let derivative_events = state.handle_event(&Event::JobRunMissingDepsV1(dep_miss)); - - // Beta partition should be UpstreamBuilding - let beta_partition = state.get_partition("data/beta").unwrap(); - assert_eq!( - beta_partition.status.as_ref().unwrap().name, - "PartitionUpstreamBuilding", - "Beta partition should be UpstreamBuilding after dep miss" - ); - - // 4. Process the derivative want event for alpha - // This will create the alpha want AND transition beta want to UpstreamBuilding - assert_eq!( - derivative_events.len(), - 1, - "Should have one derivative want event" - ); - let alpha_want_id = match &derivative_events[0] { - Event::WantCreateV1(e) => e.want_id.clone(), - _ => panic!("Expected WantCreateV1 event"), - }; - state.handle_event(&derivative_events[0]); - - // Now beta want should be UpstreamBuilding (waiting for alpha want) - let beta_want_after_derivative = state.get_want(&beta_want_id).unwrap(); - assert_eq!( - beta_want_after_derivative.status, - Some(crate::WantStatusCode::WantUpstreamBuilding.into()), - "Beta want should be UpstreamBuilding after derivative want processed" - ); - - // 5. Job buffers for alpha - let alpha_job_id = "alpha-job".to_string(); - let mut buffer_alpha = JobRunBufferEventV1::default(); - buffer_alpha.job_run_id = alpha_job_id.clone(); - buffer_alpha.job_label = "//job_alpha".to_string(); - buffer_alpha.want_attributed_partitions = vec![WantAttributedPartitions { - want_id: alpha_want_id.clone(), - partitions: vec![PartitionRef { - r#ref: "data/alpha".to_string(), - }], - }]; - buffer_alpha.building_partitions = vec![PartitionRef { - r#ref: "data/alpha".to_string(), - }]; - state.handle_event(&Event::JobRunBufferV1(buffer_alpha)); - - // Start alpha job - let mut heartbeat_alpha = JobRunHeartbeatEventV1::default(); - heartbeat_alpha.job_run_id = alpha_job_id.clone(); - state.handle_event(&Event::JobRunHeartbeatV1(heartbeat_alpha)); - - // 6. Alpha job FAILS - let mut failure_alpha = JobRunFailureEventV1::default(); - failure_alpha.job_run_id = alpha_job_id.clone(); - failure_alpha.reason = "Test failure".to_string(); - state.handle_event(&Event::JobRunFailureV1(failure_alpha)); - - // Alpha partition should be Failed - let alpha_partition = state.get_partition("data/alpha").unwrap(); - assert_eq!( - alpha_partition.status, - Some(crate::PartitionStatusCode::PartitionFailed.into()), - "Alpha partition should be Failed" - ); - - // Beta partition should cascade to UpstreamFailed - let beta_partition = state.get_partition("data/beta").unwrap(); - assert_eq!( - beta_partition.status.as_ref().unwrap().name, - "PartitionUpstreamFailed", - "Beta partition should cascade to UpstreamFailed when alpha fails" - ); - - // Beta want should also be UpstreamFailed - let beta_want = state.get_want(&beta_want_id).unwrap(); - assert_eq!( - beta_want.status, - Some(crate::WantStatusCode::WantUpstreamFailed.into()), - "Beta want should be UpstreamFailed" - ); - } - - /// Test that partition retry creates a new UUID while preserving the old one. - #[test] - fn test_partition_retry_creates_new_uuid() { - let mut state = BuildState::default(); - - // 1. 
Create want for data/beta - let beta_want_id = "beta-want".to_string(); - let mut create_beta = WantCreateEventV1::default(); - create_beta.want_id = beta_want_id.clone(); - create_beta.partitions = vec![PartitionRef { - r#ref: "data/beta".to_string(), - }]; - state.handle_event(&Event::WantCreateV1(create_beta)); - - // 2. First job buffers for beta (creates uuid-1) - let beta_job_1_id = "beta-job-1".to_string(); - let mut buffer_beta_1 = JobRunBufferEventV1::default(); - buffer_beta_1.job_run_id = beta_job_1_id.clone(); - buffer_beta_1.job_label = "//job_beta".to_string(); - buffer_beta_1.want_attributed_partitions = vec![WantAttributedPartitions { - want_id: beta_want_id.clone(), - partitions: vec![PartitionRef { - r#ref: "data/beta".to_string(), - }], - }]; - buffer_beta_1.building_partitions = vec![PartitionRef { - r#ref: "data/beta".to_string(), - }]; - state.handle_event(&Event::JobRunBufferV1(buffer_beta_1)); - - // Get UUID-1 - let partition_after_first_buffer = state.get_partition("data/beta").unwrap(); - let uuid_1 = partition_after_first_buffer.uuid.clone(); - - // Start job - let mut heartbeat_1 = JobRunHeartbeatEventV1::default(); - heartbeat_1.job_run_id = beta_job_1_id.clone(); - state.handle_event(&Event::JobRunHeartbeatV1(heartbeat_1)); - - // 3. Job dep misses on data/alpha - let mut dep_miss = JobRunMissingDepsEventV1::default(); - dep_miss.job_run_id = beta_job_1_id.clone(); - dep_miss.missing_deps = vec![MissingDeps { - impacted: vec![PartitionRef { - r#ref: "data/beta".to_string(), - }], - missing: vec![PartitionRef { - r#ref: "data/alpha".to_string(), - }], - }]; - state.handle_event(&Event::JobRunMissingDepsV1(dep_miss)); - - // 4. Create and complete alpha (to satisfy the dep) - let alpha_want_id = "alpha-want".to_string(); - let mut create_alpha = WantCreateEventV1::default(); - create_alpha.want_id = alpha_want_id.clone(); - create_alpha.partitions = vec![PartitionRef { - r#ref: "data/alpha".to_string(), - }]; - state.handle_event(&Event::WantCreateV1(create_alpha)); - - let alpha_job_id = "alpha-job".to_string(); - let mut buffer_alpha = JobRunBufferEventV1::default(); - buffer_alpha.job_run_id = alpha_job_id.clone(); - buffer_alpha.job_label = "//job_alpha".to_string(); - buffer_alpha.want_attributed_partitions = vec![WantAttributedPartitions { - want_id: alpha_want_id.clone(), - partitions: vec![PartitionRef { - r#ref: "data/alpha".to_string(), - }], - }]; - buffer_alpha.building_partitions = vec![PartitionRef { - r#ref: "data/alpha".to_string(), - }]; - state.handle_event(&Event::JobRunBufferV1(buffer_alpha)); - - let mut heartbeat_alpha = JobRunHeartbeatEventV1::default(); - heartbeat_alpha.job_run_id = alpha_job_id.clone(); - state.handle_event(&Event::JobRunHeartbeatV1(heartbeat_alpha)); - - let mut success_alpha = JobRunSuccessEventV1::default(); - success_alpha.job_run_id = alpha_job_id.clone(); - state.handle_event(&Event::JobRunSuccessV1(success_alpha)); - - // Beta partition should now be UpForRetry - let beta_partition = state.get_partition("data/beta").unwrap(); - assert_eq!( - beta_partition.status.as_ref().unwrap().name, - "PartitionUpForRetry", - "Beta partition should be UpForRetry after alpha succeeds" - ); - - // 5. 
Second job buffers for beta retry (creates uuid-2) - let beta_job_2_id = "beta-job-2".to_string(); - let mut buffer_beta_2 = JobRunBufferEventV1::default(); - buffer_beta_2.job_run_id = beta_job_2_id.clone(); - buffer_beta_2.job_label = "//job_beta".to_string(); - buffer_beta_2.want_attributed_partitions = vec![WantAttributedPartitions { - want_id: beta_want_id.clone(), - partitions: vec![PartitionRef { - r#ref: "data/beta".to_string(), - }], - }]; - buffer_beta_2.building_partitions = vec![PartitionRef { - r#ref: "data/beta".to_string(), - }]; - state.handle_event(&Event::JobRunBufferV1(buffer_beta_2)); - - // Get UUID-2 - let partition_after_retry = state.get_partition("data/beta").unwrap(); - let uuid_2 = partition_after_retry.uuid.clone(); - - // UUIDs should be different - assert_ne!( - uuid_1, uuid_2, - "New job should create a new UUID for the partition" - ); - - // New partition should be Building - assert_eq!( - partition_after_retry.status, - Some(crate::PartitionStatusCode::PartitionBuilding.into()), - "Retry partition should be Building" - ); - } - } - } -} diff --git a/databuild/build_state/event_handlers.rs b/databuild/build_state/event_handlers.rs new file mode 100644 index 0000000..52f050a --- /dev/null +++ b/databuild/build_state/event_handlers.rs @@ -0,0 +1,1196 @@ +//! Event handlers for BuildState +//! +//! Each handler processes a specific event type, updating state and potentially +//! returning derivative events to be appended to the BEL. + +use crate::data_build_event::Event; +use crate::data_deps::{WantTimestamps, missing_deps_to_want_events}; +use crate::event_source::Source as EventSourceVariant; +use crate::job_run_state::{JobRun, JobRunWithState, QueuedState as JobQueuedState}; +use crate::partition_state::{BuildingPartitionRef, Partition}; +use crate::util::current_timestamp; +use crate::want_state::{NewState as WantNewState, Want, WantWithState}; +use crate::{ + JobRunBufferEventV1, JobRunCancelEventV1, JobRunFailureEventV1, JobRunHeartbeatEventV1, + JobRunMissingDepsEventV1, JobRunSuccessEventV1, PartitionRef, TaintCancelEventV1, + TaintCreateEventV1, TaintDetail, WantCancelEventV1, WantCreateEventV1, +}; + +use super::BuildState; + +impl BuildState { + /// Handles reacting to events, updating state, and erroring if its an invalid state transition + /// Event handlers can return vecs of events that will then be appended to the BEL + pub fn handle_event(&mut self, event: &Event) -> Vec { + match event { + // JobRun events + Event::JobRunBufferV1(e) => self.handle_job_run_buffer(e), + Event::JobRunHeartbeatV1(e) => self.handle_job_run_heartbeat(e), + Event::JobRunFailureV1(e) => self.handle_job_run_failure(e), + Event::JobRunCancelV1(e) => self.handle_job_run_cancel(e), + Event::JobRunSuccessV1(e) => self.handle_job_run_success(e), + Event::JobRunMissingDepsV1(e) => self.handle_job_run_dep_miss(e), + // Want events + Event::WantCreateV1(e) => self.handle_want_create(e), + Event::WantCancelV1(e) => self.handle_want_cancel(e), + // Taint events + Event::TaintCreateV1(e) => self.handle_taint_create(e), + Event::TaintCancelV1(e) => self.handle_taint_delete(e), + // Ruh roh! + _ => panic!("Unhandled event type! 
{:?}", event), + } + } + + pub(crate) fn handle_want_create(&mut self, event: &WantCreateEventV1) -> Vec { + // Create want in New state from event + let want_new: WantWithState = event.clone().into(); + + // Log creation with derivative vs user-created distinction + let is_derivative = if let Some(source) = &event.source { + if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source { + tracing::info!( + want_id = %event.want_id, + partitions = ?event.partitions.iter().map(|p| &p.r#ref).collect::>(), + source_job_run_id = %job_triggered.job_run_id, + "Want created (derivative - auto-created due to missing dependency)" + ); + true + } else { + false + } + } else { + tracing::info!( + want_id = %event.want_id, + partitions = ?event.partitions.iter().map(|p| &p.r#ref).collect::>(), + "Want created (user-requested)" + ); + false + }; + + // Register this want with all its partitions (via inverted index) + self.register_want_for_partitions(&event.want_id, &event.partitions); + + // Sense canonical partition states and determine initial want state + // Priority order: Failed > UpstreamFailed > AllLive > Building > UpstreamBuilding > UpForRetry > Idle + let mut failed_partitions: Vec = Vec::new(); + let mut upstream_failed_wants: Vec = Vec::new(); + let mut all_live = true; + let mut any_building = false; + let mut any_upstream_building = false; + let mut any_up_for_retry = false; + let mut building_started_at: Option = None; + + for pref in &event.partitions { + match self.get_canonical_partition(&pref.r#ref) { + Some(partition) => match partition { + Partition::Failed(_f) => { + failed_partitions.push(pref.clone()); + all_live = false; + } + Partition::UpstreamFailed(uf) => { + // Track which upstream refs failed + for failed_ref in &uf.state.failed_upstream_refs { + upstream_failed_wants.push(failed_ref.r#ref.clone()); + } + all_live = false; + } + Partition::Live(_) => { + // Contributes to all_live check + } + Partition::Building(_b) => { + any_building = true; + all_live = false; + // Track when building started (use earliest if multiple) + if building_started_at.is_none() { + building_started_at = Some(current_timestamp()); + } + } + Partition::UpstreamBuilding(_) => { + any_upstream_building = true; + all_live = false; + } + Partition::UpForRetry(_) => { + any_up_for_retry = true; + all_live = false; + } + Partition::Tainted(_) => { + // Tainted partitions need rebuild + all_live = false; + } + }, + None => { + // Partition doesn't exist - needs to be built + all_live = false; + } + } + } + + // Transition from New to appropriate state based on sensing + let final_want: Want = if !failed_partitions.is_empty() { + tracing::info!( + want_id = %event.want_id, + failed_partitions = ?failed_partitions.iter().map(|p| &p.r#ref).collect::>(), + "Want: New → Failed (partition already failed)" + ); + Want::Failed( + want_new.to_failed(failed_partitions, "Partition already failed".to_string()), + ) + } else if !upstream_failed_wants.is_empty() { + tracing::info!( + want_id = %event.want_id, + upstream_failed = ?upstream_failed_wants, + "Want: New → UpstreamFailed (upstream already failed)" + ); + Want::UpstreamFailed(want_new.to_upstream_failed(upstream_failed_wants)) + } else if all_live && !event.partitions.is_empty() { + tracing::info!( + want_id = %event.want_id, + "Want: New → Successful (all partitions already live)" + ); + Want::Successful(want_new.to_successful(current_timestamp())) + } else if any_building { + tracing::info!( + want_id = %event.want_id, + "Want: New → 
Building (partitions being built)" + ); + Want::Building( + want_new.to_building(building_started_at.unwrap_or_else(current_timestamp)), + ) + } else if any_upstream_building { + // For upstream building, we need the upstream want IDs + // For now, transition to Idle and let derivative want handling take care of it + tracing::info!( + want_id = %event.want_id, + "Want: New → Idle (upstream building - will be updated by derivative want handling)" + ); + Want::Idle(want_new.to_idle()) + } else { + // Partitions don't exist, or are UpForRetry - want is schedulable + tracing::info!( + want_id = %event.want_id, + "Want: New → Idle (ready to schedule)" + ); + Want::Idle(want_new.to_idle()) + }; + + self.wants.insert(event.want_id.clone(), final_want); + + // If this is a derivative want (triggered by a job's dep miss), transition impacted wants to UpstreamBuilding + if is_derivative { + if let Some(source) = &event.source { + if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source { + self.handle_derivative_want_creation( + &event.want_id, + &event.partitions, + &job_triggered.job_run_id, + ); + } + } + } + + vec![] + } + + pub(crate) fn handle_want_cancel(&mut self, event: &WantCancelEventV1) -> Vec { + // TODO actually cancel in-progress job runs that no longer have a sponsoring want + + // Type-safe transition (API layer should prevent canceling terminal wants) + let want = self.wants.remove(&event.want_id).expect(&format!( + "BUG: Want {} must exist when cancel event received", + event.want_id + )); + + let canceled = match want { + Want::New(new_want) => { + Want::Canceled(new_want.cancel(event.source.clone(), event.comment.clone())) + } + Want::Idle(idle) => { + Want::Canceled(idle.cancel(event.source.clone(), event.comment.clone())) + } + Want::Building(building) => Want::Canceled(building.cancel( + event.source.clone(), + current_timestamp(), + event.comment.clone(), + )), + Want::UpstreamBuilding(upstream) => Want::Canceled(upstream.cancel( + event.source.clone(), + current_timestamp(), + event.comment.clone(), + )), + // Terminal states: panic because API should have prevented this + Want::Successful(_) | Want::Failed(_) | Want::UpstreamFailed(_) | Want::Canceled(_) => { + panic!( + "BUG: Received WantCancelEvent for want {} in terminal state {:?}. API layer should prevent this.", + event.want_id, want + ); + } + }; + self.wants.insert(event.want_id.clone(), canceled); + + vec![] + } + + pub(crate) fn handle_job_run_buffer(&mut self, event: &JobRunBufferEventV1) -> Vec { + // No job run should exist - if it does, that's a BUG in the orchestrator + if self.job_runs.get(&event.job_run_id).is_some() { + panic!( + "BUG: Job run ID collision on job run ID {}. 
Orchestrator should generate unique IDs.", + event.job_run_id + ); + } + + // Create job run in Queued state + let queued: JobRunWithState = event.clone().into(); + + // Transition wants to Building + // Valid states when job buffer event arrives: + // - Idle: First job starting for this want (normal case) + // - Building: Another job already started for this want (multiple jobs can service same want) + // Invalid states (panic - indicates orchestrator bug): + // - UpstreamBuilding: Not schedulable, waiting for dependencies + // - Successful/Failed/UpstreamFailed/Canceled: Terminal states, not schedulable + for wap in &queued.info.servicing_wants { + let want = self.wants.remove(&wap.want_id).expect(&format!( + "BUG: Want {} must exist when job buffer event received", + wap.want_id + )); + + let transitioned = match want { + Want::New(new_want) => { + // Want was just created and hasn't fully sensed yet - transition to Building + // This can happen if want creation and job buffer happen in quick succession + tracing::info!( + want_id = %wap.want_id, + job_run_id = %event.job_run_id, + "Want: New → Building (job scheduled before sensing completed)" + ); + Want::Building(new_want.to_building(current_timestamp())) + } + Want::Idle(idle) => { + // First job starting for this want + tracing::info!( + want_id = %wap.want_id, + job_run_id = %event.job_run_id, + "Want: Idle → Building (job scheduled)" + ); + Want::Building(idle.start_building(current_timestamp())) + } + Want::Building(building) => { + // Another job already started, stay in Building (no-op) + Want::Building(building) + } + _ => { + panic!( + "BUG: Want {} in invalid state {:?} for job buffer. Only New, Idle or Building wants should be scheduled.", + wap.want_id, want + ); + } + }; + + self.wants.insert(wap.want_id.clone(), transitioned); + } + + // Get building partition refs from queued job - job is source of truth for building partitions + let building_refs: Vec = queued + .info + .building_partitions + .iter() + .map(|p| BuildingPartitionRef(p.clone())) + .collect(); + + // Transition partitions to Building state + self.transition_partitions_to_building(&building_refs, &event.job_run_id); + + self.job_runs + .insert(event.job_run_id.clone(), JobRun::Queued(queued)); + vec![] + } + + pub(crate) fn handle_job_run_heartbeat(&mut self, event: &JobRunHeartbeatEventV1) -> Vec { + let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!( + "BUG: Job run {} must exist when heartbeat received", + event.job_run_id + )); + + let running = match job_run { + // First heartbeat: Queued -> Running + JobRun::Queued(queued) => { + tracing::info!( + job_run_id = %event.job_run_id, + "JobRun: Queued → Running" + ); + queued.start_running(current_timestamp()) + } + // Subsequent heartbeat: update timestamp + JobRun::Running(running) => running.heartbeat(current_timestamp()), + _ => { + panic!( + "BUG: Heartbeat received for job run {} in invalid state {:?}", + event.job_run_id, job_run + ); + } + }; + + self.job_runs + .insert(event.job_run_id.clone(), JobRun::Running(running)); + vec![] + } + + pub(crate) fn handle_job_run_success(&mut self, event: &JobRunSuccessEventV1) -> Vec { + let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!( + "BUG: Job run {} must exist when success event received", + event.job_run_id + )); + + let succeeded = match job_run { + JobRun::Running(running) => { + tracing::info!( + job_run_id = %event.job_run_id, + "JobRun: Running → Succeeded" + ); + running.succeed(current_timestamp()) + } + 
_ => { + panic!( + "BUG: Success event received for job run {} in invalid state {:?}. Job must be Running to succeed.", + event.job_run_id, job_run + ); + } + }; + + // Job run success is SOURCE of truth that partitions are live + let newly_live_partitions = succeeded.get_completed_partitions(); + + // Update partitions being built by this job (strict type-safe transitions) + self.transition_partitions_to_live( + &newly_live_partitions, + &event.job_run_id, + current_timestamp(), + ); + + // UpstreamBuilding → UpForRetry (for downstream partitions waiting on newly live partitions) + self.unblock_downstream_partitions(&newly_live_partitions); + + // Building → Successful (when all partitions Live) + let newly_successful_wants = self.complete_successful_wants( + &newly_live_partitions, + &event.job_run_id, + current_timestamp(), + ); + + // UpstreamBuilding → Idle/Building (for downstream wants waiting on newly successful wants) + self.unblock_downstream_wants( + &newly_successful_wants, + &event.job_run_id, + current_timestamp(), + ); + + self.job_runs + .insert(event.job_run_id.clone(), JobRun::Succeeded(succeeded)); + vec![] + } + + pub(crate) fn handle_job_run_failure(&mut self, event: &JobRunFailureEventV1) -> Vec { + let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!( + "BUG: Job run {} must exist when failure event received", + event.job_run_id + )); + + let failed = match job_run { + JobRun::Running(running) => { + tracing::info!( + job_run_id = %event.job_run_id, + reason = %event.reason, + "JobRun: Running → Failed" + ); + running.fail(current_timestamp(), event.reason.clone()) + } + _ => { + panic!( + "BUG: Failure event received for job run {} in invalid state {:?}. Job must be Running to fail.", + event.job_run_id, job_run + ); + } + }; + + // Job run failure is SOURCE of truth that partitions failed + let failed_partitions = failed.get_failed_partitions(); + + // Transition partitions using strict type-safe methods + self.transition_partitions_to_failed( + &failed_partitions, + &event.job_run_id, + current_timestamp(), + ); + + // UpstreamBuilding → UpstreamFailed (for downstream partitions waiting on failed upstreams) + self.cascade_failures_to_downstream_partitions(&failed_partitions); + + // Building → Failed (for wants directly building failed partitions) + let newly_failed_wants = self.fail_directly_affected_wants(&failed_partitions); + + // UpstreamBuilding → UpstreamFailed (for downstream wants waiting on newly failed wants) + self.cascade_failures_to_downstream_wants(&newly_failed_wants, current_timestamp()); + + self.job_runs + .insert(event.job_run_id.clone(), JobRun::Failed(failed)); + vec![] + } + + pub(crate) fn handle_job_run_cancel(&mut self, event: &JobRunCancelEventV1) -> Vec { + let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!( + "BUG: Job run {} must exist when cancel event received", + event.job_run_id + )); + + let canceled = match job_run { + JobRun::Queued(queued) => queued.cancel( + current_timestamp(), + event.source.clone(), + event.comment.clone().unwrap_or_default(), + ), + JobRun::Running(running) => running.cancel( + current_timestamp(), + event.source.clone(), + event.comment.clone().unwrap_or_default(), + ), + _ => { + panic!( + "BUG: Cancel event received for job run {} in invalid state {:?}", + event.job_run_id, job_run + ); + } + }; + + // Canceled job means building partitions should be removed (they never completed). 
+ // In the new model without Missing state, partitions are only created when jobs + // start building them, and removed if the job is canceled before completion. + let building_refs_to_reset = canceled.get_building_partitions_to_reset(); + for building_ref in &building_refs_to_reset { + // Remove from UUID map and canonical map + if let Some(uuid) = self.canonical_partitions.remove(&building_ref.0.r#ref) { + self.partitions_by_uuid.remove(&uuid); + } + tracing::info!( + partition = %building_ref.0.r#ref, + "Partition removed (job canceled)" + ); + } + + self.job_runs + .insert(event.job_run_id.clone(), JobRun::Canceled(canceled)); + vec![] + } + + pub fn handle_job_run_dep_miss(&mut self, event: &JobRunMissingDepsEventV1) -> Vec { + let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!( + "BUG: Job run {} must exist when dep miss event received", + event.job_run_id + )); + + let dep_miss = match job_run { + JobRun::Running(running) => { + tracing::info!( + job_run_id = %event.job_run_id, + missing_deps = ?event.missing_deps.iter() + .flat_map(|md| md.missing.iter().map(|p| &p.r#ref)) + .collect::>(), + "JobRun: Running → DepMiss (missing dependencies detected)" + ); + running.dep_miss( + current_timestamp(), + event.missing_deps.clone(), + event.read_deps.clone(), + ) + } + _ => { + panic!( + "BUG: DepMiss event received for job run {} in invalid state {:?}. Job must be Running to hit dep miss.", + event.job_run_id, job_run + ); + } + }; + + // Infer data/SLA timestamps from servicing wants + let want_timestamps: WantTimestamps = dep_miss + .info + .servicing_wants + .iter() + .flat_map(|wap| self.get_want(&wap.want_id).map(|w| w.into())) + .reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b)) + .expect("BUG: No servicing wants found"); + + // Collect all missing deps into a flat list of partition refs + let all_missing_deps: Vec = event + .missing_deps + .iter() + .flat_map(|md| md.missing.clone()) + .collect(); + + // Transition partitions from Building to UpstreamBuilding since this job can't build them yet + let building_refs_to_reset = dep_miss.get_building_partitions_to_reset(); + self.transition_partitions_to_upstream_building(&building_refs_to_reset, all_missing_deps); + + // Generate WantCreateV1 events for the missing dependencies + // These events will be returned and appended to the BEL by BuildEventLog.append_event() + let want_events = missing_deps_to_want_events( + dep_miss.get_missing_deps().to_vec(), + &event.job_run_id, + want_timestamps, + ); + + // Store the job run in DepMiss state so we can access the missing_deps later + // When the derivative WantCreateV1 events get processed by handle_want_create(), + // they will look up this job run and use handle_derivative_want_creation() to + // transition impacted wants to UpstreamBuilding with the correct want IDs. + // + // KEY: We do NOT transition wants here because the want_events have randomly generated UUIDs + // that won't match during replay. Instead, we transition wants when processing the actual + // WantCreateV1 events that get written to and read from the BEL. 
+ self.job_runs + .insert(event.job_run_id.clone(), JobRun::DepMiss(dep_miss)); + + // Return derivative want events to be appended to the BEL + want_events + } + + pub(crate) fn handle_taint_create(&mut self, event: &TaintCreateEventV1) -> Vec { + // Store the taint detail + let taint_detail = TaintDetail { + taint_id: event.taint_id.clone(), + root_taint_id: event.root_taint_id.clone(), + parent_taint_id: event.parent_taint_id.clone(), + partitions: event.partitions.clone(), + source: event.source.clone(), + comment: event.comment.clone(), + }; + self.taints.insert(event.taint_id.clone(), taint_detail); + + // Transition affected partitions to Tainted state + for pref in &event.partitions { + if let Some(partition) = self.take_canonical_partition(&pref.r#ref) { + let transitioned = match partition { + Partition::Live(live) => { + tracing::info!( + partition = %pref.r#ref, + taint_id = %event.taint_id, + "Partition: Live → Tainted" + ); + Partition::Tainted(live.taint(event.taint_id.clone(), current_timestamp())) + } + Partition::Tainted(tainted) => { + // Add additional taint to already-tainted partition + tracing::info!( + partition = %pref.r#ref, + taint_id = %event.taint_id, + "Partition: Tainted → Tainted (adding taint)" + ); + Partition::Tainted(tainted.add_taint(event.taint_id.clone())) + } + other => { + // For non-Live/Tainted partitions (Building, UpstreamBuilding, etc.), + // we can't taint them - log a warning and skip + tracing::warn!( + partition = %pref.r#ref, + taint_id = %event.taint_id, + state = ?other, + "Cannot taint partition in non-Live state, skipping" + ); + other + } + }; + self.update_partition(transitioned); + } else { + // Partition doesn't exist yet - this is fine, taint will apply when it's built + tracing::debug!( + partition = %pref.r#ref, + taint_id = %event.taint_id, + "Taint targeting non-existent partition, will apply when built" + ); + } + } + + vec![] + } + + pub(crate) fn handle_taint_delete(&mut self, event: &TaintCancelEventV1) -> Vec { + // Remove the taint from our tracking + if let Some(taint) = self.taints.remove(&event.taint_id) { + tracing::info!( + taint_id = %event.taint_id, + partitions = ?taint.partitions.iter().map(|p| &p.r#ref).collect::>(), + "Taint canceled/deleted" + ); + + // Note: We do NOT automatically un-taint partitions when a taint is canceled. + // Once tainted, partitions remain tainted until they are rebuilt. + // The taint_ids on the partition are historical records of why it was tainted. 
+ } else { + tracing::warn!( + taint_id = %event.taint_id, + "Attempted to cancel non-existent taint" + ); + } + + vec![] + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{MissingDeps, WantAttributedPartitions}; + + mod want { + use super::*; + use crate::WantDetail; + + #[test] + fn test_should_create_want() { + let mut e = WantCreateEventV1::default(); + e.want_id = "1234".to_string(); + e.partitions = vec!["mypart".into()]; + + let mut state = BuildState::default(); + state.handle_event(&e.clone().into()); + let want = state.get_want("1234").unwrap(); + let mut expected: WantDetail = e.into(); + // Into will set this field as current timestamp + expected.last_updated_timestamp = want.last_updated_timestamp; + assert_eq!(want, expected); + } + + #[test] + fn test_should_cancel_want() { + let mut e = WantCreateEventV1::default(); + e.want_id = "1234".to_string(); + e.partitions = vec!["mypart".into()]; + + let mut state = BuildState::default(); + state.handle_event(&e.clone().into()); + + // Should be able to cancel + let mut e = WantCancelEventV1::default(); + e.want_id = "1234".to_string(); + state.handle_event(&e.clone().into()); + let want = state.get_want("1234").unwrap(); + + assert_eq!( + want.status, + Some(crate::WantStatusCode::WantCanceled.into()) + ); + } + + #[test] + fn test_multihop_dependency_replay() { + use crate::{ + JobRunBufferEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, + JobRunSuccessEventV1, MissingDeps, PartitionRef, WantAttributedPartitions, + WantCreateEventV1, + }; + + let mut state = BuildState::default(); + let mut events = vec![]; + + // 1. Create want for data/beta + let beta_want_id = "beta-want".to_string(); + let mut create_beta = WantCreateEventV1::default(); + create_beta.want_id = beta_want_id.clone(); + create_beta.partitions = vec![PartitionRef { + r#ref: "data/beta".to_string(), + }]; + events.push(Event::WantCreateV1(create_beta)); + + // 2. Queue beta job (first attempt) + let beta_job_1_id = "beta-job-1".to_string(); + let mut buffer_beta_1 = JobRunBufferEventV1::default(); + buffer_beta_1.job_run_id = beta_job_1_id.clone(); + buffer_beta_1.job_label = "//job_beta".to_string(); + buffer_beta_1.want_attributed_partitions = vec![WantAttributedPartitions { + want_id: beta_want_id.clone(), + partitions: vec![PartitionRef { + r#ref: "data/beta".to_string(), + }], + }]; + buffer_beta_1.building_partitions = vec![PartitionRef { + r#ref: "data/beta".to_string(), + }]; + events.push(Event::JobRunBufferV1(buffer_beta_1)); + + // 3. Beta job starts running + let mut heartbeat_beta_1 = JobRunHeartbeatEventV1::default(); + heartbeat_beta_1.job_run_id = beta_job_1_id.clone(); + events.push(Event::JobRunHeartbeatV1(heartbeat_beta_1)); + + // 4. Beta job reports missing dependency on data/alpha + let mut dep_miss_beta_1 = JobRunMissingDepsEventV1::default(); + dep_miss_beta_1.job_run_id = beta_job_1_id.clone(); + dep_miss_beta_1.missing_deps = vec![MissingDeps { + impacted: vec![PartitionRef { + r#ref: "data/beta".to_string(), + }], + missing: vec![PartitionRef { + r#ref: "data/alpha".to_string(), + }], + }]; + events.push(Event::JobRunMissingDepsV1(dep_miss_beta_1)); + + // 5. Create derivative want for data/alpha + let alpha_want_id = "alpha-want".to_string(); + let mut create_alpha = WantCreateEventV1::default(); + create_alpha.want_id = alpha_want_id.clone(); + create_alpha.partitions = vec![PartitionRef { + r#ref: "data/alpha".to_string(), + }]; + events.push(Event::WantCreateV1(create_alpha)); + + // 6. 
Queue alpha job + let alpha_job_id = "alpha-job".to_string(); + let mut buffer_alpha = JobRunBufferEventV1::default(); + buffer_alpha.job_run_id = alpha_job_id.clone(); + buffer_alpha.job_label = "//job_alpha".to_string(); + buffer_alpha.want_attributed_partitions = vec![WantAttributedPartitions { + want_id: alpha_want_id.clone(), + partitions: vec![PartitionRef { + r#ref: "data/alpha".to_string(), + }], + }]; + buffer_alpha.building_partitions = vec![PartitionRef { + r#ref: "data/alpha".to_string(), + }]; + events.push(Event::JobRunBufferV1(buffer_alpha)); + + // 7. Alpha job starts running + let mut heartbeat_alpha = JobRunHeartbeatEventV1::default(); + heartbeat_alpha.job_run_id = alpha_job_id.clone(); + events.push(Event::JobRunHeartbeatV1(heartbeat_alpha)); + + // 8. Alpha job succeeds + let mut success_alpha = JobRunSuccessEventV1::default(); + success_alpha.job_run_id = alpha_job_id.clone(); + events.push(Event::JobRunSuccessV1(success_alpha)); + + // 9. Queue beta job again (second attempt) - THIS IS THE CRITICAL MOMENT + let beta_job_2_id = "beta-job-2".to_string(); + let mut buffer_beta_2 = JobRunBufferEventV1::default(); + buffer_beta_2.job_run_id = beta_job_2_id.clone(); + buffer_beta_2.job_label = "//job_beta".to_string(); + buffer_beta_2.want_attributed_partitions = vec![WantAttributedPartitions { + want_id: beta_want_id.clone(), + partitions: vec![PartitionRef { + r#ref: "data/beta".to_string(), + }], + }]; + buffer_beta_2.building_partitions = vec![PartitionRef { + r#ref: "data/beta".to_string(), + }]; + events.push(Event::JobRunBufferV1(buffer_beta_2)); + + // 10. Beta job starts running + let mut heartbeat_beta_2 = JobRunHeartbeatEventV1::default(); + heartbeat_beta_2.job_run_id = beta_job_2_id.clone(); + events.push(Event::JobRunHeartbeatV1(heartbeat_beta_2)); + + // 11. Beta job succeeds + let mut success_beta_2 = JobRunSuccessEventV1::default(); + success_beta_2.job_run_id = beta_job_2_id.clone(); + events.push(Event::JobRunSuccessV1(success_beta_2)); + + // Process all events - this simulates replay + for event in &events { + state.handle_event(event); + } + + // Verify final state + let beta_want = state.get_want(&beta_want_id).unwrap(); + assert_eq!( + beta_want.status, + Some(crate::WantStatusCode::WantSuccessful.into()), + "Beta want should be successful after multi-hop dependency resolution" + ); + + let alpha_want = state.get_want(&alpha_want_id).unwrap(); + assert_eq!( + alpha_want.status, + Some(crate::WantStatusCode::WantSuccessful.into()), + "Alpha want should be successful" + ); + } + + /// Test that multiple concurrent wants for the same partition all transition correctly. + /// This was the original bug that motivated the UUID refactor. + #[test] + fn test_concurrent_wants_same_partition() { + use crate::{ + JobRunBufferEventV1, JobRunHeartbeatEventV1, PartitionRef, + WantAttributedPartitions, WantCreateEventV1, + }; + + let mut state = BuildState::default(); + + // 1. Create Want 1 for data/beta + let want_1_id = "want-1".to_string(); + let mut create_want_1 = WantCreateEventV1::default(); + create_want_1.want_id = want_1_id.clone(); + create_want_1.partitions = vec![PartitionRef { + r#ref: "data/beta".to_string(), + }]; + state.handle_event(&Event::WantCreateV1(create_want_1)); + + // Want 1 should be Idle (no partition exists yet) + let want_1 = state.get_want(&want_1_id).unwrap(); + assert_eq!( + want_1.status, + Some(crate::WantStatusCode::WantIdle.into()), + "Want 1 should be Idle initially" + ); + + // 2. 
Create Want 2 for the same partition + let want_2_id = "want-2".to_string(); + let mut create_want_2 = WantCreateEventV1::default(); + create_want_2.want_id = want_2_id.clone(); + create_want_2.partitions = vec![PartitionRef { + r#ref: "data/beta".to_string(), + }]; + state.handle_event(&Event::WantCreateV1(create_want_2)); + + // Want 2 should also be Idle + let want_2 = state.get_want(&want_2_id).unwrap(); + assert_eq!( + want_2.status, + Some(crate::WantStatusCode::WantIdle.into()), + "Want 2 should be Idle initially" + ); + + // Verify inverted index has both wants + let wants_for_beta = state.get_wants_for_partition("data/beta"); + assert!( + wants_for_beta.contains(&want_1_id), + "wants_for_partition should contain want-1" + ); + assert!( + wants_for_beta.contains(&want_2_id), + "wants_for_partition should contain want-2" + ); + + // 3. Job buffers for data/beta - both wants should transition to Building + let job_run_id = "job-1".to_string(); + let mut buffer_event = JobRunBufferEventV1::default(); + buffer_event.job_run_id = job_run_id.clone(); + buffer_event.job_label = "//job_beta".to_string(); + buffer_event.want_attributed_partitions = vec![ + WantAttributedPartitions { + want_id: want_1_id.clone(), + partitions: vec![PartitionRef { + r#ref: "data/beta".to_string(), + }], + }, + WantAttributedPartitions { + want_id: want_2_id.clone(), + partitions: vec![PartitionRef { + r#ref: "data/beta".to_string(), + }], + }, + ]; + buffer_event.building_partitions = vec![PartitionRef { + r#ref: "data/beta".to_string(), + }]; + state.handle_event(&Event::JobRunBufferV1(buffer_event)); + + // Start the job + let mut heartbeat = JobRunHeartbeatEventV1::default(); + heartbeat.job_run_id = job_run_id.clone(); + state.handle_event(&Event::JobRunHeartbeatV1(heartbeat)); + + // Both wants should now be Building + let want_1 = state.get_want(&want_1_id).unwrap(); + assert_eq!( + want_1.status, + Some(crate::WantStatusCode::WantBuilding.into()), + "Want 1 should be Building after job starts" + ); + + let want_2 = state.get_want(&want_2_id).unwrap(); + assert_eq!( + want_2.status, + Some(crate::WantStatusCode::WantBuilding.into()), + "Want 2 should be Building after job starts" + ); + + // Partition should exist and be Building + let partition = state.get_partition("data/beta").unwrap(); + assert_eq!( + partition.status, + Some(crate::PartitionStatusCode::PartitionBuilding.into()), + "Partition should be Building" + ); + } + } + + mod partition_lifecycle { + use super::*; + use crate::{ + JobRunBufferEventV1, JobRunFailureEventV1, JobRunHeartbeatEventV1, + JobRunMissingDepsEventV1, JobRunSuccessEventV1, MissingDeps, PartitionRef, + WantAttributedPartitions, WantCreateEventV1, + }; + + /// Test that upstream failure cascades to downstream partitions. + /// When an upstream partition fails, downstream partitions in UpstreamBuilding + /// should transition to UpstreamFailed. + #[test] + fn test_upstream_failure_cascades_to_downstream() { + let mut state = BuildState::default(); + + // 1. Create want for data/beta + let beta_want_id = "beta-want".to_string(); + let mut create_beta = WantCreateEventV1::default(); + create_beta.want_id = beta_want_id.clone(); + create_beta.partitions = vec![PartitionRef { + r#ref: "data/beta".to_string(), + }]; + state.handle_event(&Event::WantCreateV1(create_beta)); + + // 2. 
Job buffers for beta + let beta_job_id = "beta-job".to_string(); + let mut buffer_beta = JobRunBufferEventV1::default(); + buffer_beta.job_run_id = beta_job_id.clone(); + buffer_beta.job_label = "//job_beta".to_string(); + buffer_beta.want_attributed_partitions = vec![WantAttributedPartitions { + want_id: beta_want_id.clone(), + partitions: vec![PartitionRef { + r#ref: "data/beta".to_string(), + }], + }]; + buffer_beta.building_partitions = vec![PartitionRef { + r#ref: "data/beta".to_string(), + }]; + state.handle_event(&Event::JobRunBufferV1(buffer_beta)); + + // Start beta job + let mut heartbeat_beta = JobRunHeartbeatEventV1::default(); + heartbeat_beta.job_run_id = beta_job_id.clone(); + state.handle_event(&Event::JobRunHeartbeatV1(heartbeat_beta)); + + // 3. Beta job dep misses on data/alpha + // This returns derivative want events that we MUST process + let mut dep_miss = JobRunMissingDepsEventV1::default(); + dep_miss.job_run_id = beta_job_id.clone(); + dep_miss.missing_deps = vec![MissingDeps { + impacted: vec![PartitionRef { + r#ref: "data/beta".to_string(), + }], + missing: vec![PartitionRef { + r#ref: "data/alpha".to_string(), + }], + }]; + let derivative_events = state.handle_event(&Event::JobRunMissingDepsV1(dep_miss)); + + // Beta partition should be UpstreamBuilding + let beta_partition = state.get_partition("data/beta").unwrap(); + assert_eq!( + beta_partition.status.as_ref().unwrap().name, + "PartitionUpstreamBuilding", + "Beta partition should be UpstreamBuilding after dep miss" + ); + + // 4. Process the derivative want event for alpha + // This will create the alpha want AND transition beta want to UpstreamBuilding + assert_eq!( + derivative_events.len(), + 1, + "Should have one derivative want event" + ); + let alpha_want_id = match &derivative_events[0] { + Event::WantCreateV1(e) => e.want_id.clone(), + _ => panic!("Expected WantCreateV1 event"), + }; + state.handle_event(&derivative_events[0]); + + // Now beta want should be UpstreamBuilding (waiting for alpha want) + let beta_want_after_derivative = state.get_want(&beta_want_id).unwrap(); + assert_eq!( + beta_want_after_derivative.status, + Some(crate::WantStatusCode::WantUpstreamBuilding.into()), + "Beta want should be UpstreamBuilding after derivative want processed" + ); + + // 5. Job buffers for alpha + let alpha_job_id = "alpha-job".to_string(); + let mut buffer_alpha = JobRunBufferEventV1::default(); + buffer_alpha.job_run_id = alpha_job_id.clone(); + buffer_alpha.job_label = "//job_alpha".to_string(); + buffer_alpha.want_attributed_partitions = vec![WantAttributedPartitions { + want_id: alpha_want_id.clone(), + partitions: vec![PartitionRef { + r#ref: "data/alpha".to_string(), + }], + }]; + buffer_alpha.building_partitions = vec![PartitionRef { + r#ref: "data/alpha".to_string(), + }]; + state.handle_event(&Event::JobRunBufferV1(buffer_alpha)); + + // Start alpha job + let mut heartbeat_alpha = JobRunHeartbeatEventV1::default(); + heartbeat_alpha.job_run_id = alpha_job_id.clone(); + state.handle_event(&Event::JobRunHeartbeatV1(heartbeat_alpha)); + + // 6. 
Alpha job FAILS + let mut failure_alpha = JobRunFailureEventV1::default(); + failure_alpha.job_run_id = alpha_job_id.clone(); + failure_alpha.reason = "Test failure".to_string(); + state.handle_event(&Event::JobRunFailureV1(failure_alpha)); + + // Alpha partition should be Failed + let alpha_partition = state.get_partition("data/alpha").unwrap(); + assert_eq!( + alpha_partition.status, + Some(crate::PartitionStatusCode::PartitionFailed.into()), + "Alpha partition should be Failed" + ); + + // Beta partition should cascade to UpstreamFailed + let beta_partition = state.get_partition("data/beta").unwrap(); + assert_eq!( + beta_partition.status.as_ref().unwrap().name, + "PartitionUpstreamFailed", + "Beta partition should cascade to UpstreamFailed when alpha fails" + ); + + // Beta want should also be UpstreamFailed + let beta_want = state.get_want(&beta_want_id).unwrap(); + assert_eq!( + beta_want.status, + Some(crate::WantStatusCode::WantUpstreamFailed.into()), + "Beta want should be UpstreamFailed" + ); + } + + /// Test that partition retry creates a new UUID while preserving the old one. + #[test] + fn test_partition_retry_creates_new_uuid() { + let mut state = BuildState::default(); + + // 1. Create want for data/beta + let beta_want_id = "beta-want".to_string(); + let mut create_beta = WantCreateEventV1::default(); + create_beta.want_id = beta_want_id.clone(); + create_beta.partitions = vec![PartitionRef { + r#ref: "data/beta".to_string(), + }]; + state.handle_event(&Event::WantCreateV1(create_beta)); + + // 2. First job buffers for beta (creates uuid-1) + let beta_job_1_id = "beta-job-1".to_string(); + let mut buffer_beta_1 = JobRunBufferEventV1::default(); + buffer_beta_1.job_run_id = beta_job_1_id.clone(); + buffer_beta_1.job_label = "//job_beta".to_string(); + buffer_beta_1.want_attributed_partitions = vec![WantAttributedPartitions { + want_id: beta_want_id.clone(), + partitions: vec![PartitionRef { + r#ref: "data/beta".to_string(), + }], + }]; + buffer_beta_1.building_partitions = vec![PartitionRef { + r#ref: "data/beta".to_string(), + }]; + state.handle_event(&Event::JobRunBufferV1(buffer_beta_1)); + + // Get UUID-1 + let partition_after_first_buffer = state.get_partition("data/beta").unwrap(); + let uuid_1 = partition_after_first_buffer.uuid.clone(); + + // Start job + let mut heartbeat_1 = JobRunHeartbeatEventV1::default(); + heartbeat_1.job_run_id = beta_job_1_id.clone(); + state.handle_event(&Event::JobRunHeartbeatV1(heartbeat_1)); + + // 3. Job dep misses on data/alpha + let mut dep_miss = JobRunMissingDepsEventV1::default(); + dep_miss.job_run_id = beta_job_1_id.clone(); + dep_miss.missing_deps = vec![MissingDeps { + impacted: vec![PartitionRef { + r#ref: "data/beta".to_string(), + }], + missing: vec![PartitionRef { + r#ref: "data/alpha".to_string(), + }], + }]; + state.handle_event(&Event::JobRunMissingDepsV1(dep_miss)); + + // 4. 
Create and complete alpha (to satisfy the dep) + let alpha_want_id = "alpha-want".to_string(); + let mut create_alpha = WantCreateEventV1::default(); + create_alpha.want_id = alpha_want_id.clone(); + create_alpha.partitions = vec![PartitionRef { + r#ref: "data/alpha".to_string(), + }]; + state.handle_event(&Event::WantCreateV1(create_alpha)); + + let alpha_job_id = "alpha-job".to_string(); + let mut buffer_alpha = JobRunBufferEventV1::default(); + buffer_alpha.job_run_id = alpha_job_id.clone(); + buffer_alpha.job_label = "//job_alpha".to_string(); + buffer_alpha.want_attributed_partitions = vec![WantAttributedPartitions { + want_id: alpha_want_id.clone(), + partitions: vec![PartitionRef { + r#ref: "data/alpha".to_string(), + }], + }]; + buffer_alpha.building_partitions = vec![PartitionRef { + r#ref: "data/alpha".to_string(), + }]; + state.handle_event(&Event::JobRunBufferV1(buffer_alpha)); + + let mut heartbeat_alpha = JobRunHeartbeatEventV1::default(); + heartbeat_alpha.job_run_id = alpha_job_id.clone(); + state.handle_event(&Event::JobRunHeartbeatV1(heartbeat_alpha)); + + let mut success_alpha = JobRunSuccessEventV1::default(); + success_alpha.job_run_id = alpha_job_id.clone(); + state.handle_event(&Event::JobRunSuccessV1(success_alpha)); + + // Beta partition should now be UpForRetry + let beta_partition = state.get_partition("data/beta").unwrap(); + assert_eq!( + beta_partition.status.as_ref().unwrap().name, + "PartitionUpForRetry", + "Beta partition should be UpForRetry after alpha succeeds" + ); + + // 5. Second job buffers for beta retry (creates uuid-2) + let beta_job_2_id = "beta-job-2".to_string(); + let mut buffer_beta_2 = JobRunBufferEventV1::default(); + buffer_beta_2.job_run_id = beta_job_2_id.clone(); + buffer_beta_2.job_label = "//job_beta".to_string(); + buffer_beta_2.want_attributed_partitions = vec![WantAttributedPartitions { + want_id: beta_want_id.clone(), + partitions: vec![PartitionRef { + r#ref: "data/beta".to_string(), + }], + }]; + buffer_beta_2.building_partitions = vec![PartitionRef { + r#ref: "data/beta".to_string(), + }]; + state.handle_event(&Event::JobRunBufferV1(buffer_beta_2)); + + // Get UUID-2 + let partition_after_retry = state.get_partition("data/beta").unwrap(); + let uuid_2 = partition_after_retry.uuid.clone(); + + // UUIDs should be different + assert_ne!( + uuid_1, uuid_2, + "New job should create a new UUID for the partition" + ); + + // New partition should be Building + assert_eq!( + partition_after_retry.status, + Some(crate::PartitionStatusCode::PartitionBuilding.into()), + "Retry partition should be Building" + ); + } + } +} diff --git a/databuild/build_state/mod.rs b/databuild/build_state/mod.rs new file mode 100644 index 0000000..e827ddf --- /dev/null +++ b/databuild/build_state/mod.rs @@ -0,0 +1,181 @@ +//! Build State - the heart of databuild's orchestration system +//! +//! The BuildState struct tracks all application state, defines valid state transitions, +//! and manages cross-state machine state transitions (e.g. job run success resulting +//! in partition going from Building to Live). +//! +//! See docs/design/build-state-semantics.md for the full conceptual model. 
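+//!
+//! A minimal read-path sketch (hypothetical `events` slice of `DataBuildEvent`s; how those
+//! events are produced and stored is elided here):
+//!
+//! ```ignore
+//! // Replay the BEL into an in-memory BuildState, then query it.
+//! let state = BuildState::from_events(&events);
+//! let beta = state.get_canonical_partition("data/beta");     // current partition instance, if any
+//! let sponsors = state.get_wants_for_partition("data/beta"); // want IDs targeting this ref
+//! let job_runs = state.count_job_runs();                     // total job runs tracked
+//! ```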
+ +mod event_handlers; +mod partition_transitions; +mod queries; +mod schedulability; +mod want_transitions; + +use crate::job_run_state::JobRun; +use crate::partition_state::Partition; +use crate::want_state::Want; +use crate::{PartitionRef, TaintDetail}; +use std::collections::BTreeMap; +use uuid::Uuid; + +// Re-export public types +pub use schedulability::{WantSchedulability, WantUpstreamStatus, WantsSchedulability}; + +/** +Design Notes + +The build state struct is the heart of the service and orchestrator, adapting build events to +higher level questions about build state. One temptation is to implement the build state as a set +of hierarchically defined reducers, to achieve information hiding and factor system capabilities and +state tracking simply. Unfortunately, to update state based on an event, you need a mutable borrow +of some part of the build state (that the reducer controls, for instance), and an immutable borrow +of the whole state for read/query purposes. The whole state needs to be available to handle state +updates like "this is the list of currently active job runs" in response to a job run event. Put +simply, this isn't possible without introducing some locking of the whole state and mutable state +subset, since they would conflict (the mutable subset would have already been borrowed, so can't +be borrowed immutably as part of the whole state borrow). You might also define a "query" phase +in which reducers query the state based on the received event, but that just increases complexity. + +Instead, databuild opts for an entity-component system (ECS) that just provides the whole build +state mutably to all state update functionality, trusting that we know how to use it responsibly. +This means no boxing or "query phase", and means we can have all state updates happen as map lookups +and updates, which is exceptionally fast. The states of the different entities are managed by state +machines, in a pseudo-colored-petri-net style (only pseudo because we haven't formalized it). It is +critical that these state machines, their states, and their transitions are type-safe. +*/ + +/// Tracks all application state, defines valid state transitions, and manages cross-state machine +/// state transitions (e.g. 
job run success resulting in partition going from Building to Live)
+#[derive(Debug, Clone, Default)]
+pub struct BuildState {
+    // Core entity storage
+    pub(crate) wants: BTreeMap<String, Want>,
+    pub(crate) taints: BTreeMap<String, TaintDetail>,
+    pub(crate) job_runs: BTreeMap<String, JobRun>,
+
+    // UUID-based partition indexing
+    pub(crate) partitions_by_uuid: BTreeMap<Uuid, Partition>,
+    pub(crate) canonical_partitions: BTreeMap<String, Uuid>, // partition ref → current UUID
+
+    // Inverted indexes
+    pub(crate) wants_for_partition: BTreeMap<String, Vec<String>>, // partition ref → want_ids
+    pub(crate) downstream_waiting: BTreeMap<String, Vec<Uuid>>, // upstream ref → partition UUIDs waiting for it
+}
+
+impl BuildState {
+    /// Reconstruct BuildState from a sequence of events (for the read path in the web server).
+    /// This allows the web server to rebuild state from BEL storage without holding a lock.
+    pub fn from_events(events: &[crate::DataBuildEvent]) -> Self {
+        let mut state = BuildState::default();
+        for event in events {
+            if let Some(ref inner_event) = event.event {
+                // handle_event returns Vec<Event> for cascading events, but we ignore them
+                // since we're replaying from a complete event log
+                state.handle_event(inner_event);
+            }
+        }
+        state
+    }
+
+    pub fn count_job_runs(&self) -> usize {
+        self.job_runs.len()
+    }
+
+    // ===== UUID-based partition access methods =====
+
+    /// Get the canonical partition for a ref (the current/active partition instance)
+    pub fn get_canonical_partition(&self, partition_ref: &str) -> Option<&Partition> {
+        self.canonical_partitions
+            .get(partition_ref)
+            .and_then(|uuid| self.partitions_by_uuid.get(uuid))
+    }
+
+    /// Get the canonical partition UUID for a ref
+    pub fn get_canonical_partition_uuid(&self, partition_ref: &str) -> Option<Uuid> {
+        self.canonical_partitions.get(partition_ref).copied()
+    }
+
+    /// Get a partition by its UUID
+    pub fn get_partition_by_uuid(&self, uuid: Uuid) -> Option<&Partition> {
+        self.partitions_by_uuid.get(&uuid)
+    }
+
+    /// Take the canonical partition for a ref (removes it from partitions_by_uuid for a state transition).
+    /// The canonical_partitions mapping is NOT removed - the caller must update it if creating a new partition.
+    pub(crate) fn take_canonical_partition(&mut self, partition_ref: &str) -> Option<Partition> {
+        self.canonical_partitions
+            .get(partition_ref)
+            .copied()
+            .and_then(|uuid| self.partitions_by_uuid.remove(&uuid))
+    }
+
+    /// Get want IDs for a partition ref (from inverted index)
+    pub fn get_wants_for_partition(&self, partition_ref: &str) -> &[String] {
+        self.wants_for_partition
+            .get(partition_ref)
+            .map(|v| v.as_slice())
+            .unwrap_or(&[])
+    }
+
+    /// Register a want in the wants_for_partition inverted index
+    pub(crate) fn register_want_for_partitions(&mut self, want_id: &str, partition_refs: &[PartitionRef]) {
+        for pref in partition_refs {
+            let want_ids = self
+                .wants_for_partition
+                .entry(pref.r#ref.clone())
+                .or_insert_with(Vec::new);
+            if !want_ids.contains(&want_id.to_string()) {
+                want_ids.push(want_id.to_string());
+            }
+        }
+    }
+
+    /// Update a partition in the indexes (after state transition)
+    pub(crate) fn update_partition(&mut self, partition: Partition) {
+        let uuid = partition.uuid();
+        self.partitions_by_uuid.insert(uuid, partition);
+    }
+
+    // Test helpers
+    pub(crate) fn with_wants(self, wants: BTreeMap<String, Want>) -> Self {
+        Self { wants, ..self }
+    }
+
+    #[cfg(test)]
+    pub(crate) fn with_partitions(self, old_partitions: BTreeMap<String, crate::PartitionDetail>) -> Self {
+        use crate::partition_state::PartitionWithState;
+
+        let mut canonical_partitions: BTreeMap<String, Uuid> = BTreeMap::new();
+        let mut partitions_by_uuid: BTreeMap<Uuid, Partition> = BTreeMap::new();
+
+        // Convert
PartitionDetail to Live partitions for testing + for (key, detail) in old_partitions { + let partition_ref = detail.r#ref.clone().unwrap_or_default(); + // Create a deterministic UUID for test data + let uuid = + crate::partition_state::derive_partition_uuid("test_job_run", &partition_ref.r#ref); + let live_partition = Partition::Live(PartitionWithState { + uuid, + partition_ref, + state: crate::partition_state::LiveState { + built_at: 0, + built_by: "test_job_run".to_string(), + }, + }); + + canonical_partitions.insert(key, uuid); + partitions_by_uuid.insert(uuid, live_partition); + } + + Self { + canonical_partitions, + partitions_by_uuid, + ..self + } + } +} + +pub(crate) mod consts { + pub const DEFAULT_PAGE_SIZE: u64 = 100; +} diff --git a/databuild/build_state/partition_transitions.rs b/databuild/build_state/partition_transitions.rs new file mode 100644 index 0000000..9753cca --- /dev/null +++ b/databuild/build_state/partition_transitions.rs @@ -0,0 +1,338 @@ +//! Partition state transition logic +//! +//! Methods for transitioning partitions between states (Building, Live, Failed, +//! UpstreamBuilding, UpForRetry, UpstreamFailed) and managing downstream dependencies. + +use crate::partition_state::{ + BuildingPartitionRef, BuildingState, FailedPartitionRef, LivePartitionRef, Partition, + PartitionWithState, +}; +use crate::util::current_timestamp; +use crate::PartitionRef; +use uuid::Uuid; + +use super::BuildState; + +impl BuildState { + /// Create a new partition in Building state and update indexes + pub(crate) fn create_partition_building(&mut self, job_run_id: &str, partition_ref: PartitionRef) -> Uuid { + let partition = + PartitionWithState::::new(job_run_id.to_string(), partition_ref.clone()); + let uuid = partition.uuid; + + // Update indexes + self.partitions_by_uuid + .insert(uuid, Partition::Building(partition)); + self.canonical_partitions + .insert(partition_ref.r#ref.clone(), uuid); + + tracing::info!( + partition = %partition_ref.r#ref, + uuid = %uuid, + job_run_id = %job_run_id, + "Partition: Created in Building state" + ); + + uuid + } + + /// Create partitions in Building state + /// Used when a job run starts building partitions. + /// Note: Partitions no longer have a Missing state - they start directly as Building. 
+ pub(crate) fn transition_partitions_to_building( + &mut self, + partition_refs: &[BuildingPartitionRef], + job_run_id: &str, + ) { + for building_ref in partition_refs { + if let Some(partition) = self.get_canonical_partition(&building_ref.0.r#ref).cloned() { + // Partition already exists - this is an error unless we're retrying from UpForRetry + match partition { + Partition::UpForRetry(_) => { + // Valid: UpForRetry -> Building (retry after deps satisfied) + // Old partition stays in partitions_by_uuid as historical record + // Create new Building partition with fresh UUID + let uuid = + self.create_partition_building(job_run_id, building_ref.0.clone()); + tracing::info!( + partition = %building_ref.0.r#ref, + job_run_id = %job_run_id, + uuid = %uuid, + "Partition: UpForRetry → Building (retry)" + ); + } + _ => { + panic!( + "BUG: Invalid state - partition {} cannot start building from state {:?}", + building_ref.0.r#ref, partition + ) + } + } + } else { + // Partition doesn't exist yet - create directly in Building state + let uuid = self.create_partition_building(job_run_id, building_ref.0.clone()); + tracing::info!( + partition = %building_ref.0.r#ref, + job_run_id = %job_run_id, + uuid = %uuid, + "Partition: (new) → Building" + ); + } + } + } + + /// Transition partitions from Building to Live state + /// Used when a job run successfully completes + pub(crate) fn transition_partitions_to_live( + &mut self, + partition_refs: &[LivePartitionRef], + job_run_id: &str, + timestamp: u64, + ) { + for pref in partition_refs { + let partition = self + .take_canonical_partition(&pref.0.r#ref) + .expect(&format!( + "BUG: Partition {} must exist and be in Building state before completion", + pref.0.r#ref + )); + + // ONLY valid transition: Building -> Live + let transitioned = match partition { + Partition::Building(building) => { + tracing::info!( + partition = %pref.0.r#ref, + job_run_id = %job_run_id, + "Partition: Building → Live" + ); + Partition::Live(building.complete(timestamp)) + } + // All other states are invalid + _ => { + panic!( + "BUG: Invalid state - partition {} must be Building to transition to Live, found {:?}", + pref.0.r#ref, partition + ) + } + }; + self.update_partition(transitioned); + } + } + + /// Transition partitions from Building to Failed state + /// Used when a job run fails + pub(crate) fn transition_partitions_to_failed( + &mut self, + partition_refs: &[FailedPartitionRef], + job_run_id: &str, + timestamp: u64, + ) { + for pref in partition_refs { + let partition = self + .take_canonical_partition(&pref.0.r#ref) + .expect(&format!( + "BUG: Partition {} must exist and be in Building state before failure", + pref.0.r#ref + )); + + // ONLY valid transition: Building -> Failed + let transitioned = match partition { + Partition::Building(building) => { + tracing::info!( + partition = %pref.0.r#ref, + job_run_id = %job_run_id, + "Partition: Building → Failed" + ); + Partition::Failed(building.fail(timestamp)) + } + // All other states are invalid + _ => { + panic!( + "BUG: Invalid state - partition {} must be Building to transition to Failed, found {:?}", + pref.0.r#ref, partition + ) + } + }; + self.update_partition(transitioned); + } + } + + /// Transition partitions from Building to UpstreamBuilding state + /// Used when a job run encounters missing dependencies and cannot proceed. + /// The partition waits for its upstream deps to be built before becoming UpForRetry. 
+ pub(crate) fn transition_partitions_to_upstream_building( + &mut self, + partition_refs: &[BuildingPartitionRef], + missing_deps: Vec, + ) { + for building_ref in partition_refs { + let partition = self + .take_canonical_partition(&building_ref.0.r#ref) + .expect(&format!( + "BUG: Partition {} must exist and be in Building state during dep_miss", + building_ref.0.r#ref + )); + + // Only valid transition: Building -> UpstreamBuilding + let transitioned = match partition { + Partition::Building(building) => { + let partition_uuid = building.uuid; + tracing::info!( + partition = %building_ref.0.r#ref, + uuid = %partition_uuid, + missing_deps = ?missing_deps.iter().map(|p| &p.r#ref).collect::>(), + "Partition: Building → UpstreamBuilding (dep miss)" + ); + + // Update downstream_waiting index: for each missing dep, record that this partition is waiting + for missing_dep in &missing_deps { + self.downstream_waiting + .entry(missing_dep.r#ref.clone()) + .or_default() + .push(partition_uuid); + } + + Partition::UpstreamBuilding(building.dep_miss(missing_deps.clone())) + } + // All other states are invalid + _ => { + panic!( + "BUG: Invalid state - partition {} must be Building during dep_miss, found {:?}", + building_ref.0.r#ref, partition + ) + } + }; + self.update_partition(transitioned); + } + } + + /// Transition partitions from UpstreamBuilding to UpForRetry when their upstream deps become Live. + /// This should be called when partitions become Live to check if any downstream partitions can now retry. + /// Uses the `downstream_waiting` index for O(1) lookup of affected partitions. + pub(crate) fn unblock_downstream_partitions(&mut self, newly_live_partition_refs: &[LivePartitionRef]) { + // Collect UUIDs of partitions that might be unblocked using the inverted index + let mut uuids_to_check: Vec = Vec::new(); + for live_ref in newly_live_partition_refs { + if let Some(waiting_uuids) = self.downstream_waiting.get(&live_ref.0.r#ref) { + uuids_to_check.extend(waiting_uuids.iter().cloned()); + } + } + + // Deduplicate UUIDs (a partition might be waiting for multiple deps that all became live) + uuids_to_check.sort(); + uuids_to_check.dedup(); + + for uuid in uuids_to_check { + // Get partition by UUID - it might have been transitioned already or no longer exist + let Some(partition) = self.partitions_by_uuid.get(&uuid).cloned() else { + continue; + }; + + let partition_ref = partition.partition_ref().r#ref.clone(); + + // Only process UpstreamBuilding partitions + if let Partition::UpstreamBuilding(mut upstream_building) = partition { + // Remove satisfied deps from missing_deps + for live_ref in newly_live_partition_refs { + upstream_building + .state + .missing_deps + .retain(|d| d.r#ref != live_ref.0.r#ref); + // Also remove from downstream_waiting index + if let Some(waiting) = self.downstream_waiting.get_mut(&live_ref.0.r#ref) { + waiting.retain(|u| *u != uuid); + } + } + + let transitioned = if upstream_building.state.missing_deps.is_empty() { + // All deps satisfied, transition to UpForRetry + tracing::info!( + partition = %partition_ref, + uuid = %uuid, + "Partition: UpstreamBuilding → UpForRetry (all upstreams satisfied)" + ); + Partition::UpForRetry(upstream_building.upstreams_satisfied()) + } else { + // Still waiting for more deps + tracing::debug!( + partition = %partition_ref, + uuid = %uuid, + remaining_deps = ?upstream_building.state.missing_deps.iter().map(|d| &d.r#ref).collect::>(), + "Partition remains in UpstreamBuilding (still waiting for deps)" + ); + 
Partition::UpstreamBuilding(upstream_building) + }; + + self.update_partition(transitioned); + } + } + } + + /// Cascade failures to downstream partitions when their upstream dependencies fail. + /// Transitions UpstreamBuilding → UpstreamFailed for partitions waiting on failed upstreams. + /// Uses the `downstream_waiting` index for O(1) lookup of affected partitions. + pub(crate) fn cascade_failures_to_downstream_partitions( + &mut self, + failed_partition_refs: &[FailedPartitionRef], + ) { + // Collect UUIDs of partitions that are waiting for the failed partitions + let mut uuids_to_fail: Vec = Vec::new(); + for failed_ref in failed_partition_refs { + if let Some(waiting_uuids) = self.downstream_waiting.get(&failed_ref.0.r#ref) { + uuids_to_fail.extend(waiting_uuids.iter().cloned()); + } + } + + // Deduplicate UUIDs + uuids_to_fail.sort(); + uuids_to_fail.dedup(); + + for uuid in uuids_to_fail { + // Get partition by UUID + let Some(partition) = self.partitions_by_uuid.get(&uuid).cloned() else { + continue; + }; + + let partition_ref = partition.partition_ref().r#ref.clone(); + + // Only process UpstreamBuilding partitions + if let Partition::UpstreamBuilding(upstream_building) = partition { + // Collect which upstream refs failed + let failed_upstream_refs: Vec = failed_partition_refs + .iter() + .filter(|f| { + upstream_building + .state + .missing_deps + .iter() + .any(|d| d.r#ref == f.0.r#ref) + }) + .map(|f| f.0.clone()) + .collect(); + + if !failed_upstream_refs.is_empty() { + tracing::info!( + partition = %partition_ref, + uuid = %uuid, + failed_upstreams = ?failed_upstream_refs.iter().map(|p| &p.r#ref).collect::>(), + "Partition: UpstreamBuilding → UpstreamFailed (upstream failed)" + ); + + // Remove from downstream_waiting index for all deps + for dep in &upstream_building.state.missing_deps { + if let Some(waiting) = self.downstream_waiting.get_mut(&dep.r#ref) { + waiting.retain(|u| *u != uuid); + } + } + + // Transition to UpstreamFailed + let transitioned = Partition::UpstreamFailed( + upstream_building + .upstream_failed(failed_upstream_refs, current_timestamp()), + ); + self.update_partition(transitioned); + } + } + } + } +} diff --git a/databuild/build_state/queries.rs b/databuild/build_state/queries.rs new file mode 100644 index 0000000..1f8faaa --- /dev/null +++ b/databuild/build_state/queries.rs @@ -0,0 +1,118 @@ +//! Query methods for BuildState +//! +//! Read-only methods for accessing state (get_*, list_*) used by the API layer. 
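
The list_* methods below share a simple pagination contract: zero-based `page`, `page_size` defaulting to `consts::DEFAULT_PAGE_SIZE` (100), and `match_count` reporting the total number of matching items rather than the page length. A hedged usage sketch follows; the request construction assumes the generated request type derives `Default` and uses `Option<u64>` paging fields, and the function itself is hypothetical.

```rust
use crate::build_state::BuildState;
use crate::ListWantsRequest;

/// Illustrative only: walk every want one page at a time and count them.
fn count_wants_via_pages(state: &BuildState) -> u64 {
    let mut seen = 0;
    let mut page = 0;
    loop {
        let response = state.list_wants(&ListWantsRequest {
            page: Some(page),
            page_size: Some(50),
            ..Default::default()
        });
        seen += response.data.len() as u64;
        if response.data.len() < 50 {
            return seen; // a short page means we've reached the end
        }
        page += 1;
    }
}
```
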
+
+use crate::{
+    JobRunDetail, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest,
+    ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest,
+    ListWantsResponse, PartitionDetail, TaintDetail, WantDetail,
+};
+use std::collections::BTreeMap;
+
+use super::{consts, BuildState};
+
+impl BuildState {
+    pub fn get_want(&self, want_id: &str) -> Option<WantDetail> {
+        self.wants.get(want_id).map(|w| w.to_detail())
+    }
+
+    pub fn get_taint(&self, taint_id: &str) -> Option<TaintDetail> {
+        self.taints.get(taint_id).cloned()
+    }
+
+    pub fn get_partition(&self, partition_id: &str) -> Option<PartitionDetail> {
+        self.get_canonical_partition(partition_id)
+            .map(|p| p.to_detail())
+    }
+
+    pub fn get_job_run(&self, job_run_id: &str) -> Option<JobRunDetail> {
+        self.job_runs.get(job_run_id).map(|jr| jr.to_detail())
+    }
+
+    pub fn list_wants(&self, request: &ListWantsRequest) -> ListWantsResponse {
+        let page = request.page.unwrap_or(0);
+        let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
+
+        let start = page * page_size;
+
+        // Paginate first, then convert only the needed wants to WantDetail
+        let data: Vec<WantDetail> = self
+            .wants
+            .values()
+            .skip(start as usize)
+            .take(page_size as usize)
+            .map(|w| w.to_detail())
+            .collect();
+
+        ListWantsResponse {
+            data,
+            match_count: self.wants.len() as u64,
+            page,
+            page_size,
+        }
+    }
+
+    pub fn list_taints(&self, request: &ListTaintsRequest) -> ListTaintsResponse {
+        let page = request.page.unwrap_or(0);
+        let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
+        ListTaintsResponse {
+            data: list_state_items(&self.taints, page, page_size),
+            match_count: self.taints.len() as u64,
+            page,
+            page_size,
+        }
+    }
+
+    pub fn list_partitions(&self, request: &ListPartitionsRequest) -> ListPartitionsResponse {
+        let page = request.page.unwrap_or(0);
+        let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
+        // Convert canonical partitions to PartitionDetail for API
+        let partition_details: BTreeMap<String, PartitionDetail> = self
+            .canonical_partitions
+            .iter()
+            .filter_map(|(k, uuid)| {
+                self.partitions_by_uuid
+                    .get(uuid)
+                    .map(|p| (k.clone(), p.to_detail()))
+            })
+            .collect();
+        ListPartitionsResponse {
+            data: list_state_items(&partition_details, page, page_size),
+            match_count: self.canonical_partitions.len() as u64,
+            page,
+            page_size,
+        }
+    }
+
+    pub fn list_job_runs(&self, request: &ListJobRunsRequest) -> ListJobRunsResponse {
+        let page = request.page.unwrap_or(0);
+        let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
+
+        let start = page * page_size;
+        let data: Vec<JobRunDetail> = self
+            .job_runs
+            .values()
+            .skip(start as usize)
+            .take(page_size as usize)
+            .map(|jr| jr.to_detail())
+            .collect();
+
+        ListJobRunsResponse {
+            data,
+            match_count: self.job_runs.len() as u64,
+            page,
+            page_size,
+        }
+    }
+}
+
+fn list_state_items<T: Clone>(map: &BTreeMap<String, T>, page: u64, page_size: u64) -> Vec<T> {
+    // TODO when we add filtering, can we add it generically via some trait or filter object that can be provided?
+    let start = page * page_size;
+    map.values()
+        .skip(start as usize)
+        .take(page_size as usize)
+        .cloned()
+        .collect()
+}
diff --git a/databuild/build_state/schedulability.rs b/databuild/build_state/schedulability.rs
new file mode 100644
index 0000000..40236bf
--- /dev/null
+++ b/databuild/build_state/schedulability.rs
@@ -0,0 +1,176 @@
+//! Want schedulability logic
+//!
+//! Types and methods for determining whether wants are schedulable based on
+//! upstream partition states and target partition build status.
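
As a sketch of how a scheduling loop might consume the types defined below: collect the schedulability report and keep only wants whose upstream partitions are Live and whose target partitions are not already Building. The scheduling loop itself is not part of this diff, so the surrounding function is hypothetical; `wants_schedulability()` and `schedulable_wants()` are the methods defined in this file.

```rust
use crate::build_state::BuildState;
use crate::WantDetail;

/// Hypothetical scheduler tick: the wants that are safe to launch right now.
fn schedulable_this_tick(state: &BuildState) -> Vec<WantDetail> {
    // wants_schedulability() only reports Idle wants; schedulable_wants() then drops
    // anything with not-ready or tainted upstreams, or targets already Building.
    state.wants_schedulability().schedulable_wants()
}
```
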
+ +use crate::partition_state::{ + BuildingPartitionRef, LivePartitionRef, Partition, TaintedPartitionRef, +}; +use crate::{PartitionRef, WantDetail}; +use serde::{Deserialize, Serialize}; + +use super::BuildState; + +/// The status of partitions required by a want to build (sensed from dep miss job run) +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct WantUpstreamStatus { + pub live: Vec, + pub tainted: Vec, + /// Upstream partitions that are not ready (don't exist, or are in Building/UpstreamBuilding/UpForRetry/Failed/UpstreamFailed states) + pub not_ready: Vec, + /// Target partitions that are currently being built by another job + pub building: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct WantSchedulability { + pub want: WantDetail, + pub status: WantUpstreamStatus, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct WantsSchedulability(pub Vec); + +impl WantsSchedulability { + pub fn schedulable_wants(self) -> Vec { + self.0 + .iter() + .filter_map(|ws| match ws.is_schedulable() { + false => None, + true => Some(ws.want.clone()), + }) + .collect() + } +} + +impl WantSchedulability { + pub fn is_schedulable(&self) -> bool { + // Want is schedulable if: + // - No not-ready upstream dependencies (must all be Live or Tainted) + // - No tainted upstream dependencies + // - No target partitions currently being built by another job + self.status.not_ready.is_empty() + && self.status.tainted.is_empty() + && self.status.building.is_empty() + } +} + +impl BuildState { + /// Wants are schedulable when their upstreams are ready and target partitions are not already building + pub fn want_schedulability(&self, want: &WantDetail) -> WantSchedulability { + // Check upstream partition statuses (dependencies) + let mut live: Vec = Vec::new(); + let mut tainted: Vec = Vec::new(); + let mut not_ready: Vec = Vec::new(); // Partitions that don't exist or aren't Live + + for upstream_ref in &want.upstreams { + match self.get_canonical_partition(&upstream_ref.r#ref) { + Some(partition) => { + match partition { + Partition::Live(p) => live.push(p.get_ref()), + Partition::Tainted(p) => tainted.push(p.get_ref()), + // All other states (Building, UpstreamBuilding, UpForRetry, Failed, UpstreamFailed) mean upstream is not ready + _ => not_ready.push(upstream_ref.clone()), + } + } + None => { + // Partition doesn't exist yet - it's not ready + not_ready.push(upstream_ref.clone()); + } + } + } + + // Check target partition statuses (what this want is trying to build) + // If any target partition is already Building, this want should wait + let mut building: Vec = Vec::new(); + for target_ref in &want.partitions { + if let Some(partition) = self.get_canonical_partition(&target_ref.r#ref) { + if let Partition::Building(p) = partition { + building.push(p.get_ref()); + } + } + } + + WantSchedulability { + want: want.clone(), + status: WantUpstreamStatus { + live, + tainted, + not_ready, + building, + }, + } + } + + pub fn wants_schedulability(&self) -> WantsSchedulability { + WantsSchedulability( + self.wants + .values() + // Use type-safe is_schedulable() - only Idle wants are schedulable + .filter(|w| w.is_schedulable()) + .map(|w| self.want_schedulability(&w.to_detail())) + .collect(), + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::want_state::{IdleState as WantIdleState, Want, WantInfo, WantWithState}; + use crate::{PartitionDetail, PartitionRef, PartitionStatus, WantStatus}; + use std::collections::BTreeMap; + + 
impl WantDetail { + fn with_partitions(self, partitions: Vec) -> Self { + Self { partitions, ..self } + } + fn with_upstreams(self, upstreams: Vec) -> Self { + Self { upstreams, ..self } + } + fn with_status(self, status: Option) -> Self { + Self { status, ..self } + } + } + + impl PartitionDetail { + fn with_status(self, status: Option) -> Self { + Self { status, ..self } + } + fn with_ref(self, r#ref: Option) -> Self { + Self { r#ref, ..self } + } + } + + #[test] + fn test_empty_wants_noop() { + assert_eq!(BuildState::default().wants_schedulability().0.len(), 0); + } + + // A want with satisfied upstreams (incl "none") should be schedulable + #[test] + fn test_simple_want_with_live_upstream_is_schedulable() { + // Given... + let test_partition = "test_partition"; + let state = BuildState::default() + .with_wants(BTreeMap::from([( + "foo".to_string(), + Want::Idle(WantWithState { + want: WantInfo { + partitions: vec![test_partition.into()], + ..Default::default() + }, + state: WantIdleState {}, + }), + )])) + .with_partitions(BTreeMap::from([( + test_partition.to_string(), + PartitionDetail::default().with_ref(Some(test_partition.into())), + )])); + + // Should... + let schedulability = state.wants_schedulability(); + let ws = schedulability.0.first().unwrap(); + assert!(ws.is_schedulable()); + } +} diff --git a/databuild/build_state/want_transitions.rs b/databuild/build_state/want_transitions.rs new file mode 100644 index 0000000..89d4543 --- /dev/null +++ b/databuild/build_state/want_transitions.rs @@ -0,0 +1,403 @@ +//! Want state transition logic +//! +//! Methods for transitioning wants between states and managing dependencies +//! between wants (derivative wants from dep misses). + +use crate::job_run_state::JobRun; +use crate::partition_state::{FailedPartitionRef, LivePartitionRef, Partition}; +use crate::want_state::{FailedWantId, SuccessfulWantId, Want}; +use crate::PartitionRef; + +use super::BuildState; + +impl BuildState { + /// Handle creation of a derivative want (created due to job dep miss) + /// + /// When a job reports missing dependencies, it returns WantCreateV1 events for those missing partitions. + /// Those events get appended to the BEL and eventually processed by handle_want_create(). + /// + /// This function is called when we detect a derivative want (has source.job_triggered) and transitions + /// the impacted wants to UpstreamBuilding state, tracking the derivative want ID as an upstream dependency. + /// + /// KEY INSIGHT: We must use the actual want_id from the WantCreateV1 event, not synthetic UUIDs generated + /// during event processing. This ensures replay works correctly - the same want IDs are used both during + /// original execution and during replay from the BEL. 
+ pub(crate) fn handle_derivative_want_creation( + &mut self, + derivative_want_id: &str, + derivative_want_partitions: &[PartitionRef], + source_job_run_id: &str, + ) { + // Look up the job run that triggered this derivative want + // This job run must be in DepMiss state because it reported missing dependencies + let job_run = self.job_runs.get(source_job_run_id).expect(&format!( + "BUG: Job run {} must exist when derivative want created", + source_job_run_id + )); + + // Extract the missing deps from the DepMiss job run + let missing_deps = match job_run { + JobRun::DepMiss(dep_miss) => dep_miss.get_missing_deps(), + _ => { + panic!( + "BUG: Job run {} must be in DepMiss state when derivative want created, found {:?}", + source_job_run_id, job_run + ); + } + }; + + // Find which MissingDeps entry corresponds to this derivative want + // The derivative want was created for a specific set of missing partitions, + // and we need to find which downstream partitions are impacted by those missing partitions + for md in missing_deps { + // Check if this derivative want's partitions match the missing partitions in this entry + // We need exact match because one dep miss event can create multiple derivative wants + let partitions_match = md.missing.iter().all(|missing_ref| { + derivative_want_partitions + .iter() + .any(|p| p.r#ref == missing_ref.r#ref) + }) && derivative_want_partitions.len() == md.missing.len(); + + if partitions_match { + // Now we know which partitions are impacted by this missing dependency + let impacted_partition_refs: Vec = + md.impacted.iter().map(|p| p.r#ref.clone()).collect(); + + tracing::debug!( + derivative_want_id = %derivative_want_id, + source_job_run_id = %source_job_run_id, + missing_partitions = ?derivative_want_partitions.iter().map(|p| &p.r#ref).collect::>(), + impacted_partitions = ?impacted_partition_refs, + "Processing derivative want creation" + ); + + // Find all wants that include these impacted partitions + // These are the wants that need to wait for the derivative want to complete + let mut impacted_want_ids: std::collections::HashSet = + std::collections::HashSet::new(); + for partition_ref in &impacted_partition_refs { + for want_id in self.get_wants_for_partition(partition_ref) { + impacted_want_ids.insert(want_id.clone()); + } + } + + // Transition each impacted want to UpstreamBuilding, tracking this derivative want as an upstream + for want_id in impacted_want_ids { + let want = self.wants.remove(&want_id).expect(&format!( + "BUG: Want {} must exist when processing derivative want", + want_id + )); + + let transitioned = match want { + Want::Building(building) => { + // First dep miss for this want: Building → UpstreamBuilding + tracing::info!( + want_id = %want_id, + derivative_want_id = %derivative_want_id, + "Want: Building → UpstreamBuilding (first missing dep detected)" + ); + Want::UpstreamBuilding( + building.detect_missing_deps(vec![derivative_want_id.to_string()]), + ) + } + Want::UpstreamBuilding(upstream) => { + // Additional dep miss: UpstreamBuilding → UpstreamBuilding (add another upstream) + // This can happen if multiple jobs report dep misses for different upstreams + tracing::info!( + want_id = %want_id, + derivative_want_id = %derivative_want_id, + "Want: UpstreamBuilding → UpstreamBuilding (additional upstream added)" + ); + Want::UpstreamBuilding( + upstream.add_upstreams(vec![derivative_want_id.to_string()]), + ) + } + _ => { + panic!( + "BUG: Want {} in invalid state {:?} when processing derivative want. 
Should be Building or UpstreamBuilding.", + want_id, want + ); + } + }; + + self.wants.insert(want_id, transitioned); + } + } + } + } + + /// Complete wants when all their partitions become Live + /// Transitions Building → Successful, returns list of newly successful want IDs + pub(crate) fn complete_successful_wants( + &mut self, + newly_live_partitions: &[LivePartitionRef], + job_run_id: &str, + timestamp: u64, + ) -> Vec { + let mut newly_successful_wants: Vec = Vec::new(); + + for pref in newly_live_partitions { + let want_ids: Vec = self.get_wants_for_partition(&pref.0.r#ref).to_vec(); + + for want_id in want_ids { + let want = self.wants.remove(&want_id).expect(&format!( + "BUG: Want {} must exist when referenced by partition", + want_id + )); + + let transitioned = match want { + Want::Building(building) => { + // Check if ALL partitions for this want are now Live + let all_partitions_live = building.want.partitions.iter().all(|p| { + self.get_canonical_partition(&p.r#ref) + .map(|partition| partition.is_live()) + .unwrap_or(false) + }); + + if all_partitions_live { + let successful_want = + building.complete(job_run_id.to_string(), timestamp); + tracing::info!( + want_id = %want_id, + job_run_id = %job_run_id, + "Want: Building → Successful" + ); + newly_successful_wants.push(successful_want.get_id()); + Want::Successful(successful_want) + } else { + Want::Building(building) // Still building other partitions + } + } + _ => { + panic!( + "BUG: Want {} in invalid state {:?} when partition {} became Live. Should be Building.", + want_id, want, pref.0.r#ref + ); + } + }; + + self.wants.insert(want_id.clone(), transitioned); + } + } + + newly_successful_wants + } + + /// Fail wants when their partitions fail + /// Transitions Building → Failed, and adds to already-failed wants + /// Returns list of newly failed want IDs for downstream cascade + pub(crate) fn fail_directly_affected_wants( + &mut self, + failed_partitions: &[FailedPartitionRef], + ) -> Vec { + let mut newly_failed_wants: Vec = Vec::new(); + + for pref in failed_partitions { + let want_ids: Vec = self.get_wants_for_partition(&pref.0.r#ref).to_vec(); + + for want_id in want_ids { + let want = self.wants.remove(&want_id).expect(&format!( + "BUG: Want {} must exist when referenced by partition", + want_id + )); + + let transitioned = match want { + Want::Building(building) => { + let failed = building + .fail(vec![pref.0.clone()], "Partition build failed".to_string()); + newly_failed_wants.push(failed.get_id()); + Want::Failed(failed) + } + // Failed → Failed: add new failed partition to existing failed state + Want::Failed(failed) => { + Want::Failed(failed.add_failed_partitions(vec![pref.clone()])) + } + _ => { + panic!( + "BUG: Want {} in invalid state {:?} when partition {} failed. 
Should be Building or Failed.", + want_id, want, pref.0.r#ref + ); + } + }; + + self.wants.insert(want_id.clone(), transitioned); + } + } + + newly_failed_wants + } + + /// Unblock downstream wants when their upstream dependencies succeed + /// Transitions UpstreamBuilding → Idle (when ready) or Building (when partitions already building) + pub(crate) fn unblock_downstream_wants( + &mut self, + newly_successful_wants: &[SuccessfulWantId], + job_run_id: &str, + timestamp: u64, + ) { + tracing::debug!( + newly_successful_wants = ?newly_successful_wants + .iter() + .map(|w| &w.0) + .collect::>(), + "Checking downstream wants for unblocking" + ); + // Find downstream wants that are waiting for any of the newly successful wants + // TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants + let downstream_wants_to_check: Vec = self + .wants + .iter() + .filter_map(|(id, want)| { + match want { + Want::UpstreamBuilding(downstream_want) => { + // Is this downstream want waiting for any of the newly successful wants? + let is_affected = + downstream_want.state.upstream_want_ids.iter().any(|up_id| { + newly_successful_wants.iter().any(|swid| &swid.0 == up_id) + }); + if is_affected { Some(id.clone()) } else { None } + } + _ => None, + } + }) + .collect(); + tracing::debug!( + downstream_wants_to_check = ?downstream_wants_to_check, + "Found downstream wants affected by upstream completion" + ); + + for want_id in downstream_wants_to_check { + let want = self + .wants + .remove(&want_id) + .expect(&format!("BUG: Want {} must exist", want_id)); + + let transitioned = match want { + Want::UpstreamBuilding(downstream_want) => { + tracing::debug!( + want_id = %want_id, + upstreams = ?downstream_want.state.upstream_want_ids, + "Checking if all upstreams are satisfied" + ); + // Check if ALL of this downstream want's upstream dependencies are now Successful + let all_upstreams_successful = downstream_want + .state + .upstream_want_ids + .iter() + .all(|up_want_id| { + self.wants + .get(up_want_id) + .map(|w| matches!(w, Want::Successful(_))) + .unwrap_or(false) + }); + tracing::debug!( + want_id = %want_id, + all_upstreams_successful = %all_upstreams_successful, + "Upstream satisfaction check complete" + ); + + if all_upstreams_successful { + // Check if any of this want's partitions are still being built + // If a job dep-missed, its partitions transitioned back to Missing + // But other jobs might still be building other partitions for this want + let any_partition_building = + downstream_want.want.partitions.iter().any(|p| { + self.get_canonical_partition(&p.r#ref) + .map(|partition| matches!(partition, Partition::Building(_))) + .unwrap_or(false) + }); + tracing::debug!( + want_id = %want_id, + any_partition_building = %any_partition_building, + "Partition building status check" + ); + + if any_partition_building { + // Some partitions still being built, continue in Building state + tracing::info!( + want_id = %want_id, + job_run_id = %job_run_id, + "Want: UpstreamBuilding → Building (upstreams satisfied, partitions building)" + ); + Want::Building( + downstream_want + .continue_building(job_run_id.to_string(), timestamp), + ) + } else { + // No partitions being built, become schedulable again + tracing::info!( + want_id = %want_id, + "Want: UpstreamBuilding → Idle (upstreams satisfied, ready to schedule)" + ); + Want::Idle(downstream_want.upstreams_satisfied()) + } + } else { + // Upstreams not all satisfied yet, stay in UpstreamBuilding + tracing::debug!( + 
want_id = %want_id, + "Want remains in UpstreamBuilding state (upstreams not yet satisfied)" + ); + Want::UpstreamBuilding(downstream_want) + } + } + _ => { + panic!("BUG: Want {} should be UpstreamBuilding here", want_id); + } + }; + + self.wants.insert(want_id, transitioned); + } + } + + /// Cascade failures to downstream wants when their upstream dependencies fail + /// Transitions UpstreamBuilding → UpstreamFailed + pub(crate) fn cascade_failures_to_downstream_wants( + &mut self, + newly_failed_wants: &[FailedWantId], + timestamp: u64, + ) { + // Find downstream wants that are waiting for any of the newly failed wants + // TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants + let downstream_wants_to_fail: Vec = self + .wants + .iter() + .filter_map(|(id, want)| { + match want { + Want::UpstreamBuilding(downstream_want) => { + // Is this downstream want waiting for any of the newly failed wants? + let is_affected = + downstream_want.state.upstream_want_ids.iter().any(|up_id| { + newly_failed_wants.iter().any(|fwid| &fwid.0 == up_id) + }); + if is_affected { Some(id.clone()) } else { None } + } + _ => None, + } + }) + .collect(); + + for want_id in downstream_wants_to_fail { + let want = self + .wants + .remove(&want_id) + .expect(&format!("BUG: Want {} must exist", want_id)); + + let transitioned = match want { + Want::UpstreamBuilding(downstream_want) => Want::UpstreamFailed( + downstream_want.upstream_failed( + newly_failed_wants + .iter() + .map(|fwid| fwid.0.clone()) + .collect(), + timestamp, + ), + ), + _ => { + panic!("BUG: Want {} should be UpstreamBuilding here", want_id); + } + }; + + self.wants.insert(want_id, transitioned); + } + } +}
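
Both cascade helpers above carry a TODO about scanning every want to find the downstreams waiting on a given upstream want. One possible shape for that index, mirroring `downstream_waiting` on the partition side, is sketched here; nothing below exists in this diff, it only illustrates the TODO.

```rust
use std::collections::BTreeMap;

/// Sketch of a possible upstream_want_id -> downstream_want_ids index.
/// Would let unblock_downstream_wants / cascade_failures_to_downstream_wants
/// look up affected wants directly instead of iterating all wants.
#[derive(Debug, Default)]
struct UpstreamWantIndex(BTreeMap<String, Vec<String>>);

impl UpstreamWantIndex {
    /// Called when a downstream want starts waiting on a derivative (upstream) want.
    fn register(&mut self, upstream_want_id: &str, downstream_want_id: &str) {
        let entry = self.0.entry(upstream_want_id.to_string()).or_default();
        if !entry.iter().any(|id| id.as_str() == downstream_want_id) {
            entry.push(downstream_want_id.to_string());
        }
    }

    /// Called when an upstream want reaches Successful or Failed: returns the
    /// downstream want ids to re-examine, and drops the index entry.
    fn affected_downstreams(&mut self, upstream_want_id: &str) -> Vec<String> {
        self.0.remove(upstream_want_id).unwrap_or_default()
    }
}
```
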