345 lines
14 KiB
Rust
345 lines
14 KiB
Rust
//! Partition state transition logic
//!
//! Methods for transitioning partitions between states (Building, Live, Failed,
//! UpstreamBuilding, UpForRetry, UpstreamFailed) and for managing downstream
//! dependencies through the `downstream_waiting` index.
|
|
|
|
use crate::PartitionRef;
|
|
use crate::partition_state::{
|
|
BuildingPartitionRef, BuildingState, FailedPartitionRef, LivePartitionRef, Partition,
|
|
PartitionWithState,
|
|
};
|
|
use crate::util::current_timestamp;
|
|
use uuid::Uuid;
|
|
|
|
use super::BuildState;
|
|
|
|
impl BuildState {
|
|
/// Create a new partition in Building state and update indexes
|
|
pub(crate) fn create_partition_building(
|
|
&mut self,
|
|
job_run_id: &str,
|
|
partition_ref: PartitionRef,
|
|
) -> Uuid {
|
|
let partition =
|
|
PartitionWithState::<BuildingState>::new(job_run_id.to_string(), partition_ref.clone());
|
|
let uuid = partition.uuid;
|
|
|
|
// Update indexes
|
|
self.partitions_by_uuid
|
|
.insert(uuid, Partition::Building(partition));
|
|
self.canonical_partitions
|
|
.insert(partition_ref.r#ref.clone(), uuid);
|
|
|
|
tracing::info!(
|
|
partition = %partition_ref.r#ref,
|
|
uuid = %uuid,
|
|
job_run_id = %job_run_id,
|
|
"Partition: Created in Building state"
|
|
);
|
|
|
|
uuid
|
|
}
|
|
|
|
/// Create partitions in Building state
|
|
/// Used when a job run starts building partitions.
|
|
/// Note: Partitions no longer have a Missing state - they start directly as Building.
|
|
pub(crate) fn transition_partitions_to_building(
|
|
&mut self,
|
|
partition_refs: &[BuildingPartitionRef],
|
|
job_run_id: &str,
|
|
) {
|
|
for building_ref in partition_refs {
|
|
if let Some(partition) = self.get_canonical_partition(&building_ref.0.r#ref).cloned() {
|
|
// Partition already exists - this is an error unless we're retrying from UpForRetry
|
|
match partition {
|
|
Partition::UpForRetry(_) => {
|
|
// Valid: UpForRetry -> Building (retry after deps satisfied)
|
|
// Old partition stays in partitions_by_uuid as historical record
|
|
// Create new Building partition with fresh UUID
|
|
let uuid =
|
|
self.create_partition_building(job_run_id, building_ref.0.clone());
|
|
tracing::info!(
|
|
partition = %building_ref.0.r#ref,
|
|
job_run_id = %job_run_id,
|
|
uuid = %uuid,
|
|
"Partition: UpForRetry → Building (retry)"
|
|
);
|
|
}
|
|
_ => {
|
|
panic!(
|
|
"BUG: Invalid state - partition {} cannot start building from state {:?}",
|
|
building_ref.0.r#ref, partition
|
|
)
|
|
}
|
|
}
|
|
} else {
|
|
// Partition doesn't exist yet - create directly in Building state
|
|
let uuid = self.create_partition_building(job_run_id, building_ref.0.clone());
|
|
tracing::info!(
|
|
partition = %building_ref.0.r#ref,
|
|
job_run_id = %job_run_id,
|
|
uuid = %uuid,
|
|
"Partition: (new) → Building"
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Transition partitions from Building to Live state
|
|
/// Used when a job run successfully completes
|
|
pub(crate) fn transition_partitions_to_live(
|
|
&mut self,
|
|
partition_refs: &[LivePartitionRef],
|
|
job_run_id: &str,
|
|
timestamp: u64,
|
|
) {
|
|
for pref in partition_refs {
|
|
let partition = self
|
|
.take_canonical_partition(&pref.0.r#ref)
|
|
.expect(&format!(
|
|
"BUG: Partition {} must exist and be in Building state before completion",
|
|
pref.0.r#ref
|
|
));
|
|
|
|
// ONLY valid transition: Building -> Live
|
|
let transitioned = match partition {
|
|
Partition::Building(building) => {
|
|
tracing::info!(
|
|
partition = %pref.0.r#ref,
|
|
job_run_id = %job_run_id,
|
|
"Partition: Building → Live"
|
|
);
|
|
Partition::Live(building.complete(timestamp))
|
|
}
|
|
// All other states are invalid
|
|
_ => {
|
|
panic!(
|
|
"BUG: Invalid state - partition {} must be Building to transition to Live, found {:?}",
|
|
pref.0.r#ref, partition
|
|
)
|
|
}
|
|
};
|
|
self.update_partition(transitioned);
|
|
}
|
|
}
|
|
|
|
/// Transition partitions from Building to Failed state
|
|
/// Used when a job run fails
|
|
pub(crate) fn transition_partitions_to_failed(
|
|
&mut self,
|
|
partition_refs: &[FailedPartitionRef],
|
|
job_run_id: &str,
|
|
timestamp: u64,
|
|
) {
|
|
for pref in partition_refs {
|
|
let partition = self
|
|
.take_canonical_partition(&pref.0.r#ref)
|
|
.expect(&format!(
|
|
"BUG: Partition {} must exist and be in Building state before failure",
|
|
pref.0.r#ref
|
|
));
|
|
|
|
// ONLY valid transition: Building -> Failed
|
|
let transitioned = match partition {
|
|
Partition::Building(building) => {
|
|
tracing::info!(
|
|
partition = %pref.0.r#ref,
|
|
job_run_id = %job_run_id,
|
|
"Partition: Building → Failed"
|
|
);
|
|
Partition::Failed(building.fail(timestamp))
|
|
}
|
|
// All other states are invalid
|
|
_ => {
|
|
panic!(
|
|
"BUG: Invalid state - partition {} must be Building to transition to Failed, found {:?}",
|
|
pref.0.r#ref, partition
|
|
)
|
|
}
|
|
};
|
|
self.update_partition(transitioned);
|
|
}
|
|
}
|
|
|
|
/// Transition partitions from Building to UpstreamBuilding state
|
|
/// Used when a job run encounters missing dependencies and cannot proceed.
|
|
/// The partition waits for its upstream deps to be built before becoming UpForRetry.
|
|
pub(crate) fn transition_partitions_to_upstream_building(
|
|
&mut self,
|
|
partition_refs: &[BuildingPartitionRef],
|
|
missing_deps: Vec<PartitionRef>,
|
|
) {
|
|
for building_ref in partition_refs {
|
|
let partition = self
|
|
.take_canonical_partition(&building_ref.0.r#ref)
|
|
.expect(&format!(
|
|
"BUG: Partition {} must exist and be in Building state during dep_miss",
|
|
building_ref.0.r#ref
|
|
));
|
|
|
|
// Only valid transition: Building -> UpstreamBuilding
|
|
let transitioned = match partition {
|
|
Partition::Building(building) => {
|
|
let partition_uuid = building.uuid;
|
|
tracing::info!(
|
|
partition = %building_ref.0.r#ref,
|
|
uuid = %partition_uuid,
|
|
missing_deps = ?missing_deps.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
|
|
"Partition: Building → UpstreamBuilding (dep miss)"
|
|
);
|
|
|
|
// Update downstream_waiting index: for each missing dep, record that this partition is waiting
|
|
for missing_dep in &missing_deps {
|
|
self.downstream_waiting
|
|
.entry(missing_dep.r#ref.clone())
|
|
.or_default()
|
|
.push(partition_uuid);
|
|
}
|
|
|
|
Partition::UpstreamBuilding(building.dep_miss(missing_deps.clone()))
|
|
}
|
|
// All other states are invalid
|
|
_ => {
|
|
panic!(
|
|
"BUG: Invalid state - partition {} must be Building during dep_miss, found {:?}",
|
|
building_ref.0.r#ref, partition
|
|
)
|
|
}
|
|
};
|
|
self.update_partition(transitioned);
|
|
}
|
|
}
|
|
|
|
/// Transition partitions from UpstreamBuilding to UpForRetry when their upstream deps become Live.
|
|
/// This should be called when partitions become Live to check if any downstream partitions can now retry.
|
|
/// Uses the `downstream_waiting` index for O(1) lookup of affected partitions.
|
|
pub(crate) fn unblock_downstream_partitions(
|
|
&mut self,
|
|
newly_live_partition_refs: &[LivePartitionRef],
|
|
) {
|
|
// Collect UUIDs of partitions that might be unblocked using the inverted index
|
|
let mut uuids_to_check: Vec<Uuid> = Vec::new();
|
|
for live_ref in newly_live_partition_refs {
|
|
if let Some(waiting_uuids) = self.downstream_waiting.get(&live_ref.0.r#ref) {
|
|
uuids_to_check.extend(waiting_uuids.iter().cloned());
|
|
}
|
|
}
|
|
|
|
// Deduplicate UUIDs (a partition might be waiting for multiple deps that all became live)
|
|
uuids_to_check.sort();
|
|
uuids_to_check.dedup();
|
|
|
|
for uuid in uuids_to_check {
|
|
// Get partition by UUID - it might have been transitioned already or no longer exist
|
|
let Some(partition) = self.partitions_by_uuid.get(&uuid).cloned() else {
|
|
continue;
|
|
};
|
|
|
|
let partition_ref = partition.partition_ref().r#ref.clone();
|
|
|
|
// Only process UpstreamBuilding partitions
|
|
if let Partition::UpstreamBuilding(mut upstream_building) = partition {
|
|
// Remove satisfied deps from missing_deps
|
|
for live_ref in newly_live_partition_refs {
|
|
upstream_building
|
|
.state
|
|
.missing_deps
|
|
.retain(|d| d.r#ref != live_ref.0.r#ref);
|
|
// Also remove from downstream_waiting index
|
|
if let Some(waiting) = self.downstream_waiting.get_mut(&live_ref.0.r#ref) {
|
|
waiting.retain(|u| *u != uuid);
|
|
}
|
|
}
|
|
|
|
let transitioned = if upstream_building.state.missing_deps.is_empty() {
|
|
// All deps satisfied, transition to UpForRetry
|
|
tracing::info!(
|
|
partition = %partition_ref,
|
|
uuid = %uuid,
|
|
"Partition: UpstreamBuilding → UpForRetry (all upstreams satisfied)"
|
|
);
|
|
Partition::UpForRetry(upstream_building.upstreams_satisfied())
|
|
} else {
|
|
// Still waiting for more deps
|
|
tracing::debug!(
|
|
partition = %partition_ref,
|
|
uuid = %uuid,
|
|
remaining_deps = ?upstream_building.state.missing_deps.iter().map(|d| &d.r#ref).collect::<Vec<_>>(),
|
|
"Partition remains in UpstreamBuilding (still waiting for deps)"
|
|
);
|
|
Partition::UpstreamBuilding(upstream_building)
|
|
};
|
|
|
|
self.update_partition(transitioned);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Cascade failures to downstream partitions when their upstream dependencies fail.
|
|
/// Transitions UpstreamBuilding → UpstreamFailed for partitions waiting on failed upstreams.
|
|
/// Uses the `downstream_waiting` index for O(1) lookup of affected partitions.
|
|
pub(crate) fn cascade_failures_to_downstream_partitions(
|
|
&mut self,
|
|
failed_partition_refs: &[FailedPartitionRef],
|
|
) {
|
|
// Collect UUIDs of partitions that are waiting for the failed partitions
|
|
let mut uuids_to_fail: Vec<Uuid> = Vec::new();
|
|
for failed_ref in failed_partition_refs {
|
|
if let Some(waiting_uuids) = self.downstream_waiting.get(&failed_ref.0.r#ref) {
|
|
uuids_to_fail.extend(waiting_uuids.iter().cloned());
|
|
}
|
|
}
|
|
|
|
// Deduplicate UUIDs
|
|
uuids_to_fail.sort();
|
|
uuids_to_fail.dedup();
|
|
|
|
for uuid in uuids_to_fail {
|
|
// Get partition by UUID
|
|
let Some(partition) = self.partitions_by_uuid.get(&uuid).cloned() else {
|
|
continue;
|
|
};
|
|
|
|
let partition_ref = partition.partition_ref().r#ref.clone();
|
|
|
|
// Only process UpstreamBuilding partitions
|
|
if let Partition::UpstreamBuilding(upstream_building) = partition {
|
|
// Collect which upstream refs failed
|
|
let failed_upstream_refs: Vec<PartitionRef> = failed_partition_refs
|
|
.iter()
|
|
.filter(|f| {
|
|
upstream_building
|
|
.state
|
|
.missing_deps
|
|
.iter()
|
|
.any(|d| d.r#ref == f.0.r#ref)
|
|
})
|
|
.map(|f| f.0.clone())
|
|
.collect();
|
|
|
|
if !failed_upstream_refs.is_empty() {
|
|
tracing::info!(
|
|
partition = %partition_ref,
|
|
uuid = %uuid,
|
|
failed_upstreams = ?failed_upstream_refs.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
|
|
"Partition: UpstreamBuilding → UpstreamFailed (upstream failed)"
|
|
);
|
|
|
|
// Remove from downstream_waiting index for all deps
|
|
for dep in &upstream_building.state.missing_deps {
|
|
if let Some(waiting) = self.downstream_waiting.get_mut(&dep.r#ref) {
|
|
waiting.retain(|u| *u != uuid);
|
|
}
|
|
}
|
|
|
|
// Transition to UpstreamFailed
|
|
let transitioned = Partition::UpstreamFailed(
|
|
upstream_building
|
|
.upstream_failed(failed_upstream_refs, current_timestamp()),
|
|
);
|
|
self.update_partition(transitioned);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|