factor out specific state transition helpers in build_state.rs
This commit is contained in:
parent
c8e2b4fdaf
commit
01d50dde1b
1 changed files with 483 additions and 367 deletions
|
|
@ -80,6 +80,474 @@ impl BuildState {
|
|||
}
|
||||
}
|
||||
|
||||
/// Transition partitions from Missing to Building state
|
||||
/// Used when a job run starts building partitions
|
||||
fn transition_partitions_to_building(
|
||||
&mut self,
|
||||
partition_refs: &[PartitionRef],
|
||||
job_run_id: &str,
|
||||
) {
|
||||
for pref in partition_refs {
|
||||
if let Some(partition) = self.partitions.remove(&pref.r#ref) {
|
||||
// Partition exists - transition based on current state
|
||||
let transitioned = match partition {
|
||||
// Valid: Missing -> Building
|
||||
Partition::Missing(missing) => {
|
||||
Partition::Building(missing.start_building(job_run_id.to_string()))
|
||||
}
|
||||
// Invalid state: partition should not already be Building, Live, Failed, or Tainted
|
||||
_ => {
|
||||
panic!(
|
||||
"BUG: Invalid state - partition {} cannot start building from state {:?}",
|
||||
pref.r#ref, partition
|
||||
)
|
||||
}
|
||||
};
|
||||
self.partitions.insert(pref.r#ref.clone(), transitioned);
|
||||
} else {
|
||||
// Partition doesn't exist yet - create in Missing then transition to Building
|
||||
let missing = Partition::new_missing(pref.clone());
|
||||
if let Partition::Missing(m) = missing {
|
||||
let building = m.start_building(job_run_id.to_string());
|
||||
self.partitions
|
||||
.insert(pref.r#ref.clone(), Partition::Building(building));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Transition partitions from Building to Live state
|
||||
/// Used when a job run successfully completes
|
||||
fn transition_partitions_to_live(
|
||||
&mut self,
|
||||
partition_refs: &[PartitionRef],
|
||||
job_run_id: &str,
|
||||
timestamp: u64,
|
||||
) {
|
||||
for pref in partition_refs {
|
||||
let partition = self.partitions.remove(&pref.r#ref).expect(&format!(
|
||||
"BUG: Partition {} must exist and be in Building state before completion",
|
||||
pref.r#ref
|
||||
));
|
||||
|
||||
// ONLY valid transition: Building -> Live
|
||||
let transitioned = match partition {
|
||||
Partition::Building(building) => {
|
||||
Partition::Live(building.complete(job_run_id.to_string(), timestamp))
|
||||
}
|
||||
// All other states are invalid
|
||||
_ => {
|
||||
panic!(
|
||||
"BUG: Invalid state - partition {} must be Building to transition to Live, found {:?}",
|
||||
pref.r#ref, partition
|
||||
)
|
||||
}
|
||||
};
|
||||
self.partitions.insert(pref.r#ref.clone(), transitioned);
|
||||
}
|
||||
}
|
||||
|
||||
/// Transition partitions from Building to Failed state
|
||||
/// Used when a job run fails
|
||||
fn transition_partitions_to_failed(
|
||||
&mut self,
|
||||
partition_refs: &[PartitionRef],
|
||||
job_run_id: &str,
|
||||
timestamp: u64,
|
||||
) {
|
||||
for pref in partition_refs {
|
||||
let partition = self.partitions.remove(&pref.r#ref).expect(&format!(
|
||||
"BUG: Partition {} must exist and be in Building state before failure",
|
||||
pref.r#ref
|
||||
));
|
||||
|
||||
// ONLY valid transition: Building -> Failed
|
||||
let transitioned = match partition {
|
||||
Partition::Building(building) => {
|
||||
Partition::Failed(building.fail(job_run_id.to_string(), timestamp))
|
||||
}
|
||||
// All other states are invalid
|
||||
_ => {
|
||||
panic!(
|
||||
"BUG: Invalid state - partition {} must be Building to transition to Failed, found {:?}",
|
||||
pref.r#ref, partition
|
||||
)
|
||||
}
|
||||
};
|
||||
self.partitions.insert(pref.r#ref.clone(), transitioned);
|
||||
}
|
||||
}
|
||||
|
||||
/// Reset partitions from Building back to Missing state
|
||||
/// Used when a job run encounters missing dependencies and cannot proceed
|
||||
fn reset_partitions_to_missing(
|
||||
&mut self,
|
||||
partition_refs: &[PartitionRef],
|
||||
) {
|
||||
for pref in partition_refs {
|
||||
let partition = self.partitions.remove(&pref.r#ref).expect(&format!(
|
||||
"BUG: Partition {} must exist and be in Building state during dep_miss",
|
||||
pref.r#ref
|
||||
));
|
||||
|
||||
// Only valid transition: Building -> Missing
|
||||
let transitioned = match partition {
|
||||
Partition::Building(building) => Partition::Missing(building.reset_to_missing()),
|
||||
// All other states are invalid
|
||||
_ => {
|
||||
panic!(
|
||||
"BUG: Invalid state - partition {} must be Building during dep_miss, found {:?}",
|
||||
pref.r#ref, partition
|
||||
)
|
||||
}
|
||||
};
|
||||
self.partitions.insert(pref.r#ref.clone(), transitioned);
|
||||
}
|
||||
}
|
||||
|
||||
/// Complete wants when all their partitions become Live
|
||||
/// Transitions Building → Successful, returns list of newly successful want IDs
|
||||
fn complete_successful_wants(
|
||||
&mut self,
|
||||
newly_live_partitions: &[PartitionRef],
|
||||
job_run_id: &str,
|
||||
timestamp: u64,
|
||||
) -> Vec<String> {
|
||||
let mut newly_successful_wants: Vec<String> = Vec::new();
|
||||
|
||||
for pref in newly_live_partitions {
|
||||
let want_ids = self
|
||||
.partitions
|
||||
.get(&pref.r#ref)
|
||||
.map(|p| p.want_ids().clone())
|
||||
.unwrap_or_default();
|
||||
|
||||
for want_id in want_ids {
|
||||
let want = self.wants.remove(&want_id).expect(&format!(
|
||||
"BUG: Want {} must exist when referenced by partition",
|
||||
want_id
|
||||
));
|
||||
|
||||
let transitioned = match want {
|
||||
Want::Building(building) => {
|
||||
// Check if ALL partitions for this want are now Live
|
||||
let all_partitions_live = building.want.partitions.iter().all(|p| {
|
||||
self.partitions
|
||||
.get(&p.r#ref)
|
||||
.map(|partition| partition.is_live())
|
||||
.unwrap_or(false)
|
||||
});
|
||||
|
||||
if all_partitions_live {
|
||||
newly_successful_wants.push(want_id.clone());
|
||||
Want::Successful(
|
||||
building.complete(job_run_id.to_string(), timestamp),
|
||||
)
|
||||
} else {
|
||||
Want::Building(building) // Still building other partitions
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
panic!(
|
||||
"BUG: Want {} in invalid state {:?} when partition {} became Live. Should be Building.",
|
||||
want_id, want, pref.r#ref
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
self.wants.insert(want_id.clone(), transitioned);
|
||||
}
|
||||
}
|
||||
|
||||
newly_successful_wants
|
||||
}
|
||||
|
||||
/// Fail wants when their partitions fail
|
||||
/// Transitions Building → Failed, and adds to already-failed wants
|
||||
/// Returns list of newly failed want IDs for downstream cascade
|
||||
fn fail_directly_affected_wants(
|
||||
&mut self,
|
||||
failed_partitions: &[PartitionRef],
|
||||
) -> Vec<String> {
|
||||
let mut newly_failed_wants: Vec<String> = Vec::new();
|
||||
|
||||
for pref in failed_partitions {
|
||||
let want_ids = self
|
||||
.partitions
|
||||
.get(&pref.r#ref)
|
||||
.map(|p| p.want_ids().clone())
|
||||
.unwrap_or_default();
|
||||
|
||||
for want_id in want_ids {
|
||||
let want = self.wants.remove(&want_id).expect(&format!(
|
||||
"BUG: Want {} must exist when referenced by partition",
|
||||
want_id
|
||||
));
|
||||
|
||||
let transitioned = match want {
|
||||
Want::Building(building) => {
|
||||
newly_failed_wants.push(want_id.clone());
|
||||
Want::Failed(
|
||||
building.fail(vec![pref.clone()], "Partition build failed".to_string()),
|
||||
)
|
||||
}
|
||||
// Failed → Failed: add new failed partition to existing failed state
|
||||
Want::Failed(failed) => {
|
||||
Want::Failed(failed.add_failed_partitions(vec![pref.clone()]))
|
||||
}
|
||||
_ => {
|
||||
panic!(
|
||||
"BUG: Want {} in invalid state {:?} when partition {} failed. Should be Building or Failed.",
|
||||
want_id, want, pref.r#ref
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
self.wants.insert(want_id.clone(), transitioned);
|
||||
}
|
||||
}
|
||||
|
||||
newly_failed_wants
|
||||
}
|
||||
|
||||
/// Unblock downstream wants when their upstream dependencies succeed
|
||||
/// Transitions UpstreamBuilding → Idle (when ready) or Building (when partitions already building)
|
||||
fn unblock_downstream_wants(
|
||||
&mut self,
|
||||
newly_successful_wants: &[String],
|
||||
job_run_id: &str,
|
||||
timestamp: u64,
|
||||
) {
|
||||
// Find downstream wants that are waiting for any of the newly successful wants
|
||||
// TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
|
||||
let downstream_wants_to_check: Vec<String> = self
|
||||
.wants
|
||||
.iter()
|
||||
.filter_map(|(id, want)| {
|
||||
match want {
|
||||
Want::UpstreamBuilding(downstream_want) => {
|
||||
// Is this downstream want waiting for any of the newly successful wants?
|
||||
let is_affected = downstream_want
|
||||
.state
|
||||
.upstream_want_ids
|
||||
.iter()
|
||||
.any(|up_id| newly_successful_wants.contains(up_id));
|
||||
if is_affected { Some(id.clone()) } else { None }
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
for want_id in downstream_wants_to_check {
|
||||
let want = self
|
||||
.wants
|
||||
.remove(&want_id)
|
||||
.expect(&format!("BUG: Want {} must exist", want_id));
|
||||
|
||||
let transitioned = match want {
|
||||
Want::UpstreamBuilding(downstream_want) => {
|
||||
// Check if ALL of this downstream want's upstream dependencies are now Successful
|
||||
let all_upstreams_successful = downstream_want
|
||||
.state
|
||||
.upstream_want_ids
|
||||
.iter()
|
||||
.all(|up_want_id| {
|
||||
self.wants
|
||||
.get(up_want_id)
|
||||
.map(|w| matches!(w, Want::Successful(_)))
|
||||
.unwrap_or(false)
|
||||
});
|
||||
|
||||
if all_upstreams_successful {
|
||||
// Check if any of this want's partitions are still being built
|
||||
// If a job dep-missed, its partitions transitioned back to Missing
|
||||
// But other jobs might still be building other partitions for this want
|
||||
let any_partition_building =
|
||||
downstream_want.want.partitions.iter().any(|p| {
|
||||
self.partitions
|
||||
.get(&p.r#ref)
|
||||
.map(|partition| matches!(partition, Partition::Building(_)))
|
||||
.unwrap_or(false)
|
||||
});
|
||||
|
||||
if any_partition_building {
|
||||
// Some partitions still being built, continue in Building state
|
||||
Want::Building(
|
||||
downstream_want.continue_building(
|
||||
job_run_id.to_string(),
|
||||
timestamp,
|
||||
),
|
||||
)
|
||||
} else {
|
||||
// No partitions being built, become schedulable again
|
||||
Want::Idle(downstream_want.upstreams_satisfied())
|
||||
}
|
||||
} else {
|
||||
// Upstreams not all satisfied yet, stay in UpstreamBuilding
|
||||
Want::UpstreamBuilding(downstream_want)
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
panic!("BUG: Want {} should be UpstreamBuilding here", want_id);
|
||||
}
|
||||
};
|
||||
|
||||
self.wants.insert(want_id, transitioned);
|
||||
}
|
||||
}
|
||||
|
||||
/// Cascade failures to downstream wants when their upstream dependencies fail
|
||||
/// Transitions UpstreamBuilding → UpstreamFailed
|
||||
fn cascade_failures_to_downstream_wants(
|
||||
&mut self,
|
||||
newly_failed_wants: &[String],
|
||||
timestamp: u64,
|
||||
) {
|
||||
// Find downstream wants that are waiting for any of the newly failed wants
|
||||
// TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
|
||||
let downstream_wants_to_fail: Vec<String> = self
|
||||
.wants
|
||||
.iter()
|
||||
.filter_map(|(id, want)| {
|
||||
match want {
|
||||
Want::UpstreamBuilding(downstream_want) => {
|
||||
// Is this downstream want waiting for any of the newly failed wants?
|
||||
let is_affected = downstream_want
|
||||
.state
|
||||
.upstream_want_ids
|
||||
.iter()
|
||||
.any(|up_id| newly_failed_wants.contains(up_id));
|
||||
if is_affected { Some(id.clone()) } else { None }
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
for want_id in downstream_wants_to_fail {
|
||||
let want = self
|
||||
.wants
|
||||
.remove(&want_id)
|
||||
.expect(&format!("BUG: Want {} must exist", want_id));
|
||||
|
||||
let transitioned = match want {
|
||||
Want::UpstreamBuilding(downstream_want) => Want::UpstreamFailed(
|
||||
downstream_want
|
||||
.upstream_failed(newly_failed_wants.to_vec(), timestamp),
|
||||
),
|
||||
_ => {
|
||||
panic!("BUG: Want {} should be UpstreamBuilding here", want_id);
|
||||
}
|
||||
};
|
||||
|
||||
self.wants.insert(want_id, transitioned);
|
||||
}
|
||||
}
|
||||
|
||||
/// Build a mapping from partition references to the want IDs that will build them
|
||||
/// Used to track which upstream wants a downstream want depends on after a dep miss
|
||||
fn build_partition_to_want_mapping(
|
||||
&self,
|
||||
want_events: &[Event],
|
||||
) -> std::collections::HashMap<String, String> {
|
||||
let mut partition_to_want_map: std::collections::HashMap<String, String> =
|
||||
std::collections::HashMap::new();
|
||||
|
||||
for event_item in want_events {
|
||||
if let Event::WantCreateV1(want_create) = event_item {
|
||||
for pref in &want_create.partitions {
|
||||
partition_to_want_map.insert(pref.r#ref.clone(), want_create.want_id.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
partition_to_want_map
|
||||
}
|
||||
|
||||
/// Collect upstream want IDs that a servicing want now depends on based on dep misses
|
||||
/// Returns a deduplicated, sorted list of upstream want IDs
|
||||
fn collect_upstream_want_ids(
|
||||
&self,
|
||||
servicing_want: &crate::WantAttributedPartitions,
|
||||
missing_deps: &[crate::MissingDeps],
|
||||
partition_to_want_map: &std::collections::HashMap<String, String>,
|
||||
) -> Vec<String> {
|
||||
let mut new_upstream_want_ids = Vec::new();
|
||||
|
||||
for missing_dep in missing_deps {
|
||||
// Only process if this want contains an impacted partition
|
||||
let is_impacted = missing_dep.impacted.iter().any(|imp| {
|
||||
servicing_want
|
||||
.partitions
|
||||
.iter()
|
||||
.any(|p| p.r#ref == imp.r#ref)
|
||||
});
|
||||
|
||||
if is_impacted {
|
||||
// For each missing partition, find the want ID that will build it
|
||||
for missing_partition in &missing_dep.missing {
|
||||
if let Some(want_id) = partition_to_want_map.get(&missing_partition.r#ref) {
|
||||
new_upstream_want_ids.push(want_id.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Dedupe upstream want IDs (one job might report same dep multiple times)
|
||||
new_upstream_want_ids.sort();
|
||||
new_upstream_want_ids.dedup();
|
||||
|
||||
new_upstream_want_ids
|
||||
}
|
||||
|
||||
/// Transition wants to UpstreamBuilding when they have missing dependencies
|
||||
/// Handles Building → UpstreamBuilding and UpstreamBuilding → UpstreamBuilding (add upstreams)
|
||||
fn transition_wants_to_upstream_building(
|
||||
&mut self,
|
||||
servicing_wants: &[crate::WantAttributedPartitions],
|
||||
missing_deps: &[crate::MissingDeps],
|
||||
partition_to_want_map: &std::collections::HashMap<String, String>,
|
||||
) {
|
||||
// For each want serviced by this job run, check if it was impacted by missing deps
|
||||
for servicing_want in servicing_wants {
|
||||
let want = self.wants.remove(&servicing_want.want_id).expect(&format!(
|
||||
"BUG: Want {} must exist when serviced by job run",
|
||||
servicing_want.want_id
|
||||
));
|
||||
|
||||
// Collect the upstream want IDs that this want now depends on
|
||||
let new_upstream_want_ids =
|
||||
self.collect_upstream_want_ids(servicing_want, missing_deps, partition_to_want_map);
|
||||
|
||||
let transitioned = if !new_upstream_want_ids.is_empty() {
|
||||
match want {
|
||||
Want::Building(building) => {
|
||||
// First dep miss for this want: Building → UpstreamBuilding
|
||||
Want::UpstreamBuilding(building.detect_missing_deps(new_upstream_want_ids))
|
||||
}
|
||||
Want::UpstreamBuilding(upstream) => {
|
||||
// Already in UpstreamBuilding, add more upstreams (self-transition)
|
||||
// This can happen if multiple jobs report dep misses, or one job reports multiple dep misses
|
||||
Want::UpstreamBuilding(upstream.add_upstreams(new_upstream_want_ids))
|
||||
}
|
||||
_ => {
|
||||
panic!(
|
||||
"BUG: Want {} in invalid state {:?} when job run had dep miss. Should be Building or UpstreamBuilding.",
|
||||
servicing_want.want_id, want
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// No new upstreams for this want (it wasn't impacted), keep current state
|
||||
want
|
||||
};
|
||||
|
||||
self.wants
|
||||
.insert(servicing_want.want_id.clone(), transitioned);
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles reacting to events, updating state, and erroring if its an invalid state transition
|
||||
/// Event handlers can return vecs of events that will then be appended to the BEL
|
||||
pub fn handle_event(&mut self, event: &Event) -> Result<Vec<Event>, DatabuildError> {
|
||||
|
|
@ -203,33 +671,7 @@ impl BuildState {
|
|||
}
|
||||
|
||||
// Transition partitions to Building state
|
||||
for pref in &job_run.building_partitions {
|
||||
if let Some(partition) = self.partitions.remove(&pref.r#ref) {
|
||||
// Partition exists - transition based on current state
|
||||
let transitioned =
|
||||
match partition {
|
||||
// Valid: Missing -> Building
|
||||
Partition::Missing(missing) => Partition::Building(
|
||||
missing.start_building(event.job_run_id.clone()),
|
||||
),
|
||||
// Invalid state: partition should not already be Building, Live, Failed, or Tainted
|
||||
_ => return Err(format!(
|
||||
"Invalid state: partition {} cannot start building from state {:?}",
|
||||
pref.r#ref, partition
|
||||
)
|
||||
.into()),
|
||||
};
|
||||
self.partitions.insert(pref.r#ref.clone(), transitioned);
|
||||
} else {
|
||||
// Partition doesn't exist yet - create in Missing then transition to Building
|
||||
let missing = Partition::new_missing(pref.clone());
|
||||
if let Partition::Missing(m) = missing {
|
||||
let building = m.start_building(event.job_run_id.clone());
|
||||
self.partitions
|
||||
.insert(pref.r#ref.clone(), Partition::Building(building));
|
||||
}
|
||||
}
|
||||
}
|
||||
self.transition_partitions_to_building(&job_run.building_partitions, &event.job_run_id);
|
||||
|
||||
self.job_runs
|
||||
.insert(event.job_run_id.clone(), job_run.clone());
|
||||
|
|
@ -272,158 +714,13 @@ impl BuildState {
|
|||
let newly_live_partitions: Vec<PartitionRef> = job_run.building_partitions.clone();
|
||||
|
||||
// Update partitions being built by this job (strict type-safe transitions)
|
||||
for pref in &newly_live_partitions {
|
||||
let partition = self.partitions.remove(&pref.r#ref).ok_or_else(|| {
|
||||
format!(
|
||||
"Partition {} must exist and be in Building state before completion",
|
||||
pref.r#ref
|
||||
)
|
||||
})?;
|
||||
|
||||
// ONLY valid transition: Building -> Live
|
||||
let transitioned = match partition {
|
||||
Partition::Building(building) => {
|
||||
Partition::Live(building.complete(
|
||||
event.job_run_id.clone(),
|
||||
current_timestamp()
|
||||
))
|
||||
}
|
||||
// All other states are invalid
|
||||
_ => {
|
||||
return Err(format!(
|
||||
"Invalid state: partition {} must be Building to transition to Live, found {:?}",
|
||||
pref.r#ref, partition
|
||||
).into())
|
||||
}
|
||||
};
|
||||
self.partitions.insert(pref.r#ref.clone(), transitioned);
|
||||
}
|
||||
self.transition_partitions_to_live(&newly_live_partitions, &event.job_run_id, current_timestamp());
|
||||
|
||||
// Building → Successful (when all partitions Live)
|
||||
let mut newly_successful_wants: Vec<String> = Vec::new();
|
||||
|
||||
for pref in &newly_live_partitions {
|
||||
let want_ids = self
|
||||
.partitions
|
||||
.get(&pref.r#ref)
|
||||
.map(|p| p.want_ids().clone())
|
||||
.unwrap_or_default();
|
||||
|
||||
for want_id in want_ids {
|
||||
let want = self.wants.remove(&want_id).expect(&format!(
|
||||
"BUG: Want {} must exist when referenced by partition",
|
||||
want_id
|
||||
));
|
||||
|
||||
let transitioned = match want {
|
||||
Want::Building(building) => {
|
||||
// Check if ALL partitions for this want are now Live
|
||||
let all_partitions_live = building.want.partitions.iter().all(|p| {
|
||||
self.partitions
|
||||
.get(&p.r#ref)
|
||||
.map(|partition| partition.is_live())
|
||||
.unwrap_or(false)
|
||||
});
|
||||
|
||||
if all_partitions_live {
|
||||
newly_successful_wants.push(want_id.clone());
|
||||
Want::Successful(
|
||||
building.complete(event.job_run_id.clone(), current_timestamp()),
|
||||
)
|
||||
} else {
|
||||
Want::Building(building) // Still building other partitions
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
panic!(
|
||||
"BUG: Want {} in invalid state {:?} when partition {} became Live. Should be Building.",
|
||||
want_id, want, pref.r#ref
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
self.wants.insert(want_id.clone(), transitioned);
|
||||
}
|
||||
}
|
||||
let newly_successful_wants = self.complete_successful_wants(&newly_live_partitions, &event.job_run_id, current_timestamp());
|
||||
|
||||
// UpstreamBuilding → Idle/Building (for downstream wants waiting on newly successful wants)
|
||||
// Only check wants that are waiting for the newly successful wants
|
||||
// TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
|
||||
let downstream_wants_to_check: Vec<String> = self
|
||||
.wants
|
||||
.iter()
|
||||
.filter_map(|(id, want)| {
|
||||
match want {
|
||||
Want::UpstreamBuilding(downstream_want) => {
|
||||
// Is this downstream want waiting for any of the newly successful wants?
|
||||
let is_affected = downstream_want
|
||||
.state
|
||||
.upstream_want_ids
|
||||
.iter()
|
||||
.any(|up_id| newly_successful_wants.contains(up_id));
|
||||
if is_affected { Some(id.clone()) } else { None }
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
for want_id in downstream_wants_to_check {
|
||||
let want = self
|
||||
.wants
|
||||
.remove(&want_id)
|
||||
.expect(&format!("BUG: Want {} must exist", want_id));
|
||||
|
||||
let transitioned = match want {
|
||||
Want::UpstreamBuilding(downstream_want) => {
|
||||
// Check if ALL of this downstream want's upstream dependencies are now Successful
|
||||
let all_upstreams_successful = downstream_want
|
||||
.state
|
||||
.upstream_want_ids
|
||||
.iter()
|
||||
.all(|up_want_id| {
|
||||
self.wants
|
||||
.get(up_want_id)
|
||||
.map(|w| matches!(w, Want::Successful(_)))
|
||||
.unwrap_or(false)
|
||||
});
|
||||
|
||||
if all_upstreams_successful {
|
||||
// Check if any of this want's partitions are still being built
|
||||
// If a job dep-missed, its partitions transitioned back to Missing
|
||||
// But other jobs might still be building other partitions for this want
|
||||
let any_partition_building =
|
||||
downstream_want.want.partitions.iter().any(|p| {
|
||||
self.partitions
|
||||
.get(&p.r#ref)
|
||||
.map(|partition| matches!(partition, Partition::Building(_)))
|
||||
.unwrap_or(false)
|
||||
});
|
||||
|
||||
if any_partition_building {
|
||||
// Some partitions still being built, continue in Building state
|
||||
Want::Building(
|
||||
downstream_want.continue_building(
|
||||
event.job_run_id.clone(),
|
||||
current_timestamp(),
|
||||
),
|
||||
)
|
||||
} else {
|
||||
// No partitions being built, become schedulable again
|
||||
Want::Idle(downstream_want.upstreams_satisfied())
|
||||
}
|
||||
} else {
|
||||
// Upstreams not all satisfied yet, stay in UpstreamBuilding
|
||||
Want::UpstreamBuilding(downstream_want)
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
panic!("BUG: Want {} should be UpstreamBuilding here", want_id);
|
||||
}
|
||||
};
|
||||
|
||||
self.wants.insert(want_id, transitioned);
|
||||
}
|
||||
self.unblock_downstream_wants(&newly_successful_wants, &event.job_run_id, current_timestamp());
|
||||
|
||||
Ok(vec![])
|
||||
}
|
||||
|
|
@ -439,111 +736,13 @@ impl BuildState {
|
|||
let failed_partitions: Vec<PartitionRef> = job_run.building_partitions.clone();
|
||||
|
||||
// Transition partitions using strict type-safe methods
|
||||
for pref in &failed_partitions {
|
||||
let partition = self.partitions.remove(&pref.r#ref).ok_or_else(|| {
|
||||
format!(
|
||||
"Partition {} must exist and be in Building state before failure",
|
||||
pref.r#ref
|
||||
)
|
||||
})?;
|
||||
|
||||
// ONLY valid transition: Building -> Failed
|
||||
let transitioned = match partition {
|
||||
Partition::Building(building) => {
|
||||
Partition::Failed(building.fail(
|
||||
event.job_run_id.clone(),
|
||||
current_timestamp()
|
||||
))
|
||||
}
|
||||
// All other states are invalid
|
||||
_ => {
|
||||
return Err(format!(
|
||||
"Invalid state: partition {} must be Building to transition to Failed, found {:?}",
|
||||
pref.r#ref, partition
|
||||
).into())
|
||||
}
|
||||
};
|
||||
self.partitions.insert(pref.r#ref.clone(), transitioned);
|
||||
}
|
||||
self.transition_partitions_to_failed(&failed_partitions, &event.job_run_id, current_timestamp());
|
||||
|
||||
// Building → Failed (for wants directly building failed partitions)
|
||||
let mut newly_failed_wants: Vec<String> = Vec::new();
|
||||
|
||||
for pref in &failed_partitions {
|
||||
let want_ids = self
|
||||
.partitions
|
||||
.get(&pref.r#ref)
|
||||
.map(|p| p.want_ids().clone())
|
||||
.unwrap_or_default();
|
||||
|
||||
for want_id in want_ids {
|
||||
let want = self.wants.remove(&want_id).expect(&format!(
|
||||
"BUG: Want {} must exist when referenced by partition",
|
||||
want_id
|
||||
));
|
||||
|
||||
let transitioned = match want {
|
||||
Want::Building(building) => {
|
||||
newly_failed_wants.push(want_id.clone());
|
||||
Want::Failed(
|
||||
building.fail(vec![pref.clone()], "Partition build failed".to_string()),
|
||||
)
|
||||
}
|
||||
// Failed → Failed: add new failed partition to existing failed state
|
||||
Want::Failed(failed) => {
|
||||
Want::Failed(failed.add_failed_partitions(vec![pref.clone()]))
|
||||
}
|
||||
_ => {
|
||||
panic!(
|
||||
"BUG: Want {} in invalid state {:?} when partition {} failed. Should be Building or Failed.",
|
||||
want_id, want, pref.r#ref
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
self.wants.insert(want_id.clone(), transitioned);
|
||||
}
|
||||
}
|
||||
let newly_failed_wants = self.fail_directly_affected_wants(&failed_partitions);
|
||||
|
||||
// UpstreamBuilding → UpstreamFailed (for downstream wants waiting on newly failed wants)
|
||||
// TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
|
||||
let downstream_wants_to_fail: Vec<String> = self
|
||||
.wants
|
||||
.iter()
|
||||
.filter_map(|(id, want)| {
|
||||
match want {
|
||||
Want::UpstreamBuilding(downstream_want) => {
|
||||
// Is this downstream want waiting for any of the newly failed wants?
|
||||
let is_affected = downstream_want
|
||||
.state
|
||||
.upstream_want_ids
|
||||
.iter()
|
||||
.any(|up_id| newly_failed_wants.contains(up_id));
|
||||
if is_affected { Some(id.clone()) } else { None }
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
for want_id in downstream_wants_to_fail {
|
||||
let want = self
|
||||
.wants
|
||||
.remove(&want_id)
|
||||
.expect(&format!("BUG: Want {} must exist", want_id));
|
||||
|
||||
let transitioned = match want {
|
||||
Want::UpstreamBuilding(downstream_want) => Want::UpstreamFailed(
|
||||
downstream_want
|
||||
.upstream_failed(newly_failed_wants.clone(), current_timestamp()),
|
||||
),
|
||||
_ => {
|
||||
panic!("BUG: Want {} should be UpstreamBuilding here", want_id);
|
||||
}
|
||||
};
|
||||
|
||||
self.wants.insert(want_id, transitioned);
|
||||
}
|
||||
self.cascade_failures_to_downstream_wants(&newly_failed_wants, current_timestamp());
|
||||
|
||||
Ok(vec![])
|
||||
}
|
||||
|
|
@ -572,28 +771,7 @@ impl BuildState {
|
|||
.ok_or(format!("No servicing wants found"))?;
|
||||
|
||||
// Transition partitions back to Missing since this job can't build them yet
|
||||
for pref in &job_run_detail.building_partitions {
|
||||
let partition = self.partitions.remove(&pref.r#ref).ok_or_else(|| {
|
||||
format!(
|
||||
"Partition {} must exist and be in Building state during dep_miss",
|
||||
pref.r#ref
|
||||
)
|
||||
})?;
|
||||
|
||||
// Only valid transition: Building -> Missing
|
||||
let transitioned = match partition {
|
||||
Partition::Building(building) => Partition::Missing(building.reset_to_missing()),
|
||||
// All other states are invalid
|
||||
_ => {
|
||||
return Err(format!(
|
||||
"Invalid state: partition {} must be Building during dep_miss, found {:?}",
|
||||
pref.r#ref, partition
|
||||
)
|
||||
.into());
|
||||
}
|
||||
};
|
||||
self.partitions.insert(pref.r#ref.clone(), transitioned);
|
||||
}
|
||||
self.reset_partitions_to_missing(&job_run_detail.building_partitions);
|
||||
|
||||
// Create wants from dep misses
|
||||
let want_events = missing_deps_to_want_events(
|
||||
|
|
@ -609,76 +787,14 @@ impl BuildState {
|
|||
// 2. Transition the current want to UpstreamBuilding, tracking which upstream wants it's waiting for
|
||||
|
||||
// Build a map: partition_ref -> want_id that will build it
|
||||
// This lets us track which upstream wants the current want depends on
|
||||
let mut partition_to_want_map: std::collections::HashMap<String, String> =
|
||||
std::collections::HashMap::new();
|
||||
for event_item in &want_events {
|
||||
if let Event::WantCreateV1(want_create) = event_item {
|
||||
for pref in &want_create.partitions {
|
||||
partition_to_want_map.insert(pref.r#ref.clone(), want_create.want_id.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
let partition_to_want_map = self.build_partition_to_want_mapping(&want_events);
|
||||
|
||||
// For each want serviced by this job run, check if it was impacted by missing deps
|
||||
for servicing_want in &job_run_detail.servicing_wants {
|
||||
let want = self.wants.remove(&servicing_want.want_id).expect(&format!(
|
||||
"BUG: Want {} must exist when serviced by job run",
|
||||
servicing_want.want_id
|
||||
));
|
||||
|
||||
// Collect the upstream want IDs that this want now depends on
|
||||
let mut new_upstream_want_ids = Vec::new();
|
||||
|
||||
for missing_dep in &event.missing_deps {
|
||||
// Only process if this want contains an impacted partition
|
||||
let is_impacted = missing_dep.impacted.iter().any(|imp| {
|
||||
servicing_want
|
||||
.partitions
|
||||
.iter()
|
||||
.any(|p| p.r#ref == imp.r#ref)
|
||||
});
|
||||
|
||||
if is_impacted {
|
||||
// For each missing partition, find the want ID that will build it
|
||||
for missing_partition in &missing_dep.missing {
|
||||
if let Some(want_id) = partition_to_want_map.get(&missing_partition.r#ref) {
|
||||
new_upstream_want_ids.push(want_id.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Dedupe upstream want IDs (one job might report same dep multiple times)
|
||||
new_upstream_want_ids.sort();
|
||||
new_upstream_want_ids.dedup();
|
||||
|
||||
let transitioned = if !new_upstream_want_ids.is_empty() {
|
||||
match want {
|
||||
Want::Building(building) => {
|
||||
// First dep miss for this want: Building → UpstreamBuilding
|
||||
Want::UpstreamBuilding(building.detect_missing_deps(new_upstream_want_ids))
|
||||
}
|
||||
Want::UpstreamBuilding(upstream) => {
|
||||
// Already in UpstreamBuilding, add more upstreams (self-transition)
|
||||
// This can happen if multiple jobs report dep misses, or one job reports multiple dep misses
|
||||
Want::UpstreamBuilding(upstream.add_upstreams(new_upstream_want_ids))
|
||||
}
|
||||
_ => {
|
||||
panic!(
|
||||
"BUG: Want {} in invalid state {:?} when job run had dep miss. Should be Building or UpstreamBuilding.",
|
||||
servicing_want.want_id, want
|
||||
// Transition servicing wants to UpstreamBuilding when they have missing dependencies
|
||||
self.transition_wants_to_upstream_building(
|
||||
&job_run_detail.servicing_wants,
|
||||
&event.missing_deps,
|
||||
&partition_to_want_map,
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// No new upstreams for this want (it wasn't impacted), keep current state
|
||||
want
|
||||
};
|
||||
|
||||
self.wants
|
||||
.insert(servicing_want.want_id.clone(), transitioned);
|
||||
}
|
||||
|
||||
Ok(want_events)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue