refactor build_state.rs into module

Stuart Axelbrooke 2025-11-25 14:55:43 +08:00
parent e7aac32607
commit 14a24ef6d6
7 changed files with 2412 additions and 2340 deletions

File diff suppressed because it is too large

File diff suppressed because it is too large

@@ -0,0 +1,181 @@
//! Build State - the heart of databuild's orchestration system
//!
//! The BuildState struct tracks all application state, defines valid state transitions,
//! and manages cross-state machine state transitions (e.g. job run success resulting
//! in partition going from Building to Live).
//!
//! See docs/design/build-state-semantics.md for the full conceptual model.
mod event_handlers;
mod partition_transitions;
mod queries;
mod schedulability;
mod want_transitions;
use crate::job_run_state::JobRun;
use crate::partition_state::Partition;
use crate::want_state::Want;
use crate::{PartitionRef, TaintDetail};
use std::collections::BTreeMap;
use uuid::Uuid;
// Re-export public types
pub use schedulability::{WantSchedulability, WantUpstreamStatus, WantsSchedulability};
/**
Design Notes

The build state struct is the heart of the service and orchestrator, turning low-level build
events into answers to higher-level questions about build state. One temptation is to implement
the build state as a set of hierarchically defined reducers, to get information hiding and to
factor system capabilities and state tracking cleanly. Unfortunately, updating state in response
to an event requires a mutable borrow of some part of the build state (the part a reducer owns,
say) plus an immutable borrow of the whole state for read/query purposes. The whole state must be
available to answer questions like "what are the currently active job runs?" while handling a job
run event. Put simply, this isn't possible without locking both the whole state and the mutable
subset, because the borrows conflict: the mutable subset is already borrowed, so it cannot also be
borrowed immutably as part of the whole-state borrow. You could instead define a "query" phase in
which reducers query the state based on the received event, but that only adds complexity.

Instead, databuild opts for an entity-component-system (ECS) style that hands the whole build
state mutably to all state-update functionality, trusting that we use it responsibly. This means
no boxing and no "query phase", and all state updates happen as map lookups and updates, which is
exceptionally fast. The states of the different entities are managed by state machines, in a
pseudo-colored-Petri-net style ("pseudo" because we haven't formalized it). It is critical that
these state machines, their states, and their transitions remain type-safe.
*/
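// A minimal sketch of the borrow conflict described above (the reducer type, field, and
// function are hypothetical, purely for illustration): the reducer needs `&mut` access to
// its slice of state while also reading the whole state, and the borrow checker rejects
// holding both borrows at once.
//
//     fn reduce(wants: &mut WantsReducer, whole: &BuildState, event: &DataBuildEvent) {
//         // ... update `wants` while querying `whole` ...
//     }
//
//     // At the call site, both borrows of `state` must be alive simultaneously:
//     reduce(&mut state.wants_reducer, &state, &event);
//     //     ^^^^^^^^^^^^^^^^^^^^^^^^  ^^^^^^ error[E0502]: cannot borrow `state` as
//     //     immutable because `state.wants_reducer` is also borrowed as mutable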
/// Tracks all application state, defines valid state transitions, and manages cross-state machine
/// state transitions (e.g. job run success resulting in partition going from Building to Live)
#[derive(Debug, Clone, Default)]
pub struct BuildState {
// Core entity storage
pub(crate) wants: BTreeMap<String, Want>,
pub(crate) taints: BTreeMap<String, TaintDetail>,
pub(crate) job_runs: BTreeMap<String, JobRun>,
// UUID-based partition indexing
pub(crate) partitions_by_uuid: BTreeMap<Uuid, Partition>,
pub(crate) canonical_partitions: BTreeMap<String, Uuid>, // partition ref → current UUID
// Inverted indexes
pub(crate) wants_for_partition: BTreeMap<String, Vec<String>>, // partition ref → want_ids
pub(crate) downstream_waiting: BTreeMap<String, Vec<Uuid>>, // upstream ref → partition UUIDs waiting for it
}
impl BuildState {
/// Reconstruct BuildState from a sequence of events (for read path in web server)
/// This allows the web server to rebuild state from BEL storage without holding a lock
pub fn from_events(events: &[crate::DataBuildEvent]) -> Self {
let mut state = BuildState::default();
for event in events {
if let Some(ref inner_event) = event.event {
// handle_event returns Vec<Event> for cascading events, but we ignore them
// since we're replaying from a complete event log
state.handle_event(inner_event);
}
}
state
}
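// Usage sketch for `from_events` on the read path (the BEL reader below is hypothetical;
// only `BuildState::from_events` and `count_job_runs` are defined here):
//
//     let events: Vec<crate::DataBuildEvent> = read_events_from_bel_storage();
//     let state = BuildState::from_events(&events);
//     let job_run_count = state.count_job_runs();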
pub fn count_job_runs(&self) -> usize {
self.job_runs.len()
}
// ===== UUID-based partition access methods =====
/// Get the canonical partition for a ref (the current/active partition instance)
pub fn get_canonical_partition(&self, partition_ref: &str) -> Option<&Partition> {
self.canonical_partitions
.get(partition_ref)
.and_then(|uuid| self.partitions_by_uuid.get(uuid))
}
/// Get the canonical partition UUID for a ref
pub fn get_canonical_partition_uuid(&self, partition_ref: &str) -> Option<Uuid> {
self.canonical_partitions.get(partition_ref).copied()
}
/// Get a partition by its UUID
pub fn get_partition_by_uuid(&self, uuid: Uuid) -> Option<&Partition> {
self.partitions_by_uuid.get(&uuid)
}
/// Take the canonical partition for a ref (removes from partitions_by_uuid for state transition)
/// The canonical_partitions mapping is NOT removed - caller must update it if creating a new partition
pub(crate) fn take_canonical_partition(&mut self, partition_ref: &str) -> Option<Partition> {
self.canonical_partitions
.get(partition_ref)
.copied()
.and_then(|uuid| self.partitions_by_uuid.remove(&uuid))
}
/// Get want IDs for a partition ref (from inverted index)
pub fn get_wants_for_partition(&self, partition_ref: &str) -> &[String] {
self.wants_for_partition
.get(partition_ref)
.map(|v| v.as_slice())
.unwrap_or(&[])
}
/// Register a want in the wants_for_partition inverted index
pub(crate) fn register_want_for_partitions(&mut self, want_id: &str, partition_refs: &[PartitionRef]) {
for pref in partition_refs {
let want_ids = self
.wants_for_partition
.entry(pref.r#ref.clone())
.or_insert_with(Vec::new);
if !want_ids.contains(&want_id.to_string()) {
want_ids.push(want_id.to_string());
}
}
}
/// Update a partition in the indexes (after state transition)
pub(crate) fn update_partition(&mut self, partition: Partition) {
let uuid = partition.uuid();
self.partitions_by_uuid.insert(uuid, partition);
}
// Test helpers
pub(crate) fn with_wants(self, wants: BTreeMap<String, Want>) -> Self {
Self { wants, ..self }
}
#[cfg(test)]
pub(crate) fn with_partitions(self, old_partitions: BTreeMap<String, crate::PartitionDetail>) -> Self {
use crate::partition_state::PartitionWithState;
let mut canonical_partitions: BTreeMap<String, Uuid> = BTreeMap::new();
let mut partitions_by_uuid: BTreeMap<Uuid, Partition> = BTreeMap::new();
// Convert PartitionDetail to Live partitions for testing
for (key, detail) in old_partitions {
let partition_ref = detail.r#ref.clone().unwrap_or_default();
// Create a deterministic UUID for test data
let uuid =
crate::partition_state::derive_partition_uuid("test_job_run", &partition_ref.r#ref);
let live_partition = Partition::Live(PartitionWithState {
uuid,
partition_ref,
state: crate::partition_state::LiveState {
built_at: 0,
built_by: "test_job_run".to_string(),
},
});
canonical_partitions.insert(key, uuid);
partitions_by_uuid.insert(uuid, live_partition);
}
Self {
canonical_partitions,
partitions_by_uuid,
..self
}
}
}
pub(crate) mod consts {
pub const DEFAULT_PAGE_SIZE: u64 = 100;
}

@@ -0,0 +1,338 @@
//! Partition state transition logic
//!
//! Methods for transitioning partitions between states (Building, Live, Failed,
//! UpstreamBuilding, UpForRetry, UpstreamFailed) and managing downstream dependencies.
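//!
//! Summary of the transitions implemented by the methods in this module:
//!
//!   (new) -> Building -> Live
//!   Building -> Failed
//!   Building -> UpstreamBuilding (dep miss)
//!   UpstreamBuilding -> UpForRetry (all upstreams satisfied)
//!   UpstreamBuilding -> UpstreamFailed (an upstream failed)
//!   UpForRetry -> Building (retry, via a new partition UUID)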
use crate::partition_state::{
BuildingPartitionRef, BuildingState, FailedPartitionRef, LivePartitionRef, Partition,
PartitionWithState,
};
use crate::util::current_timestamp;
use crate::PartitionRef;
use uuid::Uuid;
use super::BuildState;
impl BuildState {
/// Create a new partition in Building state and update indexes
pub(crate) fn create_partition_building(&mut self, job_run_id: &str, partition_ref: PartitionRef) -> Uuid {
let partition =
PartitionWithState::<BuildingState>::new(job_run_id.to_string(), partition_ref.clone());
let uuid = partition.uuid;
// Update indexes
self.partitions_by_uuid
.insert(uuid, Partition::Building(partition));
self.canonical_partitions
.insert(partition_ref.r#ref.clone(), uuid);
tracing::info!(
partition = %partition_ref.r#ref,
uuid = %uuid,
job_run_id = %job_run_id,
"Partition: Created in Building state"
);
uuid
}
/// Create partitions in Building state
/// Used when a job run starts building partitions.
/// Note: Partitions no longer have a Missing state - they start directly as Building.
pub(crate) fn transition_partitions_to_building(
&mut self,
partition_refs: &[BuildingPartitionRef],
job_run_id: &str,
) {
for building_ref in partition_refs {
if let Some(partition) = self.get_canonical_partition(&building_ref.0.r#ref).cloned() {
// Partition already exists - this is an error unless we're retrying from UpForRetry
match partition {
Partition::UpForRetry(_) => {
// Valid: UpForRetry -> Building (retry after deps satisfied)
// Old partition stays in partitions_by_uuid as historical record
// Create new Building partition with fresh UUID
let uuid =
self.create_partition_building(job_run_id, building_ref.0.clone());
tracing::info!(
partition = %building_ref.0.r#ref,
job_run_id = %job_run_id,
uuid = %uuid,
"Partition: UpForRetry → Building (retry)"
);
}
_ => {
panic!(
"BUG: Invalid state - partition {} cannot start building from state {:?}",
building_ref.0.r#ref, partition
)
}
}
} else {
// Partition doesn't exist yet - create directly in Building state
let uuid = self.create_partition_building(job_run_id, building_ref.0.clone());
tracing::info!(
partition = %building_ref.0.r#ref,
job_run_id = %job_run_id,
uuid = %uuid,
"Partition: (new) → Building"
);
}
}
}
/// Transition partitions from Building to Live state
/// Used when a job run successfully completes
pub(crate) fn transition_partitions_to_live(
&mut self,
partition_refs: &[LivePartitionRef],
job_run_id: &str,
timestamp: u64,
) {
for pref in partition_refs {
let partition = self
.take_canonical_partition(&pref.0.r#ref)
.expect(&format!(
"BUG: Partition {} must exist and be in Building state before completion",
pref.0.r#ref
));
// ONLY valid transition: Building -> Live
let transitioned = match partition {
Partition::Building(building) => {
tracing::info!(
partition = %pref.0.r#ref,
job_run_id = %job_run_id,
"Partition: Building → Live"
);
Partition::Live(building.complete(timestamp))
}
// All other states are invalid
_ => {
panic!(
"BUG: Invalid state - partition {} must be Building to transition to Live, found {:?}",
pref.0.r#ref, partition
)
}
};
self.update_partition(transitioned);
}
}
/// Transition partitions from Building to Failed state
/// Used when a job run fails
pub(crate) fn transition_partitions_to_failed(
&mut self,
partition_refs: &[FailedPartitionRef],
job_run_id: &str,
timestamp: u64,
) {
for pref in partition_refs {
let partition = self
.take_canonical_partition(&pref.0.r#ref)
.expect(&format!(
"BUG: Partition {} must exist and be in Building state before failure",
pref.0.r#ref
));
// ONLY valid transition: Building -> Failed
let transitioned = match partition {
Partition::Building(building) => {
tracing::info!(
partition = %pref.0.r#ref,
job_run_id = %job_run_id,
"Partition: Building → Failed"
);
Partition::Failed(building.fail(timestamp))
}
// All other states are invalid
_ => {
panic!(
"BUG: Invalid state - partition {} must be Building to transition to Failed, found {:?}",
pref.0.r#ref, partition
)
}
};
self.update_partition(transitioned);
}
}
/// Transition partitions from Building to UpstreamBuilding state
/// Used when a job run encounters missing dependencies and cannot proceed.
/// The partition waits for its upstream deps to be built before becoming UpForRetry.
pub(crate) fn transition_partitions_to_upstream_building(
&mut self,
partition_refs: &[BuildingPartitionRef],
missing_deps: Vec<PartitionRef>,
) {
for building_ref in partition_refs {
let partition = self
.take_canonical_partition(&building_ref.0.r#ref)
.expect(&format!(
"BUG: Partition {} must exist and be in Building state during dep_miss",
building_ref.0.r#ref
));
// Only valid transition: Building -> UpstreamBuilding
let transitioned = match partition {
Partition::Building(building) => {
let partition_uuid = building.uuid;
tracing::info!(
partition = %building_ref.0.r#ref,
uuid = %partition_uuid,
missing_deps = ?missing_deps.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
"Partition: Building → UpstreamBuilding (dep miss)"
);
// Update downstream_waiting index: for each missing dep, record that this partition is waiting
for missing_dep in &missing_deps {
self.downstream_waiting
.entry(missing_dep.r#ref.clone())
.or_default()
.push(partition_uuid);
}
Partition::UpstreamBuilding(building.dep_miss(missing_deps.clone()))
}
// All other states are invalid
_ => {
panic!(
"BUG: Invalid state - partition {} must be Building during dep_miss, found {:?}",
building_ref.0.r#ref, partition
)
}
};
self.update_partition(transitioned);
}
}
/// Transition partitions from UpstreamBuilding to UpForRetry when their upstream deps become Live.
/// This should be called when partitions become Live to check if any downstream partitions can now retry.
/// Uses the `downstream_waiting` index for O(1) lookup of affected partitions.
pub(crate) fn unblock_downstream_partitions(&mut self, newly_live_partition_refs: &[LivePartitionRef]) {
// Collect UUIDs of partitions that might be unblocked using the inverted index
let mut uuids_to_check: Vec<Uuid> = Vec::new();
for live_ref in newly_live_partition_refs {
if let Some(waiting_uuids) = self.downstream_waiting.get(&live_ref.0.r#ref) {
uuids_to_check.extend(waiting_uuids.iter().cloned());
}
}
// Deduplicate UUIDs (a partition might be waiting for multiple deps that all became live)
uuids_to_check.sort();
uuids_to_check.dedup();
for uuid in uuids_to_check {
// Get partition by UUID - it might have been transitioned already or no longer exist
let Some(partition) = self.partitions_by_uuid.get(&uuid).cloned() else {
continue;
};
let partition_ref = partition.partition_ref().r#ref.clone();
// Only process UpstreamBuilding partitions
if let Partition::UpstreamBuilding(mut upstream_building) = partition {
// Remove satisfied deps from missing_deps
for live_ref in newly_live_partition_refs {
upstream_building
.state
.missing_deps
.retain(|d| d.r#ref != live_ref.0.r#ref);
// Also remove from downstream_waiting index
if let Some(waiting) = self.downstream_waiting.get_mut(&live_ref.0.r#ref) {
waiting.retain(|u| *u != uuid);
}
}
let transitioned = if upstream_building.state.missing_deps.is_empty() {
// All deps satisfied, transition to UpForRetry
tracing::info!(
partition = %partition_ref,
uuid = %uuid,
"Partition: UpstreamBuilding → UpForRetry (all upstreams satisfied)"
);
Partition::UpForRetry(upstream_building.upstreams_satisfied())
} else {
// Still waiting for more deps
tracing::debug!(
partition = %partition_ref,
uuid = %uuid,
remaining_deps = ?upstream_building.state.missing_deps.iter().map(|d| &d.r#ref).collect::<Vec<_>>(),
"Partition remains in UpstreamBuilding (still waiting for deps)"
);
Partition::UpstreamBuilding(upstream_building)
};
self.update_partition(transitioned);
}
}
}
/// Cascade failures to downstream partitions when their upstream dependencies fail.
/// Transitions UpstreamBuilding → UpstreamFailed for partitions waiting on failed upstreams.
/// Uses the `downstream_waiting` index for O(1) lookup of affected partitions.
pub(crate) fn cascade_failures_to_downstream_partitions(
&mut self,
failed_partition_refs: &[FailedPartitionRef],
) {
// Collect UUIDs of partitions that are waiting for the failed partitions
let mut uuids_to_fail: Vec<Uuid> = Vec::new();
for failed_ref in failed_partition_refs {
if let Some(waiting_uuids) = self.downstream_waiting.get(&failed_ref.0.r#ref) {
uuids_to_fail.extend(waiting_uuids.iter().cloned());
}
}
// Deduplicate UUIDs
uuids_to_fail.sort();
uuids_to_fail.dedup();
for uuid in uuids_to_fail {
// Get partition by UUID
let Some(partition) = self.partitions_by_uuid.get(&uuid).cloned() else {
continue;
};
let partition_ref = partition.partition_ref().r#ref.clone();
// Only process UpstreamBuilding partitions
if let Partition::UpstreamBuilding(upstream_building) = partition {
// Collect which upstream refs failed
let failed_upstream_refs: Vec<PartitionRef> = failed_partition_refs
.iter()
.filter(|f| {
upstream_building
.state
.missing_deps
.iter()
.any(|d| d.r#ref == f.0.r#ref)
})
.map(|f| f.0.clone())
.collect();
if !failed_upstream_refs.is_empty() {
tracing::info!(
partition = %partition_ref,
uuid = %uuid,
failed_upstreams = ?failed_upstream_refs.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
"Partition: UpstreamBuilding → UpstreamFailed (upstream failed)"
);
// Remove from downstream_waiting index for all deps
for dep in &upstream_building.state.missing_deps {
if let Some(waiting) = self.downstream_waiting.get_mut(&dep.r#ref) {
waiting.retain(|u| *u != uuid);
}
}
// Transition to UpstreamFailed
let transitioned = Partition::UpstreamFailed(
upstream_building
.upstream_failed(failed_upstream_refs, current_timestamp()),
);
self.update_partition(transitioned);
}
}
}
}
}

@@ -0,0 +1,118 @@
//! Query methods for BuildState
//!
//! Read-only methods for accessing state (get_*, list_*) used by the API layer.
use crate::{
JobRunDetail, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest,
ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest,
ListWantsResponse, PartitionDetail, TaintDetail, WantDetail,
};
use std::collections::BTreeMap;
use super::{consts, BuildState};
impl BuildState {
pub fn get_want(&self, want_id: &str) -> Option<WantDetail> {
self.wants.get(want_id).map(|w| w.to_detail())
}
pub fn get_taint(&self, taint_id: &str) -> Option<TaintDetail> {
self.taints.get(taint_id).cloned()
}
pub fn get_partition(&self, partition_id: &str) -> Option<PartitionDetail> {
self.get_canonical_partition(partition_id)
.map(|p| p.to_detail())
}
pub fn get_job_run(&self, job_run_id: &str) -> Option<JobRunDetail> {
self.job_runs.get(job_run_id).map(|jr| jr.to_detail())
}
pub fn list_wants(&self, request: &ListWantsRequest) -> ListWantsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
let start = page * page_size;
// Paginate first, then convert only the needed wants to WantDetail
let data: Vec<WantDetail> = self
.wants
.values()
.skip(start as usize)
.take(page_size as usize)
.map(|w| w.to_detail())
.collect();
ListWantsResponse {
data,
match_count: self.wants.len() as u64,
page,
page_size,
}
}
pub fn list_taints(&self, request: &ListTaintsRequest) -> ListTaintsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
ListTaintsResponse {
data: list_state_items(&self.taints, page, page_size),
match_count: self.taints.len() as u64,
page,
page_size,
}
}
pub fn list_partitions(&self, request: &ListPartitionsRequest) -> ListPartitionsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
// Convert canonical partitions to PartitionDetail for API
let partition_details: BTreeMap<String, PartitionDetail> = self
.canonical_partitions
.iter()
.filter_map(|(k, uuid)| {
self.partitions_by_uuid
.get(uuid)
.map(|p| (k.clone(), p.to_detail()))
})
.collect();
ListPartitionsResponse {
data: list_state_items(&partition_details, page, page_size),
match_count: self.canonical_partitions.len() as u64,
page,
page_size,
}
}
pub fn list_job_runs(&self, request: &ListJobRunsRequest) -> ListJobRunsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
let start = page * page_size;
let data: Vec<JobRunDetail> = self
.job_runs
.values()
.skip(start as usize)
.take(page_size as usize)
.map(|jr| jr.to_detail())
.collect();
ListJobRunsResponse {
data,
match_count: self.job_runs.len() as u64,
page,
page_size,
}
}
}
fn list_state_items<T: Clone>(map: &BTreeMap<String, T>, page: u64, page_size: u64) -> Vec<T> {
// TODO when we add filtering, can we add it generically via some trait or filter object that can be provided?
let start = page * page_size;
map.values()
.skip(start as usize)
.take(page_size as usize)
.cloned()
.collect()
}

@@ -0,0 +1,176 @@
//! Want schedulability logic
//!
//! Types and methods for determining whether wants are schedulable based on
//! upstream partition states and target partition build status.
use crate::partition_state::{
BuildingPartitionRef, LivePartitionRef, Partition, TaintedPartitionRef,
};
use crate::{PartitionRef, WantDetail};
use serde::{Deserialize, Serialize};
use super::BuildState;
/// The status of partitions required by a want to build (sensed from dep miss job run)
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantUpstreamStatus {
pub live: Vec<LivePartitionRef>,
pub tainted: Vec<TaintedPartitionRef>,
/// Upstream partitions that are not ready (don't exist, or are in Building/UpstreamBuilding/UpForRetry/Failed/UpstreamFailed states)
pub not_ready: Vec<PartitionRef>,
/// Target partitions that are currently being built by another job
pub building: Vec<BuildingPartitionRef>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantSchedulability {
pub want: WantDetail,
pub status: WantUpstreamStatus,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantsSchedulability(pub Vec<WantSchedulability>);
impl WantsSchedulability {
pub fn schedulable_wants(self) -> Vec<WantDetail> {
self.0
.into_iter()
.filter(|ws| ws.is_schedulable())
.map(|ws| ws.want)
.collect()
}
}
impl WantSchedulability {
pub fn is_schedulable(&self) -> bool {
// Want is schedulable if:
// - No not-ready upstream dependencies (must all be Live or Tainted)
// - No tainted upstream dependencies
// - No target partitions currently being built by another job
self.status.not_ready.is_empty()
&& self.status.tainted.is_empty()
&& self.status.building.is_empty()
}
}
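// Usage sketch from a scheduler's point of view (the scheduling loop itself is hypothetical;
// only `wants_schedulability()` and `schedulable_wants()` are defined in this module):
//
//     let schedulable = build_state.wants_schedulability().schedulable_wants();
//     for want in schedulable {
//         // launch a job run targeting `want.partitions` ...
//     }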
impl BuildState {
/// Wants are schedulable when their upstreams are ready and target partitions are not already building
pub fn want_schedulability(&self, want: &WantDetail) -> WantSchedulability {
// Check upstream partition statuses (dependencies)
let mut live: Vec<LivePartitionRef> = Vec::new();
let mut tainted: Vec<TaintedPartitionRef> = Vec::new();
let mut not_ready: Vec<PartitionRef> = Vec::new(); // Partitions that don't exist or aren't Live
for upstream_ref in &want.upstreams {
match self.get_canonical_partition(&upstream_ref.r#ref) {
Some(partition) => {
match partition {
Partition::Live(p) => live.push(p.get_ref()),
Partition::Tainted(p) => tainted.push(p.get_ref()),
// All other states (Building, UpstreamBuilding, UpForRetry, Failed, UpstreamFailed) mean upstream is not ready
_ => not_ready.push(upstream_ref.clone()),
}
}
None => {
// Partition doesn't exist yet - it's not ready
not_ready.push(upstream_ref.clone());
}
}
}
// Check target partition statuses (what this want is trying to build)
// If any target partition is already Building, this want should wait
let mut building: Vec<BuildingPartitionRef> = Vec::new();
for target_ref in &want.partitions {
if let Some(partition) = self.get_canonical_partition(&target_ref.r#ref) {
if let Partition::Building(p) = partition {
building.push(p.get_ref());
}
}
}
WantSchedulability {
want: want.clone(),
status: WantUpstreamStatus {
live,
tainted,
not_ready,
building,
},
}
}
pub fn wants_schedulability(&self) -> WantsSchedulability {
WantsSchedulability(
self.wants
.values()
// Use type-safe is_schedulable() - only Idle wants are schedulable
.filter(|w| w.is_schedulable())
.map(|w| self.want_schedulability(&w.to_detail()))
.collect(),
)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::want_state::{IdleState as WantIdleState, Want, WantInfo, WantWithState};
use crate::{PartitionDetail, PartitionRef, PartitionStatus, WantStatus};
use std::collections::BTreeMap;
impl WantDetail {
fn with_partitions(self, partitions: Vec<PartitionRef>) -> Self {
Self { partitions, ..self }
}
fn with_upstreams(self, upstreams: Vec<PartitionRef>) -> Self {
Self { upstreams, ..self }
}
fn with_status(self, status: Option<WantStatus>) -> Self {
Self { status, ..self }
}
}
impl PartitionDetail {
fn with_status(self, status: Option<PartitionStatus>) -> Self {
Self { status, ..self }
}
fn with_ref(self, r#ref: Option<PartitionRef>) -> Self {
Self { r#ref, ..self }
}
}
#[test]
fn test_empty_wants_noop() {
assert_eq!(BuildState::default().wants_schedulability().0.len(), 0);
}
// A want with satisfied upstreams (incl "none") should be schedulable
#[test]
fn test_simple_want_with_live_upstream_is_schedulable() {
// Given...
let test_partition = "test_partition";
let state = BuildState::default()
.with_wants(BTreeMap::from([(
"foo".to_string(),
Want::Idle(WantWithState {
want: WantInfo {
partitions: vec![test_partition.into()],
..Default::default()
},
state: WantIdleState {},
}),
)]))
.with_partitions(BTreeMap::from([(
test_partition.to_string(),
PartitionDetail::default().with_ref(Some(test_partition.into())),
)]));
// Should...
let schedulability = state.wants_schedulability();
let ws = schedulability.0.first().unwrap();
assert!(ws.is_schedulable());
}
}

@@ -0,0 +1,403 @@
//! Want state transition logic
//!
//! Methods for transitioning wants between states and managing dependencies
//! between wants (derivative wants from dep misses).
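//!
//! Summary of the transitions implemented by the methods in this module:
//!
//!   Building -> Successful (all of the want's partitions are Live)
//!   Building -> Failed (one of the want's partitions failed)
//!   Failed -> Failed (additional failed partitions recorded)
//!   Building / UpstreamBuilding -> UpstreamBuilding (derivative want created for missing deps)
//!   UpstreamBuilding -> Idle or Building (all upstream wants succeeded)
//!   UpstreamBuilding -> UpstreamFailed (an upstream want failed)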
use crate::job_run_state::JobRun;
use crate::partition_state::{FailedPartitionRef, LivePartitionRef, Partition};
use crate::want_state::{FailedWantId, SuccessfulWantId, Want};
use crate::PartitionRef;
use super::BuildState;
impl BuildState {
/// Handle creation of a derivative want (created due to job dep miss)
///
/// When a job reports missing dependencies, it returns WantCreateV1 events for those missing partitions.
/// Those events get appended to the BEL and eventually processed by handle_want_create().
///
/// This function is called when we detect a derivative want (has source.job_triggered) and transitions
/// the impacted wants to UpstreamBuilding state, tracking the derivative want ID as an upstream dependency.
///
/// KEY INSIGHT: We must use the actual want_id from the WantCreateV1 event, not synthetic UUIDs generated
/// during event processing. This ensures replay works correctly - the same want IDs are used both during
/// original execution and during replay from the BEL.
pub(crate) fn handle_derivative_want_creation(
&mut self,
derivative_want_id: &str,
derivative_want_partitions: &[PartitionRef],
source_job_run_id: &str,
) {
// Look up the job run that triggered this derivative want
// This job run must be in DepMiss state because it reported missing dependencies
let job_run = self.job_runs.get(source_job_run_id).expect(&format!(
"BUG: Job run {} must exist when derivative want created",
source_job_run_id
));
// Extract the missing deps from the DepMiss job run
let missing_deps = match job_run {
JobRun::DepMiss(dep_miss) => dep_miss.get_missing_deps(),
_ => {
panic!(
"BUG: Job run {} must be in DepMiss state when derivative want created, found {:?}",
source_job_run_id, job_run
);
}
};
// Find which MissingDeps entry corresponds to this derivative want
// The derivative want was created for a specific set of missing partitions,
// and we need to find which downstream partitions are impacted by those missing partitions
for md in missing_deps {
// Check if this derivative want's partitions match the missing partitions in this entry
// We need exact match because one dep miss event can create multiple derivative wants
let partitions_match = md.missing.iter().all(|missing_ref| {
derivative_want_partitions
.iter()
.any(|p| p.r#ref == missing_ref.r#ref)
}) && derivative_want_partitions.len() == md.missing.len();
if partitions_match {
// Now we know which partitions are impacted by this missing dependency
let impacted_partition_refs: Vec<String> =
md.impacted.iter().map(|p| p.r#ref.clone()).collect();
tracing::debug!(
derivative_want_id = %derivative_want_id,
source_job_run_id = %source_job_run_id,
missing_partitions = ?derivative_want_partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
impacted_partitions = ?impacted_partition_refs,
"Processing derivative want creation"
);
// Find all wants that include these impacted partitions
// These are the wants that need to wait for the derivative want to complete
let mut impacted_want_ids: std::collections::HashSet<String> =
std::collections::HashSet::new();
for partition_ref in &impacted_partition_refs {
for want_id in self.get_wants_for_partition(partition_ref) {
impacted_want_ids.insert(want_id.clone());
}
}
// Transition each impacted want to UpstreamBuilding, tracking this derivative want as an upstream
for want_id in impacted_want_ids {
let want = self.wants.remove(&want_id).expect(&format!(
"BUG: Want {} must exist when processing derivative want",
want_id
));
let transitioned = match want {
Want::Building(building) => {
// First dep miss for this want: Building → UpstreamBuilding
tracing::info!(
want_id = %want_id,
derivative_want_id = %derivative_want_id,
"Want: Building → UpstreamBuilding (first missing dep detected)"
);
Want::UpstreamBuilding(
building.detect_missing_deps(vec![derivative_want_id.to_string()]),
)
}
Want::UpstreamBuilding(upstream) => {
// Additional dep miss: UpstreamBuilding → UpstreamBuilding (add another upstream)
// This can happen if multiple jobs report dep misses for different upstreams
tracing::info!(
want_id = %want_id,
derivative_want_id = %derivative_want_id,
"Want: UpstreamBuilding → UpstreamBuilding (additional upstream added)"
);
Want::UpstreamBuilding(
upstream.add_upstreams(vec![derivative_want_id.to_string()]),
)
}
_ => {
panic!(
"BUG: Want {} in invalid state {:?} when processing derivative want. Should be Building or UpstreamBuilding.",
want_id, want
);
}
};
self.wants.insert(want_id, transitioned);
}
}
}
}
/// Complete wants when all their partitions become Live
/// Transitions Building → Successful, returns list of newly successful want IDs
pub(crate) fn complete_successful_wants(
&mut self,
newly_live_partitions: &[LivePartitionRef],
job_run_id: &str,
timestamp: u64,
) -> Vec<SuccessfulWantId> {
let mut newly_successful_wants: Vec<SuccessfulWantId> = Vec::new();
for pref in newly_live_partitions {
let want_ids: Vec<String> = self.get_wants_for_partition(&pref.0.r#ref).to_vec();
for want_id in want_ids {
let want = self.wants.remove(&want_id).expect(&format!(
"BUG: Want {} must exist when referenced by partition",
want_id
));
let transitioned = match want {
Want::Building(building) => {
// Check if ALL partitions for this want are now Live
let all_partitions_live = building.want.partitions.iter().all(|p| {
self.get_canonical_partition(&p.r#ref)
.map(|partition| partition.is_live())
.unwrap_or(false)
});
if all_partitions_live {
let successful_want =
building.complete(job_run_id.to_string(), timestamp);
tracing::info!(
want_id = %want_id,
job_run_id = %job_run_id,
"Want: Building → Successful"
);
newly_successful_wants.push(successful_want.get_id());
Want::Successful(successful_want)
} else {
Want::Building(building) // Still building other partitions
}
}
_ => {
panic!(
"BUG: Want {} in invalid state {:?} when partition {} became Live. Should be Building.",
want_id, want, pref.0.r#ref
);
}
};
self.wants.insert(want_id.clone(), transitioned);
}
}
newly_successful_wants
}
/// Fail wants when their partitions fail
/// Transitions Building → Failed, and adds to already-failed wants
/// Returns list of newly failed want IDs for downstream cascade
pub(crate) fn fail_directly_affected_wants(
&mut self,
failed_partitions: &[FailedPartitionRef],
) -> Vec<FailedWantId> {
let mut newly_failed_wants: Vec<FailedWantId> = Vec::new();
for pref in failed_partitions {
let want_ids: Vec<String> = self.get_wants_for_partition(&pref.0.r#ref).to_vec();
for want_id in want_ids {
let want = self.wants.remove(&want_id).expect(&format!(
"BUG: Want {} must exist when referenced by partition",
want_id
));
let transitioned = match want {
Want::Building(building) => {
let failed = building
.fail(vec![pref.0.clone()], "Partition build failed".to_string());
newly_failed_wants.push(failed.get_id());
Want::Failed(failed)
}
// Failed → Failed: add new failed partition to existing failed state
Want::Failed(failed) => {
Want::Failed(failed.add_failed_partitions(vec![pref.clone()]))
}
_ => {
panic!(
"BUG: Want {} in invalid state {:?} when partition {} failed. Should be Building or Failed.",
want_id, want, pref.0.r#ref
);
}
};
self.wants.insert(want_id.clone(), transitioned);
}
}
newly_failed_wants
}
/// Unblock downstream wants when their upstream dependencies succeed
/// Transitions UpstreamBuilding → Idle (when ready) or Building (when partitions already building)
pub(crate) fn unblock_downstream_wants(
&mut self,
newly_successful_wants: &[SuccessfulWantId],
job_run_id: &str,
timestamp: u64,
) {
tracing::debug!(
newly_successful_wants = ?newly_successful_wants
.iter()
.map(|w| &w.0)
.collect::<Vec<_>>(),
"Checking downstream wants for unblocking"
);
// Find downstream wants that are waiting for any of the newly successful wants
// TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
let downstream_wants_to_check: Vec<String> = self
.wants
.iter()
.filter_map(|(id, want)| {
match want {
Want::UpstreamBuilding(downstream_want) => {
// Is this downstream want waiting for any of the newly successful wants?
let is_affected =
downstream_want.state.upstream_want_ids.iter().any(|up_id| {
newly_successful_wants.iter().any(|swid| &swid.0 == up_id)
});
if is_affected { Some(id.clone()) } else { None }
}
_ => None,
}
})
.collect();
tracing::debug!(
downstream_wants_to_check = ?downstream_wants_to_check,
"Found downstream wants affected by upstream completion"
);
for want_id in downstream_wants_to_check {
let want = self
.wants
.remove(&want_id)
.expect(&format!("BUG: Want {} must exist", want_id));
let transitioned = match want {
Want::UpstreamBuilding(downstream_want) => {
tracing::debug!(
want_id = %want_id,
upstreams = ?downstream_want.state.upstream_want_ids,
"Checking if all upstreams are satisfied"
);
// Check if ALL of this downstream want's upstream dependencies are now Successful
let all_upstreams_successful = downstream_want
.state
.upstream_want_ids
.iter()
.all(|up_want_id| {
self.wants
.get(up_want_id)
.map(|w| matches!(w, Want::Successful(_)))
.unwrap_or(false)
});
tracing::debug!(
want_id = %want_id,
all_upstreams_successful = %all_upstreams_successful,
"Upstream satisfaction check complete"
);
if all_upstreams_successful {
// Check if any of this want's partitions are still being built
// If a job dep-missed, its partitions are no longer Building (they moved to UpstreamBuilding),
// but other jobs might still be building other partitions for this want
let any_partition_building =
downstream_want.want.partitions.iter().any(|p| {
self.get_canonical_partition(&p.r#ref)
.map(|partition| matches!(partition, Partition::Building(_)))
.unwrap_or(false)
});
tracing::debug!(
want_id = %want_id,
any_partition_building = %any_partition_building,
"Partition building status check"
);
if any_partition_building {
// Some partitions still being built, continue in Building state
tracing::info!(
want_id = %want_id,
job_run_id = %job_run_id,
"Want: UpstreamBuilding → Building (upstreams satisfied, partitions building)"
);
Want::Building(
downstream_want
.continue_building(job_run_id.to_string(), timestamp),
)
} else {
// No partitions being built, become schedulable again
tracing::info!(
want_id = %want_id,
"Want: UpstreamBuilding → Idle (upstreams satisfied, ready to schedule)"
);
Want::Idle(downstream_want.upstreams_satisfied())
}
} else {
// Upstreams not all satisfied yet, stay in UpstreamBuilding
tracing::debug!(
want_id = %want_id,
"Want remains in UpstreamBuilding state (upstreams not yet satisfied)"
);
Want::UpstreamBuilding(downstream_want)
}
}
_ => {
panic!("BUG: Want {} should be UpstreamBuilding here", want_id);
}
};
self.wants.insert(want_id, transitioned);
}
}
/// Cascade failures to downstream wants when their upstream dependencies fail
/// Transitions UpstreamBuilding → UpstreamFailed
pub(crate) fn cascade_failures_to_downstream_wants(
&mut self,
newly_failed_wants: &[FailedWantId],
timestamp: u64,
) {
// Find downstream wants that are waiting for any of the newly failed wants
// TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
let downstream_wants_to_fail: Vec<String> = self
.wants
.iter()
.filter_map(|(id, want)| {
match want {
Want::UpstreamBuilding(downstream_want) => {
// Is this downstream want waiting for any of the newly failed wants?
let is_affected =
downstream_want.state.upstream_want_ids.iter().any(|up_id| {
newly_failed_wants.iter().any(|fwid| &fwid.0 == up_id)
});
if is_affected { Some(id.clone()) } else { None }
}
_ => None,
}
})
.collect();
for want_id in downstream_wants_to_fail {
let want = self
.wants
.remove(&want_id)
.expect(&format!("BUG: Want {} must exist", want_id));
let transitioned = match want {
Want::UpstreamBuilding(downstream_want) => Want::UpstreamFailed(
downstream_want.upstream_failed(
newly_failed_wants
.iter()
.map(|fwid| fwid.0.clone())
.collect(),
timestamp,
),
),
_ => {
panic!("BUG: Want {} should be UpstreamBuilding here", want_id);
}
};
self.wants.insert(want_id, transitioned);
}
}
}