migrate wants to type-state state machines

Stuart Axelbrooke 2025-11-22 15:37:32 +08:00
parent 4af41533d4
commit c8e2b4fdaf
3 changed files with 502 additions and 325 deletions


@@ -1,14 +1,15 @@
use crate::data_build_event::Event;
use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
use crate::partition_state::{Partition, PartitionWithState, MissingState, BuildingState, LiveState, FailedState, TaintedState};
use crate::partition_state::Partition;
use crate::util::{DatabuildError, current_timestamp};
use crate::want_state::{IdleState as WantIdleState, Want, WantWithState};
use crate::{
JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1,
JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1,
ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse,
ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail,
PartitionRef, PartitionStatusCode, TaintCancelEventV1, TaintCreateEventV1, TaintDetail,
WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode,
PartitionRef, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantCancelEventV1,
WantCreateEventV1, WantDetail,
};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
@@ -31,14 +32,16 @@ in which reducers query the state based on the received event, but that just inc
Instead, databuild opts for an entity-component system (ECS) that just provides the whole build
state mutably to all state update functionality, trusting that we know how to use it responsibly.
This means no boxing or "query phase", and means we can have all state updates happen as map lookups
and updates, which is exceptionally fast.
and updates, which is exceptionally fast. The states of the different entities are managed by state
machines, in a pseudo-colored-Petri-net style (only pseudo because we haven't formalized it). It is
critical that these state machines, their states, and their transitions are type-safe.
*/
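As a condensed, self-contained sketch of that type-state pattern (names mirror crate::want_state further down in this diff; fields and transition bodies are simplified): each state is its own struct, transitions consume self and return the next state's type, and a plain enum erases the type parameter so wants still fit in one BTreeMap. A transition that is not defined simply does not compile.

struct WantInfo {
    want_id: String,
    partitions: Vec<String>, // PartitionRef in the real code
}

struct IdleState;
struct BuildingState {
    started_at: u64,
}

struct WantWithState<S> {
    want: WantInfo,
    state: S,
}

impl WantWithState<IdleState> {
    // Consuming `self` means the Idle value is gone once the want starts
    // building; the compiler enforces that the transition happens exactly once.
    fn start_building(self, started_at: u64) -> WantWithState<BuildingState> {
        WantWithState {
            want: self.want,
            state: BuildingState { started_at },
        }
    }
}

// Storage-level wrapper: BuildState's map holds `Want`, and handlers match on
// it to recover the typed state before transitioning.
enum Want {
    Idle(WantWithState<IdleState>),
    Building(WantWithState<BuildingState>),
}

Across this commit the full Want machine is: Idle → Building (job buffered), Building → Successful (all partitions Live), Building → UpstreamBuilding (dep miss), UpstreamBuilding → Idle or Building (upstreams satisfied), UpstreamBuilding → UpstreamFailed (an upstream failed), self-transitions on UpstreamBuilding (add_upstreams) and Failed (add_failed_partitions), and Idle/Building/UpstreamBuilding → Canceled.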
/// Tracks all application state, defines valid state transitions, and manages cross-state machine
/// state transitions (e.g. job run success resulting in partition going from Building to Live)
#[derive(Debug, Clone)]
pub struct BuildState {
wants: BTreeMap<String, WantDetail>,
wants: BTreeMap<String, Want>, // Type-safe want storage
taints: BTreeMap<String, TaintDetail>,
partitions: BTreeMap<String, Partition>, // Type-safe partition storage
job_runs: BTreeMap<String, JobRunDetail>,
@@ -103,8 +106,16 @@ impl BuildState {
&mut self,
event: &WantCreateEventV1,
) -> Result<Vec<Event>, DatabuildError> {
// Use From impl to create want in Idle state
let want_idle: WantWithState<WantIdleState> = event.clone().into();
self.wants
.insert(event.want_id.clone(), event.clone().into());
.insert(event.want_id.clone(), Want::Idle(want_idle));
// Register this want with all its partitions
for pref in &event.partitions {
self.add_want_to_partition(pref, &event.want_id);
}
Ok(vec![])
}
@@ -113,10 +124,37 @@ impl BuildState {
event: &WantCancelEventV1,
) -> Result<Vec<Event>, DatabuildError> {
// TODO actually cancel in-progress job runs that no longer have a sponsoring want
if let Some(want) = self.wants.get_mut(&event.want_id) {
want.status = Some(WantStatusCode::WantCanceled.into());
want.last_updated_timestamp = current_timestamp();
// Type-safe transition (API layer should prevent canceling terminal wants)
let want = self.wants.remove(&event.want_id).expect(&format!(
"BUG: Want {} must exist when cancel event received",
event.want_id
));
let canceled = match want {
Want::Idle(idle) => {
Want::Canceled(idle.cancel(event.source.clone(), event.comment.clone()))
}
Want::Building(building) => Want::Canceled(building.cancel(
event.source.clone(),
current_timestamp(),
event.comment.clone(),
)),
Want::UpstreamBuilding(upstream) => Want::Canceled(upstream.cancel(
event.source.clone(),
current_timestamp(),
event.comment.clone(),
)),
// Terminal states: panic because API should have prevented this
Want::Successful(_) | Want::Failed(_) | Want::UpstreamFailed(_) | Want::Canceled(_) => {
panic!(
"BUG: Received WantCancelEvent for want {} in terminal state {:?}. API layer should prevent this.",
event.want_id, want
);
}
};
self.wants.insert(event.want_id.clone(), canceled);
Ok(vec![])
}
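The panics on terminal states above lean on validation happening before a cancel event is ever appended; a hypothetical API-layer guard (not part of this commit, name invented for illustration) over the Want enum from crate::want_state would look roughly like:

// Hypothetical pre-append check: reject cancel requests for wants that are
// already terminal, so handle_want_cancel never sees them.
fn validate_cancel(want: &Want) -> Result<(), String> {
    match want {
        Want::Successful(_) | Want::Failed(_) | Want::UpstreamFailed(_) | Want::Canceled(_) => {
            Err("want is already in a terminal state".to_string())
        }
        Want::Idle(_) | Want::Building(_) | Want::UpstreamBuilding(_) => Ok(()),
    }
}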
@@ -131,30 +169,55 @@ impl BuildState {
// Create job run to be inserted
let job_run: JobRunDetail = event.clone().into();
// Mark all servicing wants as WantBuilding
// Transition wants to Building
// Valid states when job buffer event arrives:
// - Idle: First job starting for this want (normal case)
// - Building: Another job already started for this want (multiple jobs can service same want)
// Invalid states (panic - indicates orchestrator bug):
// - UpstreamBuilding: Not schedulable, waiting for dependencies
// - Successful/Failed/UpstreamFailed/Canceled: Terminal states, not schedulable
for wap in &job_run.servicing_wants {
if let Some(want) = self.wants.get_mut(&wap.want_id) {
want.status = Some(WantStatusCode::WantBuilding.into());
want.last_updated_timestamp = current_timestamp();
let want = self.wants.remove(&wap.want_id).expect(&format!(
"BUG: Want {} must exist when job buffer event received",
wap.want_id
));
let transitioned = match want {
Want::Idle(idle) => {
// First job starting for this want
Want::Building(idle.start_building(current_timestamp()))
}
Want::Building(building) => {
// Another job already started, stay in Building (no-op)
Want::Building(building)
}
_ => {
panic!(
"BUG: Want {} in invalid state {:?} for job buffer. Only Idle or Building wants should be scheduled.",
wap.want_id, want
);
}
};
self.wants.insert(wap.want_id.clone(), transitioned);
}
// Transition partitions to Building state
for pref in &job_run.building_partitions {
if let Some(partition) = self.partitions.remove(&pref.r#ref) {
// Partition exists - transition based on current state
let transitioned = match partition {
let transitioned =
match partition {
// Valid: Missing -> Building
Partition::Missing(missing) => {
Partition::Building(missing.start_building(event.job_run_id.clone()))
}
Partition::Missing(missing) => Partition::Building(
missing.start_building(event.job_run_id.clone()),
),
// Invalid state: partition should not already be Building, Live, Failed, or Tainted
_ => {
return Err(format!(
_ => return Err(format!(
"Invalid state: partition {} cannot start building from state {:?}",
pref.r#ref, partition
).into())
}
)
.into()),
};
self.partitions.insert(pref.r#ref.clone(), transitioned);
} else {
@@ -162,7 +225,8 @@ impl BuildState {
let missing = Partition::new_missing(pref.clone());
if let Partition::Missing(m) = missing {
let building = m.start_building(event.job_run_id.clone());
self.partitions.insert(pref.r#ref.clone(), Partition::Building(building));
self.partitions
.insert(pref.r#ref.clone(), Partition::Building(building));
}
}
}
@@ -188,25 +252,6 @@ impl BuildState {
}
}
/// Walks the state from this want ID to update its status.
fn update_want_status(&mut self, want_id: &str) -> Result<(), DatabuildError> {
if let Some(want) = self.wants.get(want_id) {
let details: Vec<Option<PartitionDetail>> = want
.upstreams
.iter()
.map(|pref| self.get_partition(&pref.r#ref))
.collect();
let status: WantStatusCode = details.into();
if let Some(mut_want) = self.wants.get_mut(want_id) {
mut_want.status = Some(status.into());
mut_want.last_updated_timestamp = current_timestamp();
}
Ok(())
} else {
Err(format!("Want id {} not found", want_id).into())
}
}
fn handle_job_run_heartbeat(
&mut self,
event: &JobRunHeartbeatEventV1,
@@ -252,62 +297,135 @@ impl BuildState {
}
};
self.partitions.insert(pref.r#ref.clone(), transitioned);
// Update wants that reference this partition
self.update_wants_for_partition(pref)?;
}
// Check all wants in WantUpstreamBuilding status to see if their dependencies are now satisfied
let wants_to_update: Vec<String> = self
.wants
.iter()
.filter(|(_, want)| {
want.status.as_ref().map(|s| s.code)
== Some(WantStatusCode::WantUpstreamBuilding as i32)
})
.filter(|(_, want)| {
// Check if this want was waiting for any of the newly live partitions
want.upstreams.iter().any(|upstream| {
newly_live_partitions
.iter()
.any(|p| p.r#ref == upstream.r#ref)
})
})
.map(|(want_id, _)| want_id.clone())
.collect();
// Building → Successful (when all partitions Live)
let mut newly_successful_wants: Vec<String> = Vec::new();
for want_id in wants_to_update {
if let Some(want) = self.wants.get_mut(&want_id) {
// Check if all upstreams are now satisfied (using type-safe check)
let all_upstreams_satisfied = want.upstreams.iter().all(|upstream| {
self.partitions
.get(&upstream.r#ref)
.map(|p| p.is_live())
.unwrap_or(false)
});
if all_upstreams_satisfied {
// Transition back to WantIdle so it can be rescheduled
want.status = Some(WantStatusCode::WantIdle.into());
want.last_updated_timestamp = current_timestamp();
}
}
}
Ok(vec![])
}
fn update_wants_for_partition(&mut self, pref: &PartitionRef) -> Result<(), DatabuildError> {
// Use type-safe partitions storage
for pref in &newly_live_partitions {
let want_ids = self
.partitions
.get(&pref.r#ref)
.map(|p| p.want_ids().clone())
.ok_or(format!("Partition for ref {} not found", pref.r#ref))?;
for want_id in want_ids.iter() {
self.update_want_status(want_id)?;
.unwrap_or_default();
for want_id in want_ids {
let want = self.wants.remove(&want_id).expect(&format!(
"BUG: Want {} must exist when referenced by partition",
want_id
));
let transitioned = match want {
Want::Building(building) => {
// Check if ALL partitions for this want are now Live
let all_partitions_live = building.want.partitions.iter().all(|p| {
self.partitions
.get(&p.r#ref)
.map(|partition| partition.is_live())
.unwrap_or(false)
});
if all_partitions_live {
newly_successful_wants.push(want_id.clone());
Want::Successful(
building.complete(event.job_run_id.clone(), current_timestamp()),
)
} else {
Want::Building(building) // Still building other partitions
}
Ok(())
}
_ => {
panic!(
"BUG: Want {} in invalid state {:?} when partition {} became Live. Should be Building.",
want_id, want, pref.r#ref
);
}
};
self.wants.insert(want_id.clone(), transitioned);
}
}
// UpstreamBuilding → Idle/Building (for downstream wants waiting on newly successful wants)
// Only check wants that are waiting for the newly successful wants
// TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
let downstream_wants_to_check: Vec<String> = self
.wants
.iter()
.filter_map(|(id, want)| {
match want {
Want::UpstreamBuilding(downstream_want) => {
// Is this downstream want waiting for any of the newly successful wants?
let is_affected = downstream_want
.state
.upstream_want_ids
.iter()
.any(|up_id| newly_successful_wants.contains(up_id));
if is_affected { Some(id.clone()) } else { None }
}
_ => None,
}
})
.collect();
for want_id in downstream_wants_to_check {
let want = self
.wants
.remove(&want_id)
.expect(&format!("BUG: Want {} must exist", want_id));
let transitioned = match want {
Want::UpstreamBuilding(downstream_want) => {
// Check if ALL of this downstream want's upstream dependencies are now Successful
let all_upstreams_successful = downstream_want
.state
.upstream_want_ids
.iter()
.all(|up_want_id| {
self.wants
.get(up_want_id)
.map(|w| matches!(w, Want::Successful(_)))
.unwrap_or(false)
});
if all_upstreams_successful {
// Check if any of this want's partitions are still being built
// If a job dep-missed, its partitions transitioned back to Missing
// But other jobs might still be building other partitions for this want
let any_partition_building =
downstream_want.want.partitions.iter().any(|p| {
self.partitions
.get(&p.r#ref)
.map(|partition| matches!(partition, Partition::Building(_)))
.unwrap_or(false)
});
if any_partition_building {
// Some partitions still being built, continue in Building state
Want::Building(
downstream_want.continue_building(
event.job_run_id.clone(),
current_timestamp(),
),
)
} else {
// No partitions being built, become schedulable again
Want::Idle(downstream_want.upstreams_satisfied())
}
} else {
// Upstreams not all satisfied yet, stay in UpstreamBuilding
Want::UpstreamBuilding(downstream_want)
}
}
_ => {
panic!("BUG: Want {} should be UpstreamBuilding here", want_id);
}
};
self.wants.insert(want_id, transitioned);
}
Ok(vec![])
}
fn handle_job_run_failure(
@@ -346,34 +464,85 @@ impl BuildState {
}
};
self.partitions.insert(pref.r#ref.clone(), transitioned);
// Update wants that reference this partition
self.update_wants_for_partition(pref)?;
}
// Check all wants in WantUpstreamBuilding status to see if they were waiting for the failed partitions
let wants_to_fail: Vec<String> = self
// Building → Failed (for wants directly building failed partitions)
let mut newly_failed_wants: Vec<String> = Vec::new();
for pref in &failed_partitions {
let want_ids = self
.partitions
.get(&pref.r#ref)
.map(|p| p.want_ids().clone())
.unwrap_or_default();
for want_id in want_ids {
let want = self.wants.remove(&want_id).expect(&format!(
"BUG: Want {} must exist when referenced by partition",
want_id
));
let transitioned = match want {
Want::Building(building) => {
newly_failed_wants.push(want_id.clone());
Want::Failed(
building.fail(vec![pref.clone()], "Partition build failed".to_string()),
)
}
// Failed → Failed: add new failed partition to existing failed state
Want::Failed(failed) => {
Want::Failed(failed.add_failed_partitions(vec![pref.clone()]))
}
_ => {
panic!(
"BUG: Want {} in invalid state {:?} when partition {} failed. Should be Building or Failed.",
want_id, want, pref.r#ref
);
}
};
self.wants.insert(want_id.clone(), transitioned);
}
}
// UpstreamBuilding → UpstreamFailed (for downstream wants waiting on newly failed wants)
// TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
let downstream_wants_to_fail: Vec<String> = self
.wants
.iter()
.filter(|(_, want)| {
want.status.as_ref().map(|s| s.code)
== Some(WantStatusCode::WantUpstreamBuilding as i32)
})
.filter(|(_, want)| {
// Check if this want was waiting for any of the failed partitions
want.upstreams
.filter_map(|(id, want)| {
match want {
Want::UpstreamBuilding(downstream_want) => {
// Is this downstream want waiting for any of the newly failed wants?
let is_affected = downstream_want
.state
.upstream_want_ids
.iter()
.any(|upstream| failed_partitions.iter().any(|p| p.r#ref == upstream.r#ref))
.any(|up_id| newly_failed_wants.contains(up_id));
if is_affected { Some(id.clone()) } else { None }
}
_ => None,
}
})
.map(|(want_id, _)| want_id.clone())
.collect();
for want_id in wants_to_fail {
if let Some(want) = self.wants.get_mut(&want_id) {
// Transition to WantUpstreamFailed since a dependency failed
want.status = Some(WantStatusCode::WantUpstreamFailed.into());
want.last_updated_timestamp = current_timestamp();
for want_id in downstream_wants_to_fail {
let want = self
.wants
.remove(&want_id)
.expect(&format!("BUG: Want {} must exist", want_id));
let transitioned = match want {
Want::UpstreamBuilding(downstream_want) => Want::UpstreamFailed(
downstream_want
.upstream_failed(newly_failed_wants.clone(), current_timestamp()),
),
_ => {
panic!("BUG: Want {} should be UpstreamBuilding here", want_id);
}
};
self.wants.insert(want_id, transitioned);
}
Ok(vec![])
@@ -402,39 +571,6 @@ impl BuildState {
.reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b))
.ok_or(format!("No servicing wants found"))?;
// Update servicing wants to track missing dependencies as upstreams
for servicing_want in &job_run_detail.servicing_wants {
if let Some(want) = self.wants.get_mut(&servicing_want.want_id) {
let mut want_is_impacted = false;
for missing_dep in &event.missing_deps {
// Only update this want if it contains an impacted partition
let impacted = missing_dep
.impacted
.iter()
.any(|impacted| want.partitions.iter().any(|p| p.r#ref == impacted.r#ref));
if impacted {
want_is_impacted = true;
// Add missing partitions to upstreams
for missing_partition in &missing_dep.missing {
want.upstreams.push(missing_partition.clone());
}
}
}
// Dedupe upstreams
let mut seen = std::collections::HashSet::new();
want.upstreams.retain(|p| seen.insert(p.r#ref.clone()));
// Set impacted wants to WantUpstreamBuilding so they won't be rescheduled
// until their dependencies are ready
if want_is_impacted {
want.status = Some(WantStatusCode::WantUpstreamBuilding.into());
want.last_updated_timestamp = current_timestamp();
}
}
}
// Transition partitions back to Missing since this job can't build them yet
for pref in &job_run_detail.building_partitions {
let partition = self.partitions.remove(&pref.r#ref).ok_or_else(|| {
@@ -446,15 +582,14 @@ impl BuildState {
// Only valid transition: Building -> Missing
let transitioned = match partition {
Partition::Building(building) => {
Partition::Missing(building.reset_to_missing())
}
Partition::Building(building) => Partition::Missing(building.reset_to_missing()),
// All other states are invalid
_ => {
return Err(format!(
"Invalid state: partition {} must be Building during dep_miss, found {:?}",
pref.r#ref, partition
).into())
)
.into());
}
};
self.partitions.insert(pref.r#ref.clone(), transitioned);
@@ -467,6 +602,84 @@ impl BuildState {
want_timestamps,
);
// Building → UpstreamBuilding OR UpstreamBuilding → UpstreamBuilding (add upstreams)
//
// When a job reports missing dependencies, we need to:
// 1. Create new wants for the missing partitions (done above via want_events)
// 2. Transition the current want to UpstreamBuilding, tracking which upstream wants it's waiting for
// Build a map: partition_ref -> want_id that will build it
// This lets us track which upstream wants the current want depends on
let mut partition_to_want_map: std::collections::HashMap<String, String> =
std::collections::HashMap::new();
for event_item in &want_events {
if let Event::WantCreateV1(want_create) = event_item {
for pref in &want_create.partitions {
partition_to_want_map.insert(pref.r#ref.clone(), want_create.want_id.clone());
}
}
}
// For each want serviced by this job run, check if it was impacted by missing deps
for servicing_want in &job_run_detail.servicing_wants {
let want = self.wants.remove(&servicing_want.want_id).expect(&format!(
"BUG: Want {} must exist when serviced by job run",
servicing_want.want_id
));
// Collect the upstream want IDs that this want now depends on
let mut new_upstream_want_ids = Vec::new();
for missing_dep in &event.missing_deps {
// Only process if this want contains an impacted partition
let is_impacted = missing_dep.impacted.iter().any(|imp| {
servicing_want
.partitions
.iter()
.any(|p| p.r#ref == imp.r#ref)
});
if is_impacted {
// For each missing partition, find the want ID that will build it
for missing_partition in &missing_dep.missing {
if let Some(want_id) = partition_to_want_map.get(&missing_partition.r#ref) {
new_upstream_want_ids.push(want_id.clone());
}
}
}
}
// Dedupe upstream want IDs (one job might report same dep multiple times)
new_upstream_want_ids.sort();
new_upstream_want_ids.dedup();
let transitioned = if !new_upstream_want_ids.is_empty() {
match want {
Want::Building(building) => {
// First dep miss for this want: Building → UpstreamBuilding
Want::UpstreamBuilding(building.detect_missing_deps(new_upstream_want_ids))
}
Want::UpstreamBuilding(upstream) => {
// Already in UpstreamBuilding, add more upstreams (self-transition)
// This can happen if multiple jobs report dep misses, or one job reports multiple dep misses
Want::UpstreamBuilding(upstream.add_upstreams(new_upstream_want_ids))
}
_ => {
panic!(
"BUG: Want {} in invalid state {:?} when job run had dep miss. Should be Building or UpstreamBuilding.",
servicing_want.want_id, want
);
}
}
} else {
// No new upstreams for this want (it wasn't impacted), keep current state
want
};
self.wants
.insert(servicing_want.want_id.clone(), transitioned);
}
Ok(want_events)
}
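Concretely, the flow this handler implements (condensed from the test_want_not_schedulable_after_dep_miss_until_deps_exist test visible further down in this diff; the identifiers job_123, data/beta, and data/alpha are just that test's example values):

// A job servicing a want for "data/beta" reports that "data/alpha" is missing.
let dep_miss = JobRunMissingDepsEventV1 {
    job_run_id: "job_123".to_string(),
    missing_deps: vec![MissingDeps {
        impacted: vec!["data/beta".into()],
        missing: vec!["data/alpha".into()],
    }],
    read_deps: vec![],
};
// handle_job_run_dep_miss emits a WantCreateEventV1 for "data/alpha", resets the
// job's partitions to Missing, and moves the serviced want from Building to
// UpstreamBuilding with the new want's id in upstream_want_ids.
let new_events = state.handle_job_run_dep_miss(&dep_miss).unwrap();
for event in new_events {
    state.handle_event(&event).unwrap();
}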
@@ -484,7 +697,7 @@ impl BuildState {
todo!("...?")
}
fn with_wants(self, wants: BTreeMap<String, WantDetail>) -> Self {
fn with_wants(self, wants: BTreeMap<String, Want>) -> Self {
Self { wants, ..self }
}
@@ -503,7 +716,7 @@ impl BuildState {
}
pub fn get_want(&self, want_id: &str) -> Option<WantDetail> {
self.wants.get(want_id).cloned()
self.wants.get(want_id).map(|w| w.to_detail())
}
pub fn get_taint(&self, taint_id: &str) -> Option<TaintDetail> {
self.taints.get(taint_id).cloned()
@@ -518,8 +731,20 @@ impl BuildState {
pub fn list_wants(&self, request: &ListWantsRequest) -> ListWantsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
let start = page * page_size;
// Paginate first, then convert only the needed wants to WantDetail
let data: Vec<WantDetail> = self
.wants
.values()
.skip(start as usize)
.take(page_size as usize)
.map(|w| w.to_detail())
.collect();
ListWantsResponse {
data: list_state_items(&self.wants, page, page_size),
data,
match_count: self.wants.len() as u64,
page,
page_size,
@@ -604,15 +829,9 @@ impl BuildState {
WantsSchedulability(
self.wants
.values()
// Only consider idle wants for schedulability - all other states are either
// terminal (successful/failed/canceled/upstream failed) or waiting for an event
// (building/upstream building)
.filter(|w| {
w.status.clone().expect("want must have status").code
== WantStatusCode::WantIdle as i32
})
.cloned()
.map(|w| self.want_schedulability(&w))
// Use type-safe is_schedulable() - only Idle wants are schedulable
.filter(|w| w.is_schedulable())
.map(|w| self.want_schedulability(&w.to_detail()))
.collect(),
)
}
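The is_schedulable() call above replaces the old status-code comparison; presumably it reduces to a variant check along these lines (a sketch, not the actual crate::want_state implementation):

impl Want {
    /// Sketch: only Idle wants can be handed to the scheduler; every other
    /// variant is either terminal or waiting on an event.
    pub fn is_schedulable(&self) -> bool {
        matches!(self, Want::Idle(_))
    }
}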
@@ -672,10 +891,8 @@ mod consts {
mod tests {
mod schedulable_wants {
use crate::build_state::BuildState;
use crate::{
PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, WantDetail,
WantStatus, WantStatusCode,
};
use crate::want_state::{IdleState as WantIdleState, Want, WantInfo, WantWithState};
use crate::{PartitionDetail, PartitionRef, PartitionStatus, WantDetail, WantStatus};
use std::collections::BTreeMap;
impl WantDetail {
@@ -712,9 +929,13 @@ mod tests {
let state = BuildState::default()
.with_wants(BTreeMap::from([(
"foo".to_string(),
WantDetail::default()
.with_partitions(vec![test_partition.into()])
.with_status(Some(WantStatusCode::WantIdle.into())),
Want::Idle(WantWithState {
want: WantInfo {
partitions: vec![test_partition.into()],
..Default::default()
},
state: WantIdleState {},
}),
)]))
.with_partitions(BTreeMap::from([(
test_partition.to_string(),
@@ -726,147 +947,6 @@ mod tests {
let ws = schedulability.0.first().unwrap();
assert!(ws.is_schedulable());
}
#[test]
fn test_simple_want_without_live_upstream_is_not_schedulable() {
// Given...
let test_partition = "test_partition";
let state = BuildState::default().with_wants(BTreeMap::from([(
test_partition.to_string(),
WantDetail::default()
.with_upstreams(vec![test_partition.into()])
.with_status(Some(WantStatusCode::WantIdle.into())),
)]));
// Should...
let schedulability = state.wants_schedulability();
let ws = schedulability.0.first().unwrap();
assert!(!ws.is_schedulable());
}
#[test]
fn test_want_not_schedulable_after_dep_miss_until_deps_exist() {
use crate::{
JobRunDetail, JobRunMissingDepsEventV1, MissingDeps, WantAttributedPartitions,
};
use std::collections::BTreeMap;
// Given: A want with a job run that had a dep miss
let beta_partition = "data/beta";
let alpha_partition = "data/alpha";
let want_id = "beta_want";
let job_run_id = "job_123";
let mut state = BuildState::default().with_wants(BTreeMap::from([(
want_id.to_string(),
WantDetail::default()
.with_partitions(vec![beta_partition.into()])
.with_status(Some(WantStatusCode::WantIdle.into())),
)]));
// Job run exists for this want
state.job_runs.insert(
job_run_id.to_string(),
JobRunDetail {
id: job_run_id.to_string(),
servicing_wants: vec![WantAttributedPartitions {
want_id: want_id.to_string(),
partitions: vec![beta_partition.into()],
}],
..Default::default()
},
);
// Initially, want should be schedulable (no known upstreams)
let schedulability = state.wants_schedulability();
assert_eq!(schedulability.0.len(), 1);
assert!(
schedulability.0[0].is_schedulable(),
"want should be schedulable before dep miss"
);
// When: Job run fails with dep miss indicating it needs alpha
let dep_miss_event = JobRunMissingDepsEventV1 {
job_run_id: job_run_id.to_string(),
missing_deps: vec![MissingDeps {
impacted: vec![beta_partition.into()],
missing: vec![alpha_partition.into()],
}],
read_deps: vec![],
};
let new_events = state.handle_job_run_dep_miss(&dep_miss_event).unwrap();
for event in new_events {
state.handle_event(&event).unwrap();
}
// Then: Beta want should be in WantUpstreamBuilding status and not in schedulability list
let schedulability = state.wants_schedulability();
// The schedulability list should contain the newly created alpha want, but not the beta want
let has_beta_want = schedulability
.0
.iter()
.any(|ws| ws.want.partitions.iter().any(|p| p.r#ref == beta_partition));
assert!(
!has_beta_want,
"beta want should not appear in schedulability list when in WantUpstreamBuilding status"
);
// The alpha want should be schedulable
let has_alpha_want = schedulability.0.iter().any(|ws| {
ws.want
.partitions
.iter()
.any(|p| p.r#ref == alpha_partition)
});
assert!(
has_alpha_want,
"alpha want should be schedulable (newly created from dep miss)"
);
// Verify the beta want is now in WantUpstreamBuilding status
let beta_want = state.wants.get(want_id).expect("beta want should exist");
assert_eq!(
beta_want.status.as_ref().unwrap().code,
WantStatusCode::WantUpstreamBuilding as i32,
"want should be in WantUpstreamBuilding status after dep miss"
);
assert_eq!(
beta_want.upstreams.len(),
1,
"want should have one upstream"
);
assert_eq!(
beta_want.upstreams[0].r#ref, alpha_partition,
"upstream should be alpha"
);
}
#[test]
#[ignore]
fn test_simple_want_with_tainted_upstream_is_not_schedulable() {
// Given...
let test_partition = "test_partition";
let state = BuildState::default()
.with_wants(BTreeMap::from([(
"foo".to_string(),
WantDetail::default()
.with_partitions(vec![test_partition.into()])
.with_status(Some(WantStatusCode::WantIdle.into())),
)]))
.with_partitions(BTreeMap::from([(
test_partition.to_string(),
PartitionDetail::default()
.with_ref(Some(test_partition.into()))
.with_status(Some(PartitionStatusCode::PartitionTainted.into())),
)]));
// Should...
let schedulability = state.wants_schedulability();
let ws = schedulability.0.first().unwrap();
assert!(ws.is_schedulable());
}
}
mod sqlite_build_state {


@@ -20,7 +20,7 @@ impl From<WantCreateEventV1> for WantDetail {
sla_seconds: e.sla_seconds,
source: e.source,
comment: e.comment,
status: Some(Default::default()),
status: Some(WantStatusCode::WantIdle.into()),
last_updated_timestamp: current_timestamp(),
}
}


@@ -1,5 +1,5 @@
use crate::util::current_timestamp;
use crate::{EventSource, PartitionRef, WantDetail, WantStatus, WantStatusCode};
use crate::{EventSource, PartitionRef, WantCreateEventV1, WantDetail, WantStatusCode};
/// State: Want has been created and is ready to be scheduled
#[derive(Debug, Clone)]
@@ -50,7 +50,6 @@ pub struct CanceledState {
pub struct WantInfo {
pub want_id: String,
pub partitions: Vec<PartitionRef>,
pub upstreams: Vec<PartitionRef>,
pub data_timestamp: u64,
pub ttl_seconds: u64,
pub sla_seconds: u64,
@@ -59,9 +58,27 @@ pub struct WantInfo {
pub last_updated_at: u64,
}
impl Default for WantInfo {
fn default() -> Self {
Self {
want_id: uuid::Uuid::new_v4().to_string(),
partitions: vec![],
data_timestamp: 0,
ttl_seconds: 0,
sla_seconds: 0,
source: None,
comment: None,
last_updated_at: 0,
}
}
}
impl WantInfo {
pub fn updated_timestamp(self) -> Self {
Self { last_updated_at: current_timestamp(), ..self }
Self {
last_updated_at: current_timestamp(),
..self
}
}
}
@@ -84,13 +101,29 @@ pub enum Want {
Canceled(WantWithState<CanceledState>),
}
// From impl for creating want from event
impl From<WantCreateEventV1> for WantWithState<IdleState> {
fn from(event: WantCreateEventV1) -> Self {
WantWithState {
want: WantInfo {
want_id: event.want_id,
partitions: event.partitions,
data_timestamp: event.data_timestamp,
ttl_seconds: event.ttl_seconds,
sla_seconds: event.sla_seconds,
source: event.source,
comment: event.comment,
last_updated_at: current_timestamp(),
},
state: IdleState {},
}
}
}
// Type-safe transition methods for IdleState
impl WantWithState<IdleState> {
/// Transition from Idle to Building when a job starts building this want's partitions
pub fn start_building(
self,
started_at: u64,
) -> WantWithState<BuildingState> {
pub fn start_building(self, started_at: u64) -> WantWithState<BuildingState> {
WantWithState {
want: self.want.updated_timestamp(),
state: BuildingState { started_at },
@@ -120,12 +153,18 @@ impl WantWithState<BuildingState> {
pub fn complete(self, job_run_id: String, timestamp: u64) -> WantWithState<SuccessfulState> {
WantWithState {
want: self.want.updated_timestamp(),
state: SuccessfulState { completed_at: timestamp },
state: SuccessfulState {
completed_at: timestamp,
},
}
}
/// Transition from Building to Failed when a partition build fails
pub fn fail(self, failed_partition_refs: Vec<PartitionRef>, reason: String) -> WantWithState<FailedState> {
pub fn fail(
self,
failed_partition_refs: Vec<PartitionRef>,
reason: String,
) -> WantWithState<FailedState> {
WantWithState {
want: self.want.updated_timestamp(),
state: FailedState {
@@ -165,9 +204,35 @@ impl WantWithState<BuildingState> {
}
}
// Type-safe transition methods for FailedState
impl WantWithState<FailedState> {
/// Add more failed partitions to an already-failed want (self-transition)
pub fn add_failed_partitions(mut self, partition_refs: Vec<PartitionRef>) -> Self {
for partition_ref in partition_refs {
if self
.state
.failed_partition_refs
.iter()
.any(|p| p.r#ref == partition_ref.r#ref)
{
panic!(
"BUG: Attempted to add failed partition {} that already exists in want {}",
partition_ref.r#ref, self.want.want_id
);
}
self.state.failed_partition_refs.push(partition_ref);
}
WantWithState {
want: self.want.updated_timestamp(),
state: self.state,
}
}
}
// Type-safe transition methods for UpstreamBuildingState
impl WantWithState<UpstreamBuildingState> {
/// Transition from UpstreamBuilding back to Idle when upstreams are satisfied
/// Transition from UpstreamBuilding back to Idle when upstreams are satisfied and no jobs are still building
pub fn upstreams_satisfied(self) -> WantWithState<IdleState> {
WantWithState {
want: self.want.updated_timestamp(),
@@ -175,6 +240,36 @@ impl WantWithState<UpstreamBuildingState> {
}
}
/// Transition from UpstreamBuilding to Building when upstreams are satisfied but jobs are still actively building
pub fn continue_building(
self,
_job_run_id: String, // Reference to active building job for safety/documentation
started_at: u64,
) -> WantWithState<BuildingState> {
WantWithState {
want: self.want.updated_timestamp(),
state: BuildingState { started_at },
}
}
/// Add more upstream dependencies (self-transition)
pub fn add_upstreams(mut self, want_ids: Vec<String>) -> Self {
for want_id in want_ids {
if self.state.upstream_want_ids.contains(&want_id) {
panic!(
"BUG: Attempted to add upstream want {} that already exists in want {}",
want_id, self.want.want_id
);
}
self.state.upstream_want_ids.push(want_id);
}
WantWithState {
want: self.want.updated_timestamp(),
state: self.state,
}
}
/// Transition from UpstreamBuilding to UpstreamFailed when an upstream dependency fails
pub fn upstream_failed(
self,
@@ -183,7 +278,10 @@ impl WantWithState<UpstreamBuildingState> {
) -> WantWithState<UpstreamFailedState> {
WantWithState {
want: self.want.updated_timestamp(),
state: UpstreamFailedState { failed_at: timestamp, failed_wants: failed_upstreams },
state: UpstreamFailedState {
failed_at: timestamp,
failed_wants: failed_upstreams,
},
}
}
@@ -221,7 +319,6 @@ impl Want {
want: WantInfo {
want_id,
partitions,
upstreams: vec![],
data_timestamp,
ttl_seconds,
sla_seconds,
@@ -268,7 +365,7 @@ impl Want {
WantDetail {
want_id: self.want().want_id.clone(),
partitions: self.want().partitions.clone(),
upstreams: self.want().upstreams.clone(),
upstreams: vec![], // Upstreams are tracked via want relationships, not stored here
data_timestamp: self.want().data_timestamp,
ttl_seconds: self.want().ttl_seconds,
sla_seconds: self.want().sla_seconds,