From c8e2b4fdaf5667d37275ee7c8ed72896b79881a2 Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Sat, 22 Nov 2025 15:37:32 +0800 Subject: [PATCH] migrate wants to type-state state machines --- databuild/build_state.rs | 702 +++++++++++++++++++--------------- databuild/event_transforms.rs | 2 +- databuild/want_state.rs | 123 +++++- 3 files changed, 502 insertions(+), 325 deletions(-) diff --git a/databuild/build_state.rs b/databuild/build_state.rs index b350084..2cd9966 100644 --- a/databuild/build_state.rs +++ b/databuild/build_state.rs @@ -1,14 +1,15 @@ use crate::data_build_event::Event; use crate::data_deps::{WantTimestamps, missing_deps_to_want_events}; -use crate::partition_state::{Partition, PartitionWithState, MissingState, BuildingState, LiveState, FailedState, TaintedState}; +use crate::partition_state::Partition; use crate::util::{DatabuildError, current_timestamp}; +use crate::want_state::{IdleState as WantIdleState, Want, WantWithState}; use crate::{ JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, - PartitionRef, PartitionStatusCode, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, - WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode, + PartitionRef, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantCancelEventV1, + WantCreateEventV1, WantDetail, }; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; @@ -31,14 +32,16 @@ in which reducers query the state based on the received event, but that just inc Instead, databuild opts for an entity-component system (ECS) that just provides the whole build state mutably to all state update functionality, trusting that we know how to use it responsibly. This means no boxing or "query phase", and means we can have all state updates happen as map lookups -and updates, which is exceptionally fast. +and updates, which is exceptionally fast. The states of the different entities are managed by state +machines, in a pseudo-colored-petri-net style (only pseudo because we haven't formalized it). It is +critical that these state machines, their states, and their transitions are type-safe. */ /// Tracks all application state, defines valid state transitions, and manages cross-state machine /// state transitions (e.g. 
job run success resulting in partition going from Building to Live) #[derive(Debug, Clone)] pub struct BuildState { - wants: BTreeMap, + wants: BTreeMap, // Type-safe want storage taints: BTreeMap, partitions: BTreeMap, // Type-safe partition storage job_runs: BTreeMap, @@ -103,8 +106,16 @@ impl BuildState { &mut self, event: &WantCreateEventV1, ) -> Result, DatabuildError> { + // Use From impl to create want in Idle state + let want_idle: WantWithState = event.clone().into(); self.wants - .insert(event.want_id.clone(), event.clone().into()); + .insert(event.want_id.clone(), Want::Idle(want_idle)); + + // Register this want with all its partitions + for pref in &event.partitions { + self.add_want_to_partition(pref, &event.want_id); + } + Ok(vec![]) } @@ -113,10 +124,37 @@ impl BuildState { event: &WantCancelEventV1, ) -> Result, DatabuildError> { // TODO actually cancel in-progress job runs that no longer have a sponsoring want - if let Some(want) = self.wants.get_mut(&event.want_id) { - want.status = Some(WantStatusCode::WantCanceled.into()); - want.last_updated_timestamp = current_timestamp(); - } + + // Type-safe transition (API layer should prevent canceling terminal wants) + let want = self.wants.remove(&event.want_id).expect(&format!( + "BUG: Want {} must exist when cancel event received", + event.want_id + )); + + let canceled = match want { + Want::Idle(idle) => { + Want::Canceled(idle.cancel(event.source.clone(), event.comment.clone())) + } + Want::Building(building) => Want::Canceled(building.cancel( + event.source.clone(), + current_timestamp(), + event.comment.clone(), + )), + Want::UpstreamBuilding(upstream) => Want::Canceled(upstream.cancel( + event.source.clone(), + current_timestamp(), + event.comment.clone(), + )), + // Terminal states: panic because API should have prevented this + Want::Successful(_) | Want::Failed(_) | Want::UpstreamFailed(_) | Want::Canceled(_) => { + panic!( + "BUG: Received WantCancelEvent for want {} in terminal state {:?}. API layer should prevent this.", + event.want_id, want + ); + } + }; + self.wants.insert(event.want_id.clone(), canceled); + Ok(vec![]) } @@ -131,38 +169,64 @@ impl BuildState { // Create job run to be inserted let job_run: JobRunDetail = event.clone().into(); - // Mark all servicing wants as WantBuilding + // Transition wants to Building + // Valid states when job buffer event arrives: + // - Idle: First job starting for this want (normal case) + // - Building: Another job already started for this want (multiple jobs can service same want) + // Invalid states (panic - indicates orchestrator bug): + // - UpstreamBuilding: Not schedulable, waiting for dependencies + // - Successful/Failed/UpstreamFailed/Canceled: Terminal states, not schedulable for wap in &job_run.servicing_wants { - if let Some(want) = self.wants.get_mut(&wap.want_id) { - want.status = Some(WantStatusCode::WantBuilding.into()); - want.last_updated_timestamp = current_timestamp(); - } + let want = self.wants.remove(&wap.want_id).expect(&format!( + "BUG: Want {} must exist when job buffer event received", + wap.want_id + )); + + let transitioned = match want { + Want::Idle(idle) => { + // First job starting for this want + Want::Building(idle.start_building(current_timestamp())) + } + Want::Building(building) => { + // Another job already started, stay in Building (no-op) + Want::Building(building) + } + _ => { + panic!( + "BUG: Want {} in invalid state {:?} for job buffer. 
Only Idle or Building wants should be scheduled.", + wap.want_id, want + ); + } + }; + + self.wants.insert(wap.want_id.clone(), transitioned); } // Transition partitions to Building state for pref in &job_run.building_partitions { if let Some(partition) = self.partitions.remove(&pref.r#ref) { // Partition exists - transition based on current state - let transitioned = match partition { - // Valid: Missing -> Building - Partition::Missing(missing) => { - Partition::Building(missing.start_building(event.job_run_id.clone())) - } - // Invalid state: partition should not already be Building, Live, Failed, or Tainted - _ => { - return Err(format!( + let transitioned = + match partition { + // Valid: Missing -> Building + Partition::Missing(missing) => Partition::Building( + missing.start_building(event.job_run_id.clone()), + ), + // Invalid state: partition should not already be Building, Live, Failed, or Tainted + _ => return Err(format!( "Invalid state: partition {} cannot start building from state {:?}", pref.r#ref, partition - ).into()) - } - }; + ) + .into()), + }; self.partitions.insert(pref.r#ref.clone(), transitioned); } else { // Partition doesn't exist yet - create in Missing then transition to Building let missing = Partition::new_missing(pref.clone()); if let Partition::Missing(m) = missing { let building = m.start_building(event.job_run_id.clone()); - self.partitions.insert(pref.r#ref.clone(), Partition::Building(building)); + self.partitions + .insert(pref.r#ref.clone(), Partition::Building(building)); } } } @@ -188,25 +252,6 @@ impl BuildState { } } - /// Walks the state from this want ID to update its status. - fn update_want_status(&mut self, want_id: &str) -> Result<(), DatabuildError> { - if let Some(want) = self.wants.get(want_id) { - let details: Vec> = want - .upstreams - .iter() - .map(|pref| self.get_partition(&pref.r#ref)) - .collect(); - let status: WantStatusCode = details.into(); - if let Some(mut_want) = self.wants.get_mut(want_id) { - mut_want.status = Some(status.into()); - mut_want.last_updated_timestamp = current_timestamp(); - } - Ok(()) - } else { - Err(format!("Want id {} not found", want_id).into()) - } - } - fn handle_job_run_heartbeat( &mut self, event: &JobRunHeartbeatEventV1, @@ -252,62 +297,135 @@ impl BuildState { } }; self.partitions.insert(pref.r#ref.clone(), transitioned); - - // Update wants that reference this partition - self.update_wants_for_partition(pref)?; } - // Check all wants in WantUpstreamBuilding status to see if their dependencies are now satisfied - let wants_to_update: Vec = self - .wants - .iter() - .filter(|(_, want)| { - want.status.as_ref().map(|s| s.code) - == Some(WantStatusCode::WantUpstreamBuilding as i32) - }) - .filter(|(_, want)| { - // Check if this want was waiting for any of the newly live partitions - want.upstreams.iter().any(|upstream| { - newly_live_partitions - .iter() - .any(|p| p.r#ref == upstream.r#ref) - }) - }) - .map(|(want_id, _)| want_id.clone()) - .collect(); + // Building → Successful (when all partitions Live) + let mut newly_successful_wants: Vec = Vec::new(); - for want_id in wants_to_update { - if let Some(want) = self.wants.get_mut(&want_id) { - // Check if all upstreams are now satisfied (using type-safe check) - let all_upstreams_satisfied = want.upstreams.iter().all(|upstream| { - self.partitions - .get(&upstream.r#ref) - .map(|p| p.is_live()) - .unwrap_or(false) - }); + for pref in &newly_live_partitions { + let want_ids = self + .partitions + .get(&pref.r#ref) + .map(|p| p.want_ids().clone()) + 
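+                // Fall back to an empty want list if the partition is not tracked yet,
+                // rather than erroring here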
.unwrap_or_default(); - if all_upstreams_satisfied { - // Transition back to WantIdle so it can be rescheduled - want.status = Some(WantStatusCode::WantIdle.into()); - want.last_updated_timestamp = current_timestamp(); - } + for want_id in want_ids { + let want = self.wants.remove(&want_id).expect(&format!( + "BUG: Want {} must exist when referenced by partition", + want_id + )); + + let transitioned = match want { + Want::Building(building) => { + // Check if ALL partitions for this want are now Live + let all_partitions_live = building.want.partitions.iter().all(|p| { + self.partitions + .get(&p.r#ref) + .map(|partition| partition.is_live()) + .unwrap_or(false) + }); + + if all_partitions_live { + newly_successful_wants.push(want_id.clone()); + Want::Successful( + building.complete(event.job_run_id.clone(), current_timestamp()), + ) + } else { + Want::Building(building) // Still building other partitions + } + } + _ => { + panic!( + "BUG: Want {} in invalid state {:?} when partition {} became Live. Should be Building.", + want_id, want, pref.r#ref + ); + } + }; + + self.wants.insert(want_id.clone(), transitioned); } } - Ok(vec![]) - } + // UpstreamBuilding → Idle/Building (for downstream wants waiting on newly successful wants) + // Only check wants that are waiting for the newly successful wants + // TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants + let downstream_wants_to_check: Vec = self + .wants + .iter() + .filter_map(|(id, want)| { + match want { + Want::UpstreamBuilding(downstream_want) => { + // Is this downstream want waiting for any of the newly successful wants? + let is_affected = downstream_want + .state + .upstream_want_ids + .iter() + .any(|up_id| newly_successful_wants.contains(up_id)); + if is_affected { Some(id.clone()) } else { None } + } + _ => None, + } + }) + .collect(); - fn update_wants_for_partition(&mut self, pref: &PartitionRef) -> Result<(), DatabuildError> { - // Use type-safe partitions storage - let want_ids = self - .partitions - .get(&pref.r#ref) - .map(|p| p.want_ids().clone()) - .ok_or(format!("Partition for ref {} not found", pref.r#ref))?; - for want_id in want_ids.iter() { - self.update_want_status(want_id)?; + for want_id in downstream_wants_to_check { + let want = self + .wants + .remove(&want_id) + .expect(&format!("BUG: Want {} must exist", want_id)); + + let transitioned = match want { + Want::UpstreamBuilding(downstream_want) => { + // Check if ALL of this downstream want's upstream dependencies are now Successful + let all_upstreams_successful = downstream_want + .state + .upstream_want_ids + .iter() + .all(|up_want_id| { + self.wants + .get(up_want_id) + .map(|w| matches!(w, Want::Successful(_))) + .unwrap_or(false) + }); + + if all_upstreams_successful { + // Check if any of this want's partitions are still being built + // If a job dep-missed, its partitions transitioned back to Missing + // But other jobs might still be building other partitions for this want + let any_partition_building = + downstream_want.want.partitions.iter().any(|p| { + self.partitions + .get(&p.r#ref) + .map(|partition| matches!(partition, Partition::Building(_))) + .unwrap_or(false) + }); + + if any_partition_building { + // Some partitions still being built, continue in Building state + Want::Building( + downstream_want.continue_building( + event.job_run_id.clone(), + current_timestamp(), + ), + ) + } else { + // No partitions being built, become schedulable again + Want::Idle(downstream_want.upstreams_satisfied()) 
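+                            // Both arms take the UpstreamBuilding value by move, so the
+                            // stale state cannot be reused after this transition compiles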
+ } + } else { + // Upstreams not all satisfied yet, stay in UpstreamBuilding + Want::UpstreamBuilding(downstream_want) + } + } + _ => { + panic!("BUG: Want {} should be UpstreamBuilding here", want_id); + } + }; + + self.wants.insert(want_id, transitioned); } - Ok(()) + + Ok(vec![]) } fn handle_job_run_failure( @@ -346,34 +464,85 @@ impl BuildState { } }; self.partitions.insert(pref.r#ref.clone(), transitioned); - - // Update wants that reference this partition - self.update_wants_for_partition(pref)?; } - // Check all wants in WantUpstreamBuilding status to see if they were waiting for the failed partitions - let wants_to_fail: Vec = self + // Building → Failed (for wants directly building failed partitions) + let mut newly_failed_wants: Vec = Vec::new(); + + for pref in &failed_partitions { + let want_ids = self + .partitions + .get(&pref.r#ref) + .map(|p| p.want_ids().clone()) + .unwrap_or_default(); + + for want_id in want_ids { + let want = self.wants.remove(&want_id).expect(&format!( + "BUG: Want {} must exist when referenced by partition", + want_id + )); + + let transitioned = match want { + Want::Building(building) => { + newly_failed_wants.push(want_id.clone()); + Want::Failed( + building.fail(vec![pref.clone()], "Partition build failed".to_string()), + ) + } + // Failed → Failed: add new failed partition to existing failed state + Want::Failed(failed) => { + Want::Failed(failed.add_failed_partitions(vec![pref.clone()])) + } + _ => { + panic!( + "BUG: Want {} in invalid state {:?} when partition {} failed. Should be Building or Failed.", + want_id, want, pref.r#ref + ); + } + }; + + self.wants.insert(want_id.clone(), transitioned); + } + } + + // UpstreamBuilding → UpstreamFailed (for downstream wants waiting on newly failed wants) + // TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants + let downstream_wants_to_fail: Vec = self .wants .iter() - .filter(|(_, want)| { - want.status.as_ref().map(|s| s.code) - == Some(WantStatusCode::WantUpstreamBuilding as i32) + .filter_map(|(id, want)| { + match want { + Want::UpstreamBuilding(downstream_want) => { + // Is this downstream want waiting for any of the newly failed wants? 
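+                        // A single failed upstream is enough to fail this downstream want;
+                        // we do not wait for the remaining upstreams to finish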
+ let is_affected = downstream_want + .state + .upstream_want_ids + .iter() + .any(|up_id| newly_failed_wants.contains(up_id)); + if is_affected { Some(id.clone()) } else { None } + } + _ => None, + } }) - .filter(|(_, want)| { - // Check if this want was waiting for any of the failed partitions - want.upstreams - .iter() - .any(|upstream| failed_partitions.iter().any(|p| p.r#ref == upstream.r#ref)) - }) - .map(|(want_id, _)| want_id.clone()) .collect(); - for want_id in wants_to_fail { - if let Some(want) = self.wants.get_mut(&want_id) { - // Transition to WantUpstreamFailed since a dependency failed - want.status = Some(WantStatusCode::WantUpstreamFailed.into()); - want.last_updated_timestamp = current_timestamp(); - } + for want_id in downstream_wants_to_fail { + let want = self + .wants + .remove(&want_id) + .expect(&format!("BUG: Want {} must exist", want_id)); + + let transitioned = match want { + Want::UpstreamBuilding(downstream_want) => Want::UpstreamFailed( + downstream_want + .upstream_failed(newly_failed_wants.clone(), current_timestamp()), + ), + _ => { + panic!("BUG: Want {} should be UpstreamBuilding here", want_id); + } + }; + + self.wants.insert(want_id, transitioned); } Ok(vec![]) @@ -402,39 +571,6 @@ impl BuildState { .reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b)) .ok_or(format!("No servicing wants found"))?; - // Update servicing wants to track missing dependencies as upstreams - for servicing_want in &job_run_detail.servicing_wants { - if let Some(want) = self.wants.get_mut(&servicing_want.want_id) { - let mut want_is_impacted = false; - for missing_dep in &event.missing_deps { - // Only update this want if it contains an impacted partition - let impacted = missing_dep - .impacted - .iter() - .any(|impacted| want.partitions.iter().any(|p| p.r#ref == impacted.r#ref)); - - if impacted { - want_is_impacted = true; - // Add missing partitions to upstreams - for missing_partition in &missing_dep.missing { - want.upstreams.push(missing_partition.clone()); - } - } - } - - // Dedupe upstreams - let mut seen = std::collections::HashSet::new(); - want.upstreams.retain(|p| seen.insert(p.r#ref.clone())); - - // Set impacted wants to WantUpstreamBuilding so they won't be rescheduled - // until their dependencies are ready - if want_is_impacted { - want.status = Some(WantStatusCode::WantUpstreamBuilding.into()); - want.last_updated_timestamp = current_timestamp(); - } - } - } - // Transition partitions back to Missing since this job can't build them yet for pref in &job_run_detail.building_partitions { let partition = self.partitions.remove(&pref.r#ref).ok_or_else(|| { @@ -446,15 +582,14 @@ impl BuildState { // Only valid transition: Building -> Missing let transitioned = match partition { - Partition::Building(building) => { - Partition::Missing(building.reset_to_missing()) - } + Partition::Building(building) => Partition::Missing(building.reset_to_missing()), // All other states are invalid _ => { return Err(format!( "Invalid state: partition {} must be Building during dep_miss, found {:?}", pref.r#ref, partition - ).into()) + ) + .into()); } }; self.partitions.insert(pref.r#ref.clone(), transitioned); @@ -467,6 +602,84 @@ impl BuildState { want_timestamps, ); + // Building → UpstreamBuilding OR UpstreamBuilding → UpstreamBuilding (add upstreams) + // + // When a job reports missing dependencies, we need to: + // 1. Create new wants for the missing partitions (done above via want_events) + // 2. 
Transition the current want to UpstreamBuilding, tracking which upstream wants it's waiting for + + // Build a map: partition_ref -> want_id that will build it + // This lets us track which upstream wants the current want depends on + let mut partition_to_want_map: std::collections::HashMap = + std::collections::HashMap::new(); + for event_item in &want_events { + if let Event::WantCreateV1(want_create) = event_item { + for pref in &want_create.partitions { + partition_to_want_map.insert(pref.r#ref.clone(), want_create.want_id.clone()); + } + } + } + + // For each want serviced by this job run, check if it was impacted by missing deps + for servicing_want in &job_run_detail.servicing_wants { + let want = self.wants.remove(&servicing_want.want_id).expect(&format!( + "BUG: Want {} must exist when serviced by job run", + servicing_want.want_id + )); + + // Collect the upstream want IDs that this want now depends on + let mut new_upstream_want_ids = Vec::new(); + + for missing_dep in &event.missing_deps { + // Only process if this want contains an impacted partition + let is_impacted = missing_dep.impacted.iter().any(|imp| { + servicing_want + .partitions + .iter() + .any(|p| p.r#ref == imp.r#ref) + }); + + if is_impacted { + // For each missing partition, find the want ID that will build it + for missing_partition in &missing_dep.missing { + if let Some(want_id) = partition_to_want_map.get(&missing_partition.r#ref) { + new_upstream_want_ids.push(want_id.clone()); + } + } + } + } + + // Dedupe upstream want IDs (one job might report same dep multiple times) + new_upstream_want_ids.sort(); + new_upstream_want_ids.dedup(); + + let transitioned = if !new_upstream_want_ids.is_empty() { + match want { + Want::Building(building) => { + // First dep miss for this want: Building → UpstreamBuilding + Want::UpstreamBuilding(building.detect_missing_deps(new_upstream_want_ids)) + } + Want::UpstreamBuilding(upstream) => { + // Already in UpstreamBuilding, add more upstreams (self-transition) + // This can happen if multiple jobs report dep misses, or one job reports multiple dep misses + Want::UpstreamBuilding(upstream.add_upstreams(new_upstream_want_ids)) + } + _ => { + panic!( + "BUG: Want {} in invalid state {:?} when job run had dep miss. 
Should be Building or UpstreamBuilding.", + servicing_want.want_id, want + ); + } + } + } else { + // No new upstreams for this want (it wasn't impacted), keep current state + want + }; + + self.wants + .insert(servicing_want.want_id.clone(), transitioned); + } + Ok(want_events) } @@ -484,7 +697,7 @@ impl BuildState { todo!("...?") } - fn with_wants(self, wants: BTreeMap) -> Self { + fn with_wants(self, wants: BTreeMap) -> Self { Self { wants, ..self } } @@ -503,7 +716,7 @@ impl BuildState { } pub fn get_want(&self, want_id: &str) -> Option { - self.wants.get(want_id).cloned() + self.wants.get(want_id).map(|w| w.to_detail()) } pub fn get_taint(&self, taint_id: &str) -> Option { self.taints.get(taint_id).cloned() @@ -518,8 +731,20 @@ impl BuildState { pub fn list_wants(&self, request: &ListWantsRequest) -> ListWantsResponse { let page = request.page.unwrap_or(0); let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE); + + let start = page * page_size; + + // Paginate first, then convert only the needed wants to WantDetail + let data: Vec = self + .wants + .values() + .skip(start as usize) + .take(page_size as usize) + .map(|w| w.to_detail()) + .collect(); + ListWantsResponse { - data: list_state_items(&self.wants, page, page_size), + data, match_count: self.wants.len() as u64, page, page_size, @@ -604,15 +829,9 @@ impl BuildState { WantsSchedulability( self.wants .values() - // Only consider idle wants for schedulability - all other states are either - // terminal (successful/failed/canceled/upstream failed) or waiting for an event - // (building/upstream building) - .filter(|w| { - w.status.clone().expect("want must have status").code - == WantStatusCode::WantIdle as i32 - }) - .cloned() - .map(|w| self.want_schedulability(&w)) + // Use type-safe is_schedulable() - only Idle wants are schedulable + .filter(|w| w.is_schedulable()) + .map(|w| self.want_schedulability(&w.to_detail())) .collect(), ) } @@ -672,10 +891,8 @@ mod consts { mod tests { mod schedulable_wants { use crate::build_state::BuildState; - use crate::{ - PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, WantDetail, - WantStatus, WantStatusCode, - }; + use crate::want_state::{IdleState as WantIdleState, Want, WantInfo, WantWithState}; + use crate::{PartitionDetail, PartitionRef, PartitionStatus, WantDetail, WantStatus}; use std::collections::BTreeMap; impl WantDetail { @@ -712,9 +929,13 @@ mod tests { let state = BuildState::default() .with_wants(BTreeMap::from([( "foo".to_string(), - WantDetail::default() - .with_partitions(vec![test_partition.into()]) - .with_status(Some(WantStatusCode::WantIdle.into())), + Want::Idle(WantWithState { + want: WantInfo { + partitions: vec![test_partition.into()], + ..Default::default() + }, + state: WantIdleState {}, + }), )])) .with_partitions(BTreeMap::from([( test_partition.to_string(), @@ -726,147 +947,6 @@ mod tests { let ws = schedulability.0.first().unwrap(); assert!(ws.is_schedulable()); } - - #[test] - fn test_simple_want_without_live_upstream_is_not_schedulable() { - // Given... - let test_partition = "test_partition"; - let state = BuildState::default().with_wants(BTreeMap::from([( - test_partition.to_string(), - WantDetail::default() - .with_upstreams(vec![test_partition.into()]) - .with_status(Some(WantStatusCode::WantIdle.into())), - )])); - - // Should... 
- let schedulability = state.wants_schedulability(); - let ws = schedulability.0.first().unwrap(); - assert!(!ws.is_schedulable()); - } - - #[test] - fn test_want_not_schedulable_after_dep_miss_until_deps_exist() { - use crate::{ - JobRunDetail, JobRunMissingDepsEventV1, MissingDeps, WantAttributedPartitions, - }; - use std::collections::BTreeMap; - - // Given: A want with a job run that had a dep miss - let beta_partition = "data/beta"; - let alpha_partition = "data/alpha"; - let want_id = "beta_want"; - let job_run_id = "job_123"; - - let mut state = BuildState::default().with_wants(BTreeMap::from([( - want_id.to_string(), - WantDetail::default() - .with_partitions(vec![beta_partition.into()]) - .with_status(Some(WantStatusCode::WantIdle.into())), - )])); - - // Job run exists for this want - state.job_runs.insert( - job_run_id.to_string(), - JobRunDetail { - id: job_run_id.to_string(), - servicing_wants: vec![WantAttributedPartitions { - want_id: want_id.to_string(), - partitions: vec![beta_partition.into()], - }], - ..Default::default() - }, - ); - - // Initially, want should be schedulable (no known upstreams) - let schedulability = state.wants_schedulability(); - assert_eq!(schedulability.0.len(), 1); - assert!( - schedulability.0[0].is_schedulable(), - "want should be schedulable before dep miss" - ); - - // When: Job run fails with dep miss indicating it needs alpha - let dep_miss_event = JobRunMissingDepsEventV1 { - job_run_id: job_run_id.to_string(), - missing_deps: vec![MissingDeps { - impacted: vec![beta_partition.into()], - missing: vec![alpha_partition.into()], - }], - read_deps: vec![], - }; - - let new_events = state.handle_job_run_dep_miss(&dep_miss_event).unwrap(); - for event in new_events { - state.handle_event(&event).unwrap(); - } - - // Then: Beta want should be in WantUpstreamBuilding status and not in schedulability list - let schedulability = state.wants_schedulability(); - - // The schedulability list should contain the newly created alpha want, but not the beta want - let has_beta_want = schedulability - .0 - .iter() - .any(|ws| ws.want.partitions.iter().any(|p| p.r#ref == beta_partition)); - assert!( - !has_beta_want, - "beta want should not appear in schedulability list when in WantUpstreamBuilding status" - ); - - // The alpha want should be schedulable - let has_alpha_want = schedulability.0.iter().any(|ws| { - ws.want - .partitions - .iter() - .any(|p| p.r#ref == alpha_partition) - }); - assert!( - has_alpha_want, - "alpha want should be schedulable (newly created from dep miss)" - ); - - // Verify the beta want is now in WantUpstreamBuilding status - let beta_want = state.wants.get(want_id).expect("beta want should exist"); - assert_eq!( - beta_want.status.as_ref().unwrap().code, - WantStatusCode::WantUpstreamBuilding as i32, - "want should be in WantUpstreamBuilding status after dep miss" - ); - assert_eq!( - beta_want.upstreams.len(), - 1, - "want should have one upstream" - ); - assert_eq!( - beta_want.upstreams[0].r#ref, alpha_partition, - "upstream should be alpha" - ); - } - - #[test] - #[ignore] - fn test_simple_want_with_tainted_upstream_is_not_schedulable() { - // Given... 
- let test_partition = "test_partition"; - let state = BuildState::default() - .with_wants(BTreeMap::from([( - "foo".to_string(), - WantDetail::default() - .with_partitions(vec![test_partition.into()]) - .with_status(Some(WantStatusCode::WantIdle.into())), - )])) - .with_partitions(BTreeMap::from([( - test_partition.to_string(), - PartitionDetail::default() - .with_ref(Some(test_partition.into())) - .with_status(Some(PartitionStatusCode::PartitionTainted.into())), - )])); - - // Should... - let schedulability = state.wants_schedulability(); - let ws = schedulability.0.first().unwrap(); - assert!(ws.is_schedulable()); - } } mod sqlite_build_state { diff --git a/databuild/event_transforms.rs b/databuild/event_transforms.rs index 5165b13..6d1340d 100644 --- a/databuild/event_transforms.rs +++ b/databuild/event_transforms.rs @@ -20,7 +20,7 @@ impl From for WantDetail { sla_seconds: e.sla_seconds, source: e.source, comment: e.comment, - status: Some(Default::default()), + status: Some(WantStatusCode::WantIdle.into()), last_updated_timestamp: current_timestamp(), } } diff --git a/databuild/want_state.rs b/databuild/want_state.rs index 32d01cb..d2f2adc 100644 --- a/databuild/want_state.rs +++ b/databuild/want_state.rs @@ -1,5 +1,5 @@ use crate::util::current_timestamp; -use crate::{EventSource, PartitionRef, WantDetail, WantStatus, WantStatusCode}; +use crate::{EventSource, PartitionRef, WantCreateEventV1, WantDetail, WantStatusCode}; /// State: Want has been created and is ready to be scheduled #[derive(Debug, Clone)] @@ -50,7 +50,6 @@ pub struct CanceledState { pub struct WantInfo { pub want_id: String, pub partitions: Vec, - pub upstreams: Vec, pub data_timestamp: u64, pub ttl_seconds: u64, pub sla_seconds: u64, @@ -59,9 +58,27 @@ pub struct WantInfo { pub last_updated_at: u64, } +impl Default for WantInfo { + fn default() -> Self { + Self { + want_id: uuid::Uuid::new_v4().to_string(), + partitions: vec![], + data_timestamp: 0, + ttl_seconds: 0, + sla_seconds: 0, + source: None, + comment: None, + last_updated_at: 0, + } + } +} + impl WantInfo { pub fn updated_timestamp(self) -> Self { - Self { last_updated_at: current_timestamp(), ..self } + Self { + last_updated_at: current_timestamp(), + ..self + } } } @@ -84,13 +101,29 @@ pub enum Want { Canceled(WantWithState), } +// From impl for creating want from event +impl From for WantWithState { + fn from(event: WantCreateEventV1) -> Self { + WantWithState { + want: WantInfo { + want_id: event.want_id, + partitions: event.partitions, + data_timestamp: event.data_timestamp, + ttl_seconds: event.ttl_seconds, + sla_seconds: event.sla_seconds, + source: event.source, + comment: event.comment, + last_updated_at: current_timestamp(), + }, + state: IdleState {}, + } + } +} + // Type-safe transition methods for IdleState impl WantWithState { /// Transition from Idle to Building when a job starts building this want's partitions - pub fn start_building( - self, - started_at: u64, - ) -> WantWithState { + pub fn start_building(self, started_at: u64) -> WantWithState { WantWithState { want: self.want.updated_timestamp(), state: BuildingState { started_at }, @@ -120,12 +153,18 @@ impl WantWithState { pub fn complete(self, job_run_id: String, timestamp: u64) -> WantWithState { WantWithState { want: self.want.updated_timestamp(), - state: SuccessfulState { completed_at: timestamp }, + state: SuccessfulState { + completed_at: timestamp, + }, } } /// Transition from Building to Failed when a partition build fails - pub fn fail(self, failed_partition_refs: Vec, 
reason: String) -> WantWithState { + pub fn fail( + self, + failed_partition_refs: Vec, + reason: String, + ) -> WantWithState { WantWithState { want: self.want.updated_timestamp(), state: FailedState { @@ -165,9 +204,35 @@ impl WantWithState { } } +// Type-safe transition methods for FailedState +impl WantWithState { + /// Add more failed partitions to an already-failed want (self-transition) + pub fn add_failed_partitions(mut self, partition_refs: Vec) -> Self { + for partition_ref in partition_refs { + if self + .state + .failed_partition_refs + .iter() + .any(|p| p.r#ref == partition_ref.r#ref) + { + panic!( + "BUG: Attempted to add failed partition {} that already exists in want {}", + partition_ref.r#ref, self.want.want_id + ); + } + self.state.failed_partition_refs.push(partition_ref); + } + + WantWithState { + want: self.want.updated_timestamp(), + state: self.state, + } + } +} + // Type-safe transition methods for UpstreamBuildingState impl WantWithState { - /// Transition from UpstreamBuilding back to Idle when upstreams are satisfied + /// Transition from UpstreamBuilding back to Idle when upstreams are satisfied and no jobs are still building pub fn upstreams_satisfied(self) -> WantWithState { WantWithState { want: self.want.updated_timestamp(), @@ -175,6 +240,36 @@ impl WantWithState { } } + /// Transition from UpstreamBuilding to Building when upstreams are satisfied but jobs are still actively building + pub fn continue_building( + self, + _job_run_id: String, // Reference to active building job for safety/documentation + started_at: u64, + ) -> WantWithState { + WantWithState { + want: self.want.updated_timestamp(), + state: BuildingState { started_at }, + } + } + + /// Add more upstream dependencies (self-transition) + pub fn add_upstreams(mut self, want_ids: Vec) -> Self { + for want_id in want_ids { + if self.state.upstream_want_ids.contains(&want_id) { + panic!( + "BUG: Attempted to add upstream want {} that already exists in want {}", + want_id, self.want.want_id + ); + } + self.state.upstream_want_ids.push(want_id); + } + + WantWithState { + want: self.want.updated_timestamp(), + state: self.state, + } + } + /// Transition from UpstreamBuilding to UpstreamFailed when an upstream dependency fails pub fn upstream_failed( self, @@ -183,7 +278,10 @@ impl WantWithState { ) -> WantWithState { WantWithState { want: self.want.updated_timestamp(), - state: UpstreamFailedState { failed_at: timestamp, failed_wants: failed_upstreams }, + state: UpstreamFailedState { + failed_at: timestamp, + failed_wants: failed_upstreams, + }, } } @@ -221,7 +319,6 @@ impl Want { want: WantInfo { want_id, partitions, - upstreams: vec![], data_timestamp, ttl_seconds, sla_seconds, @@ -268,7 +365,7 @@ impl Want { WantDetail { want_id: self.want().want_id.clone(), partitions: self.want().partitions.clone(), - upstreams: self.want().upstreams.clone(), + upstreams: vec![], // Upstreams are tracked via want relationships, not stored here data_timestamp: self.want().data_timestamp, ttl_seconds: self.want().ttl_seconds, sla_seconds: self.want().sla_seconds,