// databuild/databuild/build_state.rs

use crate::data_build_event::Event;
use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
use crate::event_source::Source as EventSourceVariant;
use crate::job_run_state::{JobRun, JobRunWithState, QueuedState as JobQueuedState};
use crate::partition_state::{
BuildingPartitionRef, BuildingState, FailedPartitionRef, LivePartitionRef, Partition,
PartitionWithState, TaintedPartitionRef, UpForRetryPartitionRef, UpstreamBuildingPartitionRef,
UpstreamFailedPartitionRef,
};
use crate::util::current_timestamp;
use crate::want_state::{
FailedWantId, IdleState as WantIdleState, NewState as WantNewState, SuccessfulWantId, Want,
WantWithState,
};
use crate::{
JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1,
JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1,
ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse,
ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail,
PartitionRef, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantCancelEventV1,
WantCreateEventV1, WantDetail,
};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use tracing;
use uuid::Uuid;
/**
Design Notes

The build state struct is the heart of the service and orchestrator, translating low-level
build events into answers to higher-level questions about build state. One temptation is to
implement the build state as a set of hierarchically defined reducers, to achieve information
hiding and to factor system capabilities and state tracking cleanly. Unfortunately, updating
state based on an event requires a mutable borrow of some part of the build state (the part a
reducer controls, for instance) and an immutable borrow of the whole state for read/query
purposes. The whole state must be available to answer questions like "what are the currently
active job runs?" while handling a job run event. Put simply, this isn't possible without
locking both the whole state and the mutable subset, since the two borrows conflict (the
mutable subset is already borrowed, so it can't also be borrowed immutably as part of the
whole-state borrow). You could also define a "query" phase in which reducers query the state
based on the received event before mutating, but that just adds complexity.

Instead, databuild opts for an entity-component-system (ECS) style in which the whole build
state is available mutably to all state-update functionality, trusting that we know how to use
it responsibly. This means no boxing and no "query phase", and all state updates happen as map
lookups and updates, which is exceptionally fast. The states of the different entities are
managed by state machines, in a pseudo-colored-petri-net style (only "pseudo" because we
haven't formalized it). It is critical that these state machines, their states, and their
transitions are type-safe.
*/
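// Illustrative sketch (hypothetical free function, not part of this file): under this model,
// reacting to a job success is a plain mutable-state update composed from the typed transition
// helpers defined below, with no reducer boxing and no separate query phase (this mirrors
// handle_job_run_success below):
//
//     fn react_to_success(state: &mut BuildState, live: &[LivePartitionRef], job_run_id: &str, ts: u64) {
//         state.transition_partitions_to_live(live, job_run_id, ts);  // Building → Live
//         state.unblock_downstream_partitions(live);                  // UpstreamBuilding → UpForRetry
//         let done = state.complete_successful_wants(live, job_run_id, ts);
//         state.unblock_downstream_wants(&done, job_run_id, ts);      // UpstreamBuilding → Idle/Building
//     }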
/// Tracks all application state, defines valid state transitions, and manages cross-state machine
/// state transitions (e.g. job run success resulting in partition going from Building to Live)
#[derive(Debug, Clone, Default)]
pub struct BuildState {
// Core entity storage
wants: BTreeMap<String, Want>,
taints: BTreeMap<String, TaintDetail>,
job_runs: BTreeMap<String, JobRun>,
// UUID-based partition indexing
partitions_by_uuid: BTreeMap<Uuid, Partition>,
canonical_partitions: BTreeMap<String, Uuid>, // partition ref → current UUID
// Inverted indexes
wants_for_partition: BTreeMap<String, Vec<String>>, // partition ref → want_ids
downstream_waiting: BTreeMap<String, Vec<Uuid>>, // upstream ref → partition UUIDs waiting for it
}
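// Index relationship (illustrative; "data/alpha" is an example ref borrowed from the tests
// below): resolving the current state of a ref is two map hops, while superseded instances
// (e.g. a failed attempt that was later retried) remain reachable by UUID as historical records:
//
//     let uuid = canonical_partitions.get("data/alpha");        // ref → current UUID
//     let part = uuid.and_then(|u| partitions_by_uuid.get(u));  // UUID → Partition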
impl BuildState {
/// Reconstruct BuildState from a sequence of events (for read path in web server)
/// This allows the web server to rebuild state from BEL storage without holding a lock
pub fn from_events(events: &[crate::DataBuildEvent]) -> Self {
let mut state = BuildState::default();
for event in events {
if let Some(ref inner_event) = event.event {
// handle_event returns Vec<Event> for cascading events, but we ignore them
// since we're replaying from a complete event log
state.handle_event(inner_event);
}
}
state
}
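// Read-path usage sketch (hedged): `load_events_from_bel()` is a stand-in for however the web
// server fetches the stored Vec<crate::DataBuildEvent>, and this assumes ListWantsRequest
// implements Default:
//
//     let events = load_events_from_bel();
//     let state = BuildState::from_events(&events);
//     let wants = state.list_wants(&ListWantsRequest::default());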
pub fn count_job_runs(&self) -> usize {
self.job_runs.len()
}
// ===== New UUID-based partition access methods =====
/// Get the canonical partition for a ref (the current/active partition instance)
pub fn get_canonical_partition(&self, partition_ref: &str) -> Option<&Partition> {
self.canonical_partitions
.get(partition_ref)
.and_then(|uuid| self.partitions_by_uuid.get(uuid))
}
/// Get the canonical partition UUID for a ref
pub fn get_canonical_partition_uuid(&self, partition_ref: &str) -> Option<Uuid> {
self.canonical_partitions.get(partition_ref).copied()
}
/// Get a partition by its UUID
pub fn get_partition_by_uuid(&self, uuid: Uuid) -> Option<&Partition> {
self.partitions_by_uuid.get(&uuid)
}
/// Take the canonical partition for a ref (removes from partitions_by_uuid for state transition)
/// The canonical_partitions mapping is NOT removed - caller must update it if creating a new partition
fn take_canonical_partition(&mut self, partition_ref: &str) -> Option<Partition> {
self.canonical_partitions
.get(partition_ref)
.copied()
.and_then(|uuid| self.partitions_by_uuid.remove(&uuid))
}
/// Get want IDs for a partition ref (from inverted index)
pub fn get_wants_for_partition(&self, partition_ref: &str) -> &[String] {
self.wants_for_partition
.get(partition_ref)
.map(|v| v.as_slice())
.unwrap_or(&[])
}
/// Register a want in the wants_for_partition inverted index
fn register_want_for_partitions(&mut self, want_id: &str, partition_refs: &[PartitionRef]) {
for pref in partition_refs {
let want_ids = self
.wants_for_partition
.entry(pref.r#ref.clone())
.or_insert_with(Vec::new);
if !want_ids.contains(&want_id.to_string()) {
want_ids.push(want_id.to_string());
}
}
}
/// Create a new partition in Building state and update indexes
fn create_partition_building(&mut self, job_run_id: &str, partition_ref: PartitionRef) -> Uuid {
let partition =
PartitionWithState::<BuildingState>::new(job_run_id.to_string(), partition_ref.clone());
let uuid = partition.uuid;
// Update indexes
self.partitions_by_uuid
.insert(uuid, Partition::Building(partition));
self.canonical_partitions
.insert(partition_ref.r#ref.clone(), uuid);
tracing::info!(
partition = %partition_ref.r#ref,
uuid = %uuid,
job_run_id = %job_run_id,
"Partition: Created in Building state"
);
uuid
}
/// Update a partition in the indexes (after state transition)
fn update_partition(&mut self, partition: Partition) {
let uuid = partition.uuid();
// Update in UUID map
self.partitions_by_uuid.insert(uuid, partition);
}
/// Handle creation of a derivative want (created due to job dep miss)
///
/// When a job reports missing dependencies, it returns WantCreateV1 events for those missing partitions.
/// Those events get appended to the BEL and eventually processed by handle_want_create().
///
/// This function is called when we detect a derivative want (has source.job_triggered) and transitions
/// the impacted wants to UpstreamBuilding state, tracking the derivative want ID as an upstream dependency.
///
/// KEY INSIGHT: We must use the actual want_id from the WantCreateV1 event, not synthetic UUIDs generated
/// during event processing. This ensures replay works correctly - the same want IDs are used both during
/// original execution and during replay from the BEL.
fn handle_derivative_want_creation(
&mut self,
derivative_want_id: &str,
derivative_want_partitions: &[PartitionRef],
source_job_run_id: &str,
) {
// Look up the job run that triggered this derivative want
// This job run must be in DepMiss state because it reported missing dependencies
let job_run = self.job_runs.get(source_job_run_id).expect(&format!(
"BUG: Job run {} must exist when derivative want created",
source_job_run_id
));
// Extract the missing deps from the DepMiss job run
let missing_deps = match job_run {
JobRun::DepMiss(dep_miss) => dep_miss.get_missing_deps(),
_ => {
panic!(
"BUG: Job run {} must be in DepMiss state when derivative want created, found {:?}",
source_job_run_id, job_run
);
}
};
// Find which MissingDeps entry corresponds to this derivative want
// The derivative want was created for a specific set of missing partitions,
// and we need to find which downstream partitions are impacted by those missing partitions
for md in missing_deps {
// Check if this derivative want's partitions match the missing partitions in this entry
// We need exact match because one dep miss event can create multiple derivative wants
let partitions_match = md.missing.iter().all(|missing_ref| {
derivative_want_partitions
.iter()
.any(|p| p.r#ref == missing_ref.r#ref)
}) && derivative_want_partitions.len() == md.missing.len();
if partitions_match {
// Now we know which partitions are impacted by this missing dependency
let impacted_partition_refs: Vec<String> =
md.impacted.iter().map(|p| p.r#ref.clone()).collect();
tracing::debug!(
derivative_want_id = %derivative_want_id,
source_job_run_id = %source_job_run_id,
missing_partitions = ?derivative_want_partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
impacted_partitions = ?impacted_partition_refs,
"Processing derivative want creation"
);
// Find all wants that include these impacted partitions
// These are the wants that need to wait for the derivative want to complete
let mut impacted_want_ids: std::collections::HashSet<String> =
std::collections::HashSet::new();
for partition_ref in &impacted_partition_refs {
for want_id in self.get_wants_for_partition(partition_ref) {
impacted_want_ids.insert(want_id.clone());
}
}
// Transition each impacted want to UpstreamBuilding, tracking this derivative want as an upstream
for want_id in impacted_want_ids {
let want = self.wants.remove(&want_id).expect(&format!(
"BUG: Want {} must exist when processing derivative want",
want_id
));
let transitioned = match want {
Want::Building(building) => {
// First dep miss for this want: Building → UpstreamBuilding
tracing::info!(
want_id = %want_id,
derivative_want_id = %derivative_want_id,
"Want: Building → UpstreamBuilding (first missing dep detected)"
);
Want::UpstreamBuilding(
building.detect_missing_deps(vec![derivative_want_id.to_string()]),
)
}
Want::UpstreamBuilding(upstream) => {
// Additional dep miss: UpstreamBuilding → UpstreamBuilding (add another upstream)
// This can happen if multiple jobs report dep misses for different upstreams
tracing::info!(
want_id = %want_id,
derivative_want_id = %derivative_want_id,
"Want: UpstreamBuilding → UpstreamBuilding (additional upstream added)"
);
Want::UpstreamBuilding(
upstream.add_upstreams(vec![derivative_want_id.to_string()]),
)
}
_ => {
panic!(
"BUG: Want {} in invalid state {:?} when processing derivative want. Should be Building or UpstreamBuilding.",
want_id, want
);
}
};
self.wants.insert(want_id, transitioned);
}
}
}
}
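// Matching example (refs borrowed from the tests below): a dep-miss entry with
// missing = ["data/alpha"] and impacted = ["data/beta"] matches a derivative want whose
// partitions are exactly ["data/alpha"]; every want registered for "data/beta" then moves to
// UpstreamBuilding with that derivative want recorded as an upstream.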
/// Create partitions in Building state
/// Used when a job run starts building partitions.
/// Note: Partitions no longer have a Missing state - they start directly as Building.
fn transition_partitions_to_building(
&mut self,
partition_refs: &[BuildingPartitionRef],
job_run_id: &str,
) {
for building_ref in partition_refs {
if let Some(partition) = self.get_canonical_partition(&building_ref.0.r#ref).cloned() {
// Partition already exists - this is an error unless we're retrying from UpForRetry
match partition {
Partition::UpForRetry(_) => {
// Valid: UpForRetry -> Building (retry after deps satisfied)
// Old partition stays in partitions_by_uuid as historical record
// Create new Building partition with fresh UUID
let uuid =
self.create_partition_building(job_run_id, building_ref.0.clone());
tracing::info!(
partition = %building_ref.0.r#ref,
job_run_id = %job_run_id,
uuid = %uuid,
"Partition: UpForRetry → Building (retry)"
);
}
_ => {
panic!(
"BUG: Invalid state - partition {} cannot start building from state {:?}",
building_ref.0.r#ref, partition
)
}
}
} else {
// Partition doesn't exist yet - create directly in Building state
let uuid = self.create_partition_building(job_run_id, building_ref.0.clone());
tracing::info!(
partition = %building_ref.0.r#ref,
job_run_id = %job_run_id,
uuid = %uuid,
"Partition: (new) → Building"
);
}
}
}
/// Transition partitions from Building to Live state
/// Used when a job run successfully completes
fn transition_partitions_to_live(
&mut self,
partition_refs: &[LivePartitionRef],
job_run_id: &str,
timestamp: u64,
) {
for pref in partition_refs {
let partition = self
.take_canonical_partition(&pref.0.r#ref)
.expect(&format!(
"BUG: Partition {} must exist and be in Building state before completion",
pref.0.r#ref
));
// ONLY valid transition: Building -> Live
let transitioned = match partition {
Partition::Building(building) => {
tracing::info!(
partition = %pref.0.r#ref,
job_run_id = %job_run_id,
"Partition: Building → Live"
);
Partition::Live(building.complete(timestamp))
}
// All other states are invalid
_ => {
panic!(
"BUG: Invalid state - partition {} must be Building to transition to Live, found {:?}",
pref.0.r#ref, partition
)
}
};
self.update_partition(transitioned);
}
}
/// Transition partitions from Building to Failed state
/// Used when a job run fails
fn transition_partitions_to_failed(
&mut self,
partition_refs: &[FailedPartitionRef],
job_run_id: &str,
timestamp: u64,
) {
for pref in partition_refs {
let partition = self
.take_canonical_partition(&pref.0.r#ref)
.expect(&format!(
"BUG: Partition {} must exist and be in Building state before failure",
pref.0.r#ref
));
// ONLY valid transition: Building -> Failed
let transitioned = match partition {
Partition::Building(building) => {
tracing::info!(
partition = %pref.0.r#ref,
job_run_id = %job_run_id,
"Partition: Building → Failed"
);
Partition::Failed(building.fail(timestamp))
}
// All other states are invalid
_ => {
panic!(
"BUG: Invalid state - partition {} must be Building to transition to Failed, found {:?}",
pref.0.r#ref, partition
)
}
};
self.update_partition(transitioned);
}
}
/// Transition partitions from Building to UpstreamBuilding state
/// Used when a job run encounters missing dependencies and cannot proceed.
/// The partition waits for its upstream deps to be built before becoming UpForRetry.
fn transition_partitions_to_upstream_building(
&mut self,
partition_refs: &[BuildingPartitionRef],
missing_deps: Vec<PartitionRef>,
) {
for building_ref in partition_refs {
let partition = self
.take_canonical_partition(&building_ref.0.r#ref)
.expect(&format!(
"BUG: Partition {} must exist and be in Building state during dep_miss",
building_ref.0.r#ref
));
// Only valid transition: Building -> UpstreamBuilding
let transitioned = match partition {
Partition::Building(building) => {
let partition_uuid = building.uuid;
tracing::info!(
partition = %building_ref.0.r#ref,
uuid = %partition_uuid,
missing_deps = ?missing_deps.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
"Partition: Building → UpstreamBuilding (dep miss)"
);
// Update downstream_waiting index: for each missing dep, record that this partition is waiting
for missing_dep in &missing_deps {
self.downstream_waiting
.entry(missing_dep.r#ref.clone())
.or_default()
.push(partition_uuid);
}
Partition::UpstreamBuilding(building.dep_miss(missing_deps.clone()))
}
// All other states are invalid
_ => {
panic!(
"BUG: Invalid state - partition {} must be Building during dep_miss, found {:?}",
building_ref.0.r#ref, partition
)
}
};
self.update_partition(transitioned);
}
}
/// Transition partitions from UpstreamBuilding to UpForRetry when their upstream deps become Live.
/// This should be called when partitions become Live to check if any downstream partitions can now retry.
/// Uses the `downstream_waiting` index to find affected partitions without scanning all partitions.
fn unblock_downstream_partitions(&mut self, newly_live_partition_refs: &[LivePartitionRef]) {
// Collect UUIDs of partitions that might be unblocked using the inverted index
let mut uuids_to_check: Vec<Uuid> = Vec::new();
for live_ref in newly_live_partition_refs {
if let Some(waiting_uuids) = self.downstream_waiting.get(&live_ref.0.r#ref) {
uuids_to_check.extend(waiting_uuids.iter().cloned());
}
}
// Deduplicate UUIDs (a partition might be waiting for multiple deps that all became live)
uuids_to_check.sort();
uuids_to_check.dedup();
for uuid in uuids_to_check {
// Get partition by UUID - it might have been transitioned already or no longer exist
let Some(partition) = self.partitions_by_uuid.get(&uuid).cloned() else {
continue;
};
let partition_ref = partition.partition_ref().r#ref.clone();
// Only process UpstreamBuilding partitions
if let Partition::UpstreamBuilding(mut upstream_building) = partition {
// Remove satisfied deps from missing_deps
for live_ref in newly_live_partition_refs {
upstream_building
.state
.missing_deps
.retain(|d| d.r#ref != live_ref.0.r#ref);
// Also remove from downstream_waiting index
if let Some(waiting) = self.downstream_waiting.get_mut(&live_ref.0.r#ref) {
waiting.retain(|u| *u != uuid);
}
}
let transitioned = if upstream_building.state.missing_deps.is_empty() {
// All deps satisfied, transition to UpForRetry
tracing::info!(
partition = %partition_ref,
uuid = %uuid,
"Partition: UpstreamBuilding → UpForRetry (all upstreams satisfied)"
);
Partition::UpForRetry(upstream_building.upstreams_satisfied())
} else {
// Still waiting for more deps
tracing::debug!(
partition = %partition_ref,
uuid = %uuid,
remaining_deps = ?upstream_building.state.missing_deps.iter().map(|d| &d.r#ref).collect::<Vec<_>>(),
"Partition remains in UpstreamBuilding (still waiting for deps)"
);
Partition::UpstreamBuilding(upstream_building)
};
self.update_partition(transitioned);
}
}
}
/// Cascade failures to downstream partitions when their upstream dependencies fail.
/// Transitions UpstreamBuilding → UpstreamFailed for partitions waiting on failed upstreams.
/// Uses the `downstream_waiting` index to find affected partitions without scanning all partitions.
fn cascade_failures_to_downstream_partitions(
&mut self,
failed_partition_refs: &[FailedPartitionRef],
) {
// Collect UUIDs of partitions that are waiting for the failed partitions
let mut uuids_to_fail: Vec<Uuid> = Vec::new();
for failed_ref in failed_partition_refs {
if let Some(waiting_uuids) = self.downstream_waiting.get(&failed_ref.0.r#ref) {
uuids_to_fail.extend(waiting_uuids.iter().cloned());
}
}
// Deduplicate UUIDs
uuids_to_fail.sort();
uuids_to_fail.dedup();
for uuid in uuids_to_fail {
// Get partition by UUID
let Some(partition) = self.partitions_by_uuid.get(&uuid).cloned() else {
continue;
};
let partition_ref = partition.partition_ref().r#ref.clone();
// Only process UpstreamBuilding partitions
if let Partition::UpstreamBuilding(upstream_building) = partition {
// Collect which upstream refs failed
let failed_upstream_refs: Vec<PartitionRef> = failed_partition_refs
.iter()
.filter(|f| {
upstream_building
.state
.missing_deps
.iter()
.any(|d| d.r#ref == f.0.r#ref)
})
.map(|f| f.0.clone())
.collect();
if !failed_upstream_refs.is_empty() {
tracing::info!(
partition = %partition_ref,
uuid = %uuid,
failed_upstreams = ?failed_upstream_refs.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
"Partition: UpstreamBuilding → UpstreamFailed (upstream failed)"
);
// Remove from downstream_waiting index for all deps
for dep in &upstream_building.state.missing_deps {
if let Some(waiting) = self.downstream_waiting.get_mut(&dep.r#ref) {
waiting.retain(|u| *u != uuid);
}
}
// Transition to UpstreamFailed
let transitioned = Partition::UpstreamFailed(
upstream_building
.upstream_failed(failed_upstream_refs, current_timestamp()),
);
self.update_partition(transitioned);
}
}
}
}
/// Complete wants when all their partitions become Live
/// Transitions Building → Successful, returns list of newly successful want IDs
fn complete_successful_wants(
&mut self,
newly_live_partitions: &[LivePartitionRef],
job_run_id: &str,
timestamp: u64,
) -> Vec<SuccessfulWantId> {
let mut newly_successful_wants: Vec<SuccessfulWantId> = Vec::new();
for pref in newly_live_partitions {
let want_ids: Vec<String> = self.get_wants_for_partition(&pref.0.r#ref).to_vec();
for want_id in want_ids {
let want = self.wants.remove(&want_id).expect(&format!(
"BUG: Want {} must exist when referenced by partition",
want_id
));
let transitioned = match want {
Want::Building(building) => {
// Check if ALL partitions for this want are now Live
let all_partitions_live = building.want.partitions.iter().all(|p| {
self.get_canonical_partition(&p.r#ref)
.map(|partition| partition.is_live())
.unwrap_or(false)
});
if all_partitions_live {
let successful_want =
building.complete(job_run_id.to_string(), timestamp);
tracing::info!(
want_id = %want_id,
job_run_id = %job_run_id,
"Want: Building → Successful"
);
newly_successful_wants.push(successful_want.get_id());
Want::Successful(successful_want)
} else {
Want::Building(building) // Still building other partitions
}
}
_ => {
panic!(
"BUG: Want {} in invalid state {:?} when partition {} became Live. Should be Building.",
want_id, want, pref.0.r#ref
);
}
};
self.wants.insert(want_id.clone(), transitioned);
}
}
newly_successful_wants
}
/// Fail wants when their partitions fail
/// Transitions Building → Failed, and adds to already-failed wants
/// Returns list of newly failed want IDs for downstream cascade
fn fail_directly_affected_wants(
&mut self,
failed_partitions: &[FailedPartitionRef],
) -> Vec<FailedWantId> {
let mut newly_failed_wants: Vec<FailedWantId> = Vec::new();
for pref in failed_partitions {
let want_ids: Vec<String> = self.get_wants_for_partition(&pref.0.r#ref).to_vec();
for want_id in want_ids {
let want = self.wants.remove(&want_id).expect(&format!(
"BUG: Want {} must exist when referenced by partition",
want_id
));
let transitioned = match want {
Want::Building(building) => {
let failed = building
.fail(vec![pref.0.clone()], "Partition build failed".to_string());
newly_failed_wants.push(failed.get_id());
Want::Failed(failed)
}
// Failed → Failed: add new failed partition to existing failed state
Want::Failed(failed) => {
Want::Failed(failed.add_failed_partitions(vec![pref.clone()]))
}
_ => {
panic!(
"BUG: Want {} in invalid state {:?} when partition {} failed. Should be Building or Failed.",
want_id, want, pref.0.r#ref
);
}
};
self.wants.insert(want_id.clone(), transitioned);
}
}
newly_failed_wants
}
/// Unblock downstream wants when their upstream dependencies succeed
/// Transitions UpstreamBuilding → Idle (when ready) or Building (when partitions already building)
fn unblock_downstream_wants(
&mut self,
newly_successful_wants: &[SuccessfulWantId],
job_run_id: &str,
timestamp: u64,
) {
tracing::debug!(
newly_successful_wants = ?newly_successful_wants
.iter()
.map(|w| &w.0)
.collect::<Vec<_>>(),
"Checking downstream wants for unblocking"
);
// Find downstream wants that are waiting for any of the newly successful wants
// TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
let downstream_wants_to_check: Vec<String> = self
.wants
.iter()
.filter_map(|(id, want)| {
match want {
Want::UpstreamBuilding(downstream_want) => {
// Is this downstream want waiting for any of the newly successful wants?
let is_affected =
downstream_want.state.upstream_want_ids.iter().any(|up_id| {
newly_successful_wants.iter().any(|swid| &swid.0 == up_id)
});
if is_affected { Some(id.clone()) } else { None }
}
_ => None,
}
})
.collect();
tracing::debug!(
downstream_wants_to_check = ?downstream_wants_to_check,
"Found downstream wants affected by upstream completion"
);
for want_id in downstream_wants_to_check {
let want = self
.wants
.remove(&want_id)
.expect(&format!("BUG: Want {} must exist", want_id));
let transitioned = match want {
Want::UpstreamBuilding(downstream_want) => {
tracing::debug!(
want_id = %want_id,
upstreams = ?downstream_want.state.upstream_want_ids,
"Checking if all upstreams are satisfied"
);
// Check if ALL of this downstream want's upstream dependencies are now Successful
let all_upstreams_successful = downstream_want
.state
.upstream_want_ids
.iter()
.all(|up_want_id| {
self.wants
.get(up_want_id)
.map(|w| matches!(w, Want::Successful(_)))
.unwrap_or(false)
});
tracing::debug!(
want_id = %want_id,
all_upstreams_successful = %all_upstreams_successful,
"Upstream satisfaction check complete"
);
if all_upstreams_successful {
// Check if any of this want's partitions are still being built
// If a job dep-missed, its partitions transitioned to UpstreamBuilding (there is no Missing state),
// but other jobs might still be building other partitions for this want
let any_partition_building =
downstream_want.want.partitions.iter().any(|p| {
self.get_canonical_partition(&p.r#ref)
.map(|partition| matches!(partition, Partition::Building(_)))
.unwrap_or(false)
});
tracing::debug!(
want_id = %want_id,
any_partition_building = %any_partition_building,
"Partition building status check"
);
if any_partition_building {
// Some partitions still being built, continue in Building state
tracing::info!(
want_id = %want_id,
job_run_id = %job_run_id,
"Want: UpstreamBuilding → Building (upstreams satisfied, partitions building)"
);
Want::Building(
downstream_want
.continue_building(job_run_id.to_string(), timestamp),
)
} else {
// No partitions being built, become schedulable again
tracing::info!(
want_id = %want_id,
"Want: UpstreamBuilding → Idle (upstreams satisfied, ready to schedule)"
);
Want::Idle(downstream_want.upstreams_satisfied())
}
} else {
// Upstreams not all satisfied yet, stay in UpstreamBuilding
tracing::debug!(
want_id = %want_id,
"Want remains in UpstreamBuilding state (upstreams not yet satisfied)"
);
Want::UpstreamBuilding(downstream_want)
}
}
_ => {
panic!("BUG: Want {} should be UpstreamBuilding here", want_id);
}
};
self.wants.insert(want_id, transitioned);
}
}
/// Cascade failures to downstream wants when their upstream dependencies fail
/// Transitions UpstreamBuilding → UpstreamFailed
fn cascade_failures_to_downstream_wants(
&mut self,
newly_failed_wants: &[FailedWantId],
timestamp: u64,
) {
// Find downstream wants that are waiting for any of the newly failed wants
// TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
let downstream_wants_to_fail: Vec<String> = self
.wants
.iter()
.filter_map(|(id, want)| {
match want {
Want::UpstreamBuilding(downstream_want) => {
// Is this downstream want waiting for any of the newly failed wants?
let is_affected =
downstream_want.state.upstream_want_ids.iter().any(|up_id| {
newly_failed_wants.iter().any(|fwid| &fwid.0 == up_id)
});
if is_affected { Some(id.clone()) } else { None }
}
_ => None,
}
})
.collect();
for want_id in downstream_wants_to_fail {
let want = self
.wants
.remove(&want_id)
.expect(&format!("BUG: Want {} must exist", want_id));
let transitioned = match want {
Want::UpstreamBuilding(downstream_want) => Want::UpstreamFailed(
downstream_want.upstream_failed(
newly_failed_wants
.iter()
.map(|fwid| fwid.0.clone())
.collect(),
timestamp,
),
),
_ => {
panic!("BUG: Want {} should be UpstreamBuilding here", want_id);
}
};
self.wants.insert(want_id, transitioned);
}
}
/// Reacts to events, updating state, and panics if an event implies an invalid state transition.
/// Event handlers can return vecs of events that will then be appended to the BEL
pub fn handle_event(&mut self, event: &Event) -> Vec<Event> {
match event {
// JobRun events
Event::JobRunBufferV1(e) => self.handle_job_run_buffer(e),
Event::JobRunHeartbeatV1(e) => self.handle_job_run_heartbeat(e),
Event::JobRunFailureV1(e) => self.handle_job_run_failure(e),
Event::JobRunCancelV1(e) => self.handle_job_run_cancel(e),
Event::JobRunSuccessV1(e) => self.handle_job_run_success(e),
Event::JobRunMissingDepsV1(e) => self.handle_job_run_dep_miss(e),
// Want events
Event::WantCreateV1(e) => self.handle_want_create(e),
Event::WantCancelV1(e) => self.handle_want_cancel(e),
// Taint events
Event::TaintCreateV1(e) => self.handle_taint_create(e),
Event::TaintCancelV1(e) => self.handle_taint_delete(e),
// Ruh roh!
_ => panic!("Unhandled event type! {:?}", event),
}
}
fn handle_want_create(&mut self, event: &WantCreateEventV1) -> Vec<Event> {
// Create want in New state from event
let want_new: WantWithState<WantNewState> = event.clone().into();
// Log creation with derivative vs user-created distinction
let is_derivative = if let Some(source) = &event.source {
if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source {
tracing::info!(
want_id = %event.want_id,
partitions = ?event.partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
source_job_run_id = %job_triggered.job_run_id,
"Want created (derivative - auto-created due to missing dependency)"
);
true
} else {
false
}
} else {
tracing::info!(
want_id = %event.want_id,
partitions = ?event.partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
"Want created (user-requested)"
);
false
};
// Register this want with all its partitions (via inverted index)
self.register_want_for_partitions(&event.want_id, &event.partitions);
// Sense canonical partition states and determine initial want state
// Priority order: Failed > UpstreamFailed > AllLive > Building > UpstreamBuilding > UpForRetry > Idle
let mut failed_partitions: Vec<PartitionRef> = Vec::new();
let mut upstream_failed_wants: Vec<String> = Vec::new();
let mut all_live = true;
let mut any_building = false;
let mut any_upstream_building = false;
let mut any_up_for_retry = false;
let mut building_started_at: Option<u64> = None;
for pref in &event.partitions {
match self.get_canonical_partition(&pref.r#ref) {
Some(partition) => match partition {
Partition::Failed(_f) => {
failed_partitions.push(pref.clone());
all_live = false;
}
Partition::UpstreamFailed(uf) => {
// Track which upstream refs failed
for failed_ref in &uf.state.failed_upstream_refs {
upstream_failed_wants.push(failed_ref.r#ref.clone());
}
all_live = false;
}
Partition::Live(_) => {
// Contributes to all_live check
}
Partition::Building(_b) => {
any_building = true;
all_live = false;
// Record an approximate build-start time the first time we see a Building partition
if building_started_at.is_none() {
building_started_at = Some(current_timestamp());
}
}
Partition::UpstreamBuilding(_) => {
any_upstream_building = true;
all_live = false;
}
Partition::UpForRetry(_) => {
any_up_for_retry = true;
all_live = false;
}
Partition::Tainted(_) => {
// Tainted partitions need rebuild
all_live = false;
}
},
None => {
// Partition doesn't exist - needs to be built
all_live = false;
}
}
}
// Transition from New to appropriate state based on sensing
let final_want: Want = if !failed_partitions.is_empty() {
tracing::info!(
want_id = %event.want_id,
failed_partitions = ?failed_partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
"Want: New → Failed (partition already failed)"
);
Want::Failed(
want_new.to_failed(failed_partitions, "Partition already failed".to_string()),
)
} else if !upstream_failed_wants.is_empty() {
tracing::info!(
want_id = %event.want_id,
upstream_failed = ?upstream_failed_wants,
"Want: New → UpstreamFailed (upstream already failed)"
);
Want::UpstreamFailed(want_new.to_upstream_failed(upstream_failed_wants))
} else if all_live && !event.partitions.is_empty() {
tracing::info!(
want_id = %event.want_id,
"Want: New → Successful (all partitions already live)"
);
Want::Successful(want_new.to_successful(current_timestamp()))
} else if any_building {
tracing::info!(
want_id = %event.want_id,
"Want: New → Building (partitions being built)"
);
Want::Building(
want_new.to_building(building_started_at.unwrap_or_else(current_timestamp)),
)
} else if any_upstream_building {
// For upstream building, we need the upstream want IDs
// For now, transition to Idle and let derivative want handling take care of it
tracing::info!(
want_id = %event.want_id,
"Want: New → Idle (upstream building - will be updated by derivative want handling)"
);
Want::Idle(want_new.to_idle())
} else {
// Partitions don't exist, or are UpForRetry - want is schedulable
tracing::info!(
want_id = %event.want_id,
"Want: New → Idle (ready to schedule)"
);
Want::Idle(want_new.to_idle())
};
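// Example of the sensing above (refs are illustrative, borrowed from the tests below): a want
// created for [data/alpha, data/beta] where data/alpha is already Failed and data/beta is
// Building starts out as Failed, since Failed outranks Building in the priority order.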
self.wants.insert(event.want_id.clone(), final_want);
// If this is a derivative want (triggered by a job's dep miss), transition impacted wants to UpstreamBuilding
if is_derivative {
if let Some(source) = &event.source {
if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source {
self.handle_derivative_want_creation(
&event.want_id,
&event.partitions,
&job_triggered.job_run_id,
);
}
}
}
vec![]
}
fn handle_want_cancel(&mut self, event: &WantCancelEventV1) -> Vec<Event> {
// TODO actually cancel in-progress job runs that no longer have a sponsoring want
// Type-safe transition (API layer should prevent canceling terminal wants)
let want = self.wants.remove(&event.want_id).expect(&format!(
"BUG: Want {} must exist when cancel event received",
event.want_id
));
let canceled = match want {
Want::New(new_want) => {
Want::Canceled(new_want.cancel(event.source.clone(), event.comment.clone()))
}
Want::Idle(idle) => {
Want::Canceled(idle.cancel(event.source.clone(), event.comment.clone()))
}
Want::Building(building) => Want::Canceled(building.cancel(
event.source.clone(),
current_timestamp(),
event.comment.clone(),
)),
Want::UpstreamBuilding(upstream) => Want::Canceled(upstream.cancel(
event.source.clone(),
current_timestamp(),
event.comment.clone(),
)),
// Terminal states: panic because API should have prevented this
Want::Successful(_) | Want::Failed(_) | Want::UpstreamFailed(_) | Want::Canceled(_) => {
panic!(
"BUG: Received WantCancelEvent for want {} in terminal state {:?}. API layer should prevent this.",
event.want_id, want
);
}
};
self.wants.insert(event.want_id.clone(), canceled);
vec![]
}
fn handle_job_run_buffer(&mut self, event: &JobRunBufferEventV1) -> Vec<Event> {
// No job run should exist - if it does, that's a BUG in the orchestrator
if self.job_runs.contains_key(&event.job_run_id) {
panic!(
"BUG: Job run ID collision on job run ID {}. Orchestrator should generate unique IDs.",
event.job_run_id
);
}
// Create job run in Queued state
let queued: JobRunWithState<JobQueuedState> = event.clone().into();
// Transition wants to Building
// Valid states when job buffer event arrives:
// - Idle: First job starting for this want (normal case)
// - Building: Another job already started for this want (multiple jobs can service same want)
// Invalid states (panic - indicates orchestrator bug):
// - UpstreamBuilding: Not schedulable, waiting for dependencies
// - Successful/Failed/UpstreamFailed/Canceled: Terminal states, not schedulable
for wap in &queued.info.servicing_wants {
let want = self.wants.remove(&wap.want_id).expect(&format!(
"BUG: Want {} must exist when job buffer event received",
wap.want_id
));
let transitioned = match want {
Want::New(new_want) => {
// Want was just created and hasn't fully sensed yet - transition to Building
// This can happen if want creation and job buffer happen in quick succession
tracing::info!(
want_id = %wap.want_id,
job_run_id = %event.job_run_id,
"Want: New → Building (job scheduled before sensing completed)"
);
Want::Building(new_want.to_building(current_timestamp()))
}
Want::Idle(idle) => {
// First job starting for this want
tracing::info!(
want_id = %wap.want_id,
job_run_id = %event.job_run_id,
"Want: Idle → Building (job scheduled)"
);
Want::Building(idle.start_building(current_timestamp()))
}
Want::Building(building) => {
// Another job already started, stay in Building (no-op)
Want::Building(building)
}
_ => {
panic!(
"BUG: Want {} in invalid state {:?} for job buffer. Only New, Idle or Building wants should be scheduled.",
wap.want_id, want
);
}
};
self.wants.insert(wap.want_id.clone(), transitioned);
}
// Get building partition refs from queued job - job is source of truth for building partitions
let building_refs: Vec<BuildingPartitionRef> = queued
.info
.building_partitions
.iter()
.map(|p| BuildingPartitionRef(p.clone()))
.collect();
// Transition partitions to Building state
self.transition_partitions_to_building(&building_refs, &event.job_run_id);
self.job_runs
.insert(event.job_run_id.clone(), JobRun::Queued(queued));
vec![]
}
fn handle_job_run_heartbeat(&mut self, event: &JobRunHeartbeatEventV1) -> Vec<Event> {
let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
"BUG: Job run {} must exist when heartbeat received",
event.job_run_id
));
let running = match job_run {
// First heartbeat: Queued -> Running
JobRun::Queued(queued) => {
tracing::info!(
job_run_id = %event.job_run_id,
"JobRun: Queued → Running"
);
queued.start_running(current_timestamp())
}
// Subsequent heartbeat: update timestamp
JobRun::Running(running) => running.heartbeat(current_timestamp()),
_ => {
panic!(
"BUG: Heartbeat received for job run {} in invalid state {:?}",
event.job_run_id, job_run
);
}
};
self.job_runs
.insert(event.job_run_id.clone(), JobRun::Running(running));
vec![]
}
fn handle_job_run_success(&mut self, event: &JobRunSuccessEventV1) -> Vec<Event> {
let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
"BUG: Job run {} must exist when success event received",
event.job_run_id
));
let succeeded = match job_run {
JobRun::Running(running) => {
tracing::info!(
job_run_id = %event.job_run_id,
"JobRun: Running → Succeeded"
);
running.succeed(current_timestamp())
}
_ => {
panic!(
"BUG: Success event received for job run {} in invalid state {:?}. Job must be Running to succeed.",
event.job_run_id, job_run
);
}
};
// Job run success is SOURCE of truth that partitions are live
let newly_live_partitions = succeeded.get_completed_partitions();
// Update partitions being built by this job (strict type-safe transitions)
self.transition_partitions_to_live(
&newly_live_partitions,
&event.job_run_id,
current_timestamp(),
);
// UpstreamBuilding → UpForRetry (for downstream partitions waiting on newly live partitions)
self.unblock_downstream_partitions(&newly_live_partitions);
// Building → Successful (when all partitions Live)
let newly_successful_wants: Vec<SuccessfulWantId> = self.complete_successful_wants(
&newly_live_partitions,
&event.job_run_id,
current_timestamp(),
);
// UpstreamBuilding → Idle/Building (for downstream wants waiting on newly successful wants)
self.unblock_downstream_wants(
&newly_successful_wants,
&event.job_run_id,
current_timestamp(),
);
self.job_runs
.insert(event.job_run_id.clone(), JobRun::Succeeded(succeeded));
vec![]
}
fn handle_job_run_failure(&mut self, event: &JobRunFailureEventV1) -> Vec<Event> {
let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
"BUG: Job run {} must exist when failure event received",
event.job_run_id
));
let failed = match job_run {
JobRun::Running(running) => {
tracing::info!(
job_run_id = %event.job_run_id,
reason = %event.reason,
"JobRun: Running → Failed"
);
running.fail(current_timestamp(), event.reason.clone())
}
_ => {
panic!(
"BUG: Failure event received for job run {} in invalid state {:?}. Job must be Running to fail.",
event.job_run_id, job_run
);
}
};
// Job run failure is SOURCE of truth that partitions failed
let failed_partitions = failed.get_failed_partitions();
// Transition partitions using strict type-safe methods
self.transition_partitions_to_failed(
&failed_partitions,
&event.job_run_id,
current_timestamp(),
);
// UpstreamBuilding → UpstreamFailed (for downstream partitions waiting on failed upstreams)
self.cascade_failures_to_downstream_partitions(&failed_partitions);
// Building → Failed (for wants directly building failed partitions)
let newly_failed_wants: Vec<FailedWantId> =
self.fail_directly_affected_wants(&failed_partitions);
// UpstreamBuilding → UpstreamFailed (for downstream wants waiting on newly failed wants)
self.cascade_failures_to_downstream_wants(&newly_failed_wants, current_timestamp());
self.job_runs
.insert(event.job_run_id.clone(), JobRun::Failed(failed));
vec![]
}
fn handle_job_run_cancel(&mut self, event: &JobRunCancelEventV1) -> Vec<Event> {
let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
"BUG: Job run {} must exist when cancel event received",
event.job_run_id
));
let canceled = match job_run {
JobRun::Queued(queued) => queued.cancel(
current_timestamp(),
event.source.clone(),
event.comment.clone().unwrap_or_default(),
),
JobRun::Running(running) => running.cancel(
current_timestamp(),
event.source.clone(),
event.comment.clone().unwrap_or_default(),
),
_ => {
panic!(
"BUG: Cancel event received for job run {} in invalid state {:?}",
event.job_run_id, job_run
);
}
};
// Canceled job means building partitions should be removed (they never completed).
// In the new model without Missing state, partitions are only created when jobs
// start building them, and removed if the job is canceled before completion.
let building_refs_to_reset = canceled.get_building_partitions_to_reset();
for building_ref in &building_refs_to_reset {
// Remove from UUID map and canonical map
if let Some(uuid) = self.canonical_partitions.remove(&building_ref.0.r#ref) {
self.partitions_by_uuid.remove(&uuid);
}
tracing::info!(
partition = %building_ref.0.r#ref,
"Partition removed (job canceled)"
);
}
self.job_runs
.insert(event.job_run_id.clone(), JobRun::Canceled(canceled));
vec![]
}
pub fn handle_job_run_dep_miss(&mut self, event: &JobRunMissingDepsEventV1) -> Vec<Event> {
let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
"BUG: Job run {} must exist when dep miss event received",
event.job_run_id
));
let dep_miss = match job_run {
JobRun::Running(running) => {
tracing::info!(
job_run_id = %event.job_run_id,
missing_deps = ?event.missing_deps.iter()
.flat_map(|md| md.missing.iter().map(|p| &p.r#ref))
.collect::<Vec<_>>(),
"JobRun: Running → DepMiss (missing dependencies detected)"
);
running.dep_miss(
current_timestamp(),
event.missing_deps.clone(),
event.read_deps.clone(),
)
}
_ => {
panic!(
"BUG: DepMiss event received for job run {} in invalid state {:?}. Job must be Running to hit dep miss.",
event.job_run_id, job_run
);
}
};
// Infer data/SLA timestamps from servicing wants
let want_timestamps: WantTimestamps = dep_miss
.info
.servicing_wants
.iter()
.flat_map(|wap| self.get_want(&wap.want_id).map(|w| w.into()))
.reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b))
.expect("BUG: No servicing wants found");
// Collect all missing deps into a flat list of partition refs
let all_missing_deps: Vec<PartitionRef> = event
.missing_deps
.iter()
.flat_map(|md| md.missing.clone())
.collect();
// Transition partitions from Building to UpstreamBuilding since this job can't build them yet
let building_refs_to_reset = dep_miss.get_building_partitions_to_reset();
self.transition_partitions_to_upstream_building(&building_refs_to_reset, all_missing_deps);
// Generate WantCreateV1 events for the missing dependencies
// These events will be returned and appended to the BEL by BuildEventLog.append_event()
let want_events = missing_deps_to_want_events(
dep_miss.get_missing_deps().to_vec(),
&event.job_run_id,
want_timestamps,
);
// Store the job run in DepMiss state so we can access the missing_deps later
// When the derivative WantCreateV1 events get processed by handle_want_create(),
// they will look up this job run and use handle_derivative_want_creation() to
// transition impacted wants to UpstreamBuilding with the correct want IDs.
//
// KEY: We do NOT transition wants here because the want_events have randomly generated UUIDs
// that won't match during replay. Instead, we transition wants when processing the actual
// WantCreateV1 events that get written to and read from the BEL.
self.job_runs
.insert(event.job_run_id.clone(), JobRun::DepMiss(dep_miss));
// Return derivative want events to be appended to the BEL
want_events
}
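// Dep-miss round trip, for orientation (summarizes the handlers above and below):
//
//     JobRunMissingDepsV1          JobRun: Running → DepMiss; partitions: Building → UpstreamBuilding;
//                                  derivative WantCreateV1 events returned for the BEL
//     WantCreateV1 (derivative)    handle_want_create() → handle_derivative_want_creation();
//                                  impacted wants: Building → UpstreamBuilding
//     JobRunSuccessV1 (upstream)   upstream partitions go Live; downstream partitions:
//                                  UpstreamBuilding → UpForRetry; downstream wants become
//                                  schedulable again (UpstreamBuilding → Idle/Building)
//
// See test_multihop_dependency_replay in the tests below for a concrete event sequence.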
fn handle_taint_create(&mut self, event: &TaintCreateEventV1) -> Vec<Event> {
// Store the taint detail
let taint_detail = TaintDetail {
taint_id: event.taint_id.clone(),
root_taint_id: event.root_taint_id.clone(),
parent_taint_id: event.parent_taint_id.clone(),
partitions: event.partitions.clone(),
source: event.source.clone(),
comment: event.comment.clone(),
};
self.taints.insert(event.taint_id.clone(), taint_detail);
// Transition affected partitions to Tainted state
for pref in &event.partitions {
if let Some(partition) = self.take_canonical_partition(&pref.r#ref) {
let transitioned = match partition {
Partition::Live(live) => {
tracing::info!(
partition = %pref.r#ref,
taint_id = %event.taint_id,
"Partition: Live → Tainted"
);
Partition::Tainted(live.taint(event.taint_id.clone(), current_timestamp()))
}
Partition::Tainted(tainted) => {
// Add additional taint to already-tainted partition
tracing::info!(
partition = %pref.r#ref,
taint_id = %event.taint_id,
"Partition: Tainted → Tainted (adding taint)"
);
Partition::Tainted(tainted.add_taint(event.taint_id.clone()))
}
other => {
// For non-Live/Tainted partitions (Building, UpstreamBuilding, etc.),
// we can't taint them - log a warning and skip
tracing::warn!(
partition = %pref.r#ref,
taint_id = %event.taint_id,
state = ?other,
"Cannot taint partition in non-Live state, skipping"
);
other
}
};
self.update_partition(transitioned);
} else {
// Partition doesn't exist yet - this is fine, taint will apply when it's built
tracing::debug!(
partition = %pref.r#ref,
taint_id = %event.taint_id,
"Taint targeting non-existent partition, will apply when built"
);
}
}
vec![]
}
fn handle_taint_delete(&mut self, event: &TaintCancelEventV1) -> Vec<Event> {
// Remove the taint from our tracking
if let Some(taint) = self.taints.remove(&event.taint_id) {
tracing::info!(
taint_id = %event.taint_id,
partitions = ?taint.partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
"Taint canceled/deleted"
);
// Note: We do NOT automatically un-taint partitions when a taint is canceled.
// Once tainted, partitions remain tainted until they are rebuilt.
// The taint_ids on the partition are historical records of why it was tainted.
} else {
tracing::warn!(
taint_id = %event.taint_id,
"Attempted to cancel non-existent taint"
);
}
vec![]
}
fn with_wants(self, wants: BTreeMap<String, Want>) -> Self {
Self { wants, ..self }
}
#[cfg(test)]
fn with_partitions(self, old_partitions: BTreeMap<String, PartitionDetail>) -> Self {
use crate::partition_state::PartitionWithState;
let mut canonical_partitions: BTreeMap<String, Uuid> = BTreeMap::new();
let mut partitions_by_uuid: BTreeMap<Uuid, Partition> = BTreeMap::new();
// Convert PartitionDetail to Live partitions for testing
for (key, detail) in old_partitions {
let partition_ref = detail.r#ref.clone().unwrap_or_default();
// Create a deterministic UUID for test data
let uuid =
crate::partition_state::derive_partition_uuid("test_job_run", &partition_ref.r#ref);
let live_partition = Partition::Live(PartitionWithState {
uuid,
partition_ref,
state: crate::partition_state::LiveState {
built_at: 0,
built_by: "test_job_run".to_string(),
},
});
canonical_partitions.insert(key, uuid);
partitions_by_uuid.insert(uuid, live_partition);
}
Self {
canonical_partitions,
partitions_by_uuid,
..self
}
}
pub fn get_want(&self, want_id: &str) -> Option<WantDetail> {
self.wants.get(want_id).map(|w| w.to_detail())
}
pub fn get_taint(&self, taint_id: &str) -> Option<TaintDetail> {
self.taints.get(taint_id).cloned()
}
pub fn get_partition(&self, partition_id: &str) -> Option<PartitionDetail> {
self.get_canonical_partition(partition_id)
.map(|p| p.to_detail())
}
pub fn get_job_run(&self, job_run_id: &str) -> Option<JobRunDetail> {
self.job_runs.get(job_run_id).map(|jr| jr.to_detail())
}
pub fn list_wants(&self, request: &ListWantsRequest) -> ListWantsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
let start = page * page_size;
// Paginate first, then convert only the needed wants to WantDetail
let data: Vec<WantDetail> = self
.wants
.values()
.skip(start as usize)
.take(page_size as usize)
.map(|w| w.to_detail())
.collect();
ListWantsResponse {
data,
match_count: self.wants.len() as u64,
page,
page_size,
}
}
pub fn list_taints(&self, request: &ListTaintsRequest) -> ListTaintsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
ListTaintsResponse {
data: list_state_items(&self.taints, page, page_size),
match_count: self.taints.len() as u64,
page,
page_size,
}
}
pub fn list_partitions(&self, request: &ListPartitionsRequest) -> ListPartitionsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
// Convert canonical partitions to PartitionDetail for API
let partition_details: BTreeMap<String, PartitionDetail> = self
.canonical_partitions
.iter()
.filter_map(|(k, uuid)| {
self.partitions_by_uuid
.get(uuid)
.map(|p| (k.clone(), p.to_detail()))
})
.collect();
ListPartitionsResponse {
data: list_state_items(&partition_details, page, page_size),
match_count: self.canonical_partitions.len() as u64,
page,
page_size,
}
}
pub fn list_job_runs(&self, request: &ListJobRunsRequest) -> ListJobRunsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
let start = page * page_size;
let data: Vec<JobRunDetail> = self
.job_runs
.values()
.skip(start as usize)
.take(page_size as usize)
.map(|jr| jr.to_detail())
.collect();
ListJobRunsResponse {
data,
match_count: self.job_runs.len() as u64,
page,
page_size,
}
}
/**
Wants are schedulable when their upstreams are ready and target partitions are not already building
*/
pub fn want_schedulability(&self, want: &WantDetail) -> WantSchedulability {
// Check upstream partition statuses (dependencies)
let mut live: Vec<LivePartitionRef> = Vec::new();
let mut tainted: Vec<TaintedPartitionRef> = Vec::new();
let mut not_ready: Vec<PartitionRef> = Vec::new(); // Partitions that don't exist or aren't Live
for upstream_ref in &want.upstreams {
match self.get_canonical_partition(&upstream_ref.r#ref) {
Some(partition) => {
match partition {
Partition::Live(p) => live.push(p.get_ref()),
Partition::Tainted(p) => tainted.push(p.get_ref()),
// All other states (Building, UpstreamBuilding, UpForRetry, Failed, UpstreamFailed) mean upstream is not ready
_ => not_ready.push(upstream_ref.clone()),
}
}
None => {
// Partition doesn't exist yet - it's not ready
not_ready.push(upstream_ref.clone());
}
}
}
// Check target partition statuses (what this want is trying to build)
// If any target partition is already Building, this want should wait
let mut building: Vec<BuildingPartitionRef> = Vec::new();
for target_ref in &want.partitions {
if let Some(partition) = self.get_canonical_partition(&target_ref.r#ref) {
if let Partition::Building(p) = partition {
building.push(p.get_ref());
}
}
}
WantSchedulability {
want: want.clone(),
status: WantUpstreamStatus {
live,
tainted,
not_ready,
building,
},
}
}
pub fn wants_schedulability(&self) -> WantsSchedulability {
WantsSchedulability(
self.wants
.values()
// Use type-safe is_schedulable() - only Idle wants are schedulable
.filter(|w| w.is_schedulable())
.map(|w| self.want_schedulability(&w.to_detail()))
.collect(),
)
}
}
/// The status of a want's upstream partitions (sensed from dep-miss job runs) and of any target partitions already being built
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantUpstreamStatus {
pub live: Vec<LivePartitionRef>,
pub tainted: Vec<TaintedPartitionRef>,
/// Upstream partitions that are not ready (don't exist, or are in Building/UpstreamBuilding/UpForRetry/Failed/UpstreamFailed states)
pub not_ready: Vec<PartitionRef>,
/// Target partitions that are currently being built by another job
pub building: Vec<BuildingPartitionRef>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantSchedulability {
pub want: WantDetail,
pub status: WantUpstreamStatus,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantsSchedulability(pub Vec<WantSchedulability>);
impl WantsSchedulability {
pub fn schedulable_wants(self) -> Vec<WantDetail> {
self.0
.iter()
.filter(|ws| ws.is_schedulable())
.map(|ws| ws.want.clone())
.collect()
}
}
impl WantSchedulability {
pub fn is_schedulable(&self) -> bool {
// Want is schedulable if:
// - All upstream dependencies are Live (none not-ready, none tainted)
// - No target partitions currently being built by another job
self.status.not_ready.is_empty()
&& self.status.tainted.is_empty()
&& self.status.building.is_empty()
}
}
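// Usage sketch (hedged; `schedule(...)` stands in for whatever the orchestrator does with a
// schedulable want):
//
//     for want in state.wants_schedulability().schedulable_wants() {
//         schedule(want);
//     }
//
// Only Idle wants are considered (see wants_schedulability() above); is_schedulable() then
// filters out wants with not-ready or tainted upstreams, or with targets already being built.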
fn list_state_items<T: Clone>(map: &BTreeMap<String, T>, page: u64, page_size: u64) -> Vec<T> {
// TODO when we add filtering, can we add it generically via some trait or filter object that can be provided?
let start = page * page_size;
map.values()
.skip(start as usize)
.take(page_size as usize)
.cloned()
.collect()
}
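// Example: with page_size = 100, page 0 yields entries [0, 100) of the map's ordered values
// and page 1 yields [100, 200); out-of-range pages yield an empty Vec.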
mod consts {
pub const DEFAULT_PAGE_SIZE: u64 = 100;
}
#[cfg(test)]
mod tests {
mod schedulable_wants {
use crate::build_state::BuildState;
use crate::want_state::{IdleState as WantIdleState, Want, WantInfo, WantWithState};
use crate::{PartitionDetail, PartitionRef, PartitionStatus, WantDetail, WantStatus};
use std::collections::BTreeMap;
impl WantDetail {
fn with_partitions(self, partitions: Vec<PartitionRef>) -> Self {
Self { partitions, ..self }
}
fn with_upstreams(self, upstreams: Vec<PartitionRef>) -> Self {
Self { upstreams, ..self }
}
fn with_status(self, status: Option<WantStatus>) -> Self {
Self { status, ..self }
}
}
impl PartitionDetail {
fn with_status(self, status: Option<PartitionStatus>) -> Self {
Self { status, ..self }
}
fn with_ref(self, r#ref: Option<PartitionRef>) -> Self {
Self { r#ref, ..self }
}
}
#[test]
fn test_empty_wants_noop() {
assert_eq!(BuildState::default().wants_schedulability().0.len(), 0);
}
// A want with satisfied upstreams (incl "none") should be schedulable
#[test]
fn test_simple_want_with_live_upstream_is_schedulable() {
// Given...
let test_partition = "test_partition";
let state = BuildState::default()
.with_wants(BTreeMap::from([(
"foo".to_string(),
Want::Idle(WantWithState {
want: WantInfo {
partitions: vec![test_partition.into()],
..Default::default()
},
state: WantIdleState {},
}),
)]))
.with_partitions(BTreeMap::from([(
test_partition.to_string(),
PartitionDetail::default().with_ref(Some(test_partition.into())),
)]));
// Should...
let schedulability = state.wants_schedulability();
let ws = schedulability.0.first().unwrap();
assert!(ws.is_schedulable());
}
}
mod sqlite_build_state {
mod want {
use crate::build_state::BuildState;
use crate::{WantCancelEventV1, WantCreateEventV1, WantDetail};
#[test]
fn test_should_create_want() {
let mut e = WantCreateEventV1::default();
e.want_id = "1234".to_string();
e.partitions = vec!["mypart".into()];
let mut state = BuildState::default();
state.handle_event(&e.clone().into());
let want = state.get_want("1234").unwrap();
let mut expected: WantDetail = e.into();
// The Into conversion sets this field to the current timestamp
expected.last_updated_timestamp = want.last_updated_timestamp;
assert_eq!(want, expected);
}
#[test]
fn test_should_cancel_want() {
let mut e = WantCreateEventV1::default();
e.want_id = "1234".to_string();
e.partitions = vec!["mypart".into()];
let mut state = BuildState::default();
state.handle_event(&e.clone().into());
// Should be able to cancel
let mut e = WantCancelEventV1::default();
e.want_id = "1234".to_string();
state.handle_event(&e.clone().into());
let want = state.get_want("1234").unwrap();
assert_eq!(
want.status,
Some(crate::WantStatusCode::WantCanceled.into())
);
}
#[test]
fn test_multihop_dependency_replay() {
use crate::data_build_event::Event;
use crate::{
JobRunBufferEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1,
JobRunSuccessEventV1, MissingDeps, PartitionRef, WantAttributedPartitions,
WantCreateEventV1,
};
let mut state = BuildState::default();
let mut events = vec![];
// 1. Create want for data/beta
let beta_want_id = "beta-want".to_string();
let mut create_beta = WantCreateEventV1::default();
create_beta.want_id = beta_want_id.clone();
create_beta.partitions = vec![PartitionRef {
r#ref: "data/beta".to_string(),
}];
events.push(Event::WantCreateV1(create_beta));
// 2. Queue beta job (first attempt)
let beta_job_1_id = "beta-job-1".to_string();
let mut buffer_beta_1 = JobRunBufferEventV1::default();
buffer_beta_1.job_run_id = beta_job_1_id.clone();
buffer_beta_1.job_label = "//job_beta".to_string();
buffer_beta_1.want_attributed_partitions = vec![WantAttributedPartitions {
want_id: beta_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
}];
buffer_beta_1.building_partitions = vec![PartitionRef {
r#ref: "data/beta".to_string(),
}];
events.push(Event::JobRunBufferV1(buffer_beta_1));
// 3. Beta job starts running
let mut heartbeat_beta_1 = JobRunHeartbeatEventV1::default();
heartbeat_beta_1.job_run_id = beta_job_1_id.clone();
events.push(Event::JobRunHeartbeatV1(heartbeat_beta_1));
// 4. Beta job reports missing dependency on data/alpha
let mut dep_miss_beta_1 = JobRunMissingDepsEventV1::default();
dep_miss_beta_1.job_run_id = beta_job_1_id.clone();
dep_miss_beta_1.missing_deps = vec![MissingDeps {
impacted: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
missing: vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}],
}];
events.push(Event::JobRunMissingDepsV1(dep_miss_beta_1));
// 5. Create derivative want for data/alpha
let alpha_want_id = "alpha-want".to_string();
let mut create_alpha = WantCreateEventV1::default();
create_alpha.want_id = alpha_want_id.clone();
create_alpha.partitions = vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}];
events.push(Event::WantCreateV1(create_alpha));
// 6. Queue alpha job
let alpha_job_id = "alpha-job".to_string();
let mut buffer_alpha = JobRunBufferEventV1::default();
buffer_alpha.job_run_id = alpha_job_id.clone();
buffer_alpha.job_label = "//job_alpha".to_string();
buffer_alpha.want_attributed_partitions = vec![WantAttributedPartitions {
want_id: alpha_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}],
}];
buffer_alpha.building_partitions = vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}];
events.push(Event::JobRunBufferV1(buffer_alpha));
// 7. Alpha job starts running
let mut heartbeat_alpha = JobRunHeartbeatEventV1::default();
heartbeat_alpha.job_run_id = alpha_job_id.clone();
events.push(Event::JobRunHeartbeatV1(heartbeat_alpha));
// 8. Alpha job succeeds
let mut success_alpha = JobRunSuccessEventV1::default();
success_alpha.job_run_id = alpha_job_id.clone();
events.push(Event::JobRunSuccessV1(success_alpha));
// 9. Queue beta job again (second attempt) - THIS IS THE CRITICAL MOMENT
let beta_job_2_id = "beta-job-2".to_string();
let mut buffer_beta_2 = JobRunBufferEventV1::default();
buffer_beta_2.job_run_id = beta_job_2_id.clone();
buffer_beta_2.job_label = "//job_beta".to_string();
buffer_beta_2.want_attributed_partitions = vec![WantAttributedPartitions {
want_id: beta_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
}];
buffer_beta_2.building_partitions = vec![PartitionRef {
r#ref: "data/beta".to_string(),
}];
events.push(Event::JobRunBufferV1(buffer_beta_2));
// 10. Beta job starts running
let mut heartbeat_beta_2 = JobRunHeartbeatEventV1::default();
heartbeat_beta_2.job_run_id = beta_job_2_id.clone();
events.push(Event::JobRunHeartbeatV1(heartbeat_beta_2));
// 11. Beta job succeeds
let mut success_beta_2 = JobRunSuccessEventV1::default();
success_beta_2.job_run_id = beta_job_2_id.clone();
events.push(Event::JobRunSuccessV1(success_beta_2));
// Process all events - this simulates replay
for event in &events {
state.handle_event(event);
}
// Verify final state
let beta_want = state.get_want(&beta_want_id).unwrap();
assert_eq!(
beta_want.status,
Some(crate::WantStatusCode::WantSuccessful.into()),
"Beta want should be successful after multi-hop dependency resolution"
);
let alpha_want = state.get_want(&alpha_want_id).unwrap();
assert_eq!(
alpha_want.status,
Some(crate::WantStatusCode::WantSuccessful.into()),
"Alpha want should be successful"
);
}
}
}
}