use crate::data_build_event::Event;
use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
use crate::event_source::Source as EventSourceVariant;
use crate::job_run_state::{JobRun, JobRunWithState, QueuedState as JobQueuedState};
use crate::partition_state::{
    BuildingPartitionRef, FailedPartitionRef, LivePartitionRef, MissingPartitionRef, MissingState,
    Partition, PartitionWithState, TaintedPartitionRef,
};
use crate::util::current_timestamp;
use crate::want_state::{
    FailedWantId, IdleState as WantIdleState, SuccessfulWantId, Want, WantWithState,
};
use crate::{
    JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1,
    JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1,
    ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse,
    ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail,
    PartitionRef, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantCancelEventV1,
    WantCreateEventV1, WantDetail,
};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use tracing;

/**
Design Notes

The build state struct is the heart of the service and orchestrator, adapting build events to
higher-level questions about build state. One temptation is to implement the build state as a set
of hierarchically defined reducers, to achieve information hiding and factor system capabilities and
state tracking simply. Unfortunately, to update state based on an event, you need a mutable borrow
of some part of the build state (the part a reducer controls, for instance) and an immutable borrow
of the whole state for read/query purposes. The whole state needs to be available to handle state
updates like "this is the list of currently active job runs" in response to a job run event. Put
simply, this isn't possible without introducing some locking of the whole state and the mutable
subset, since the borrows would conflict (the mutable subset has already been borrowed, so it can't
also be borrowed immutably as part of the whole-state borrow). You might also define a "query" phase
in which reducers query the state based on the received event, but that just increases complexity.

Instead, databuild opts for an entity-component-system (ECS) style that provides the whole build
state mutably to all state update functionality, trusting that we know how to use it responsibly.
This means no boxing and no "query phase", and it means all state updates happen as map lookups and
updates, which is exceptionally fast. The states of the different entities are managed by state
machines, in a pseudo-colored-petri-net style (only pseudo because we haven't formalized it). It is
critical that these state machines, their states, and their transitions are type-safe.
*/
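
// A minimal sketch (hypothetical types, not the actual ones defined in the state modules) of the
// typestate pattern those state machines rely on: a transition consumes the current typed state
// and returns the next one, so an invalid transition is a compile error rather than a runtime
// surprise.
//
//     struct Missing;
//     struct Building { job_run_id: String }
//     struct Part<S> { r#ref: String, state: S }
//
//     impl Part<Missing> {
//         fn start_building(self, job_run_id: String) -> Part<Building> {
//             Part { r#ref: self.r#ref, state: Building { job_run_id } }
//         }
//     }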

/// Tracks all application state, defines valid state transitions, and manages cross-state-machine
/// transitions (e.g. a job run success moving its partitions from Building to Live)
#[derive(Debug, Clone, Default)]
pub struct BuildState {
    wants: BTreeMap<String, Want>, // Type-safe want storage
    taints: BTreeMap<String, TaintDetail>,
    partitions: BTreeMap<String, Partition>, // Type-safe partition storage
    job_runs: BTreeMap<String, JobRun>, // Type-safe job run storage
}
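
// State transitions driven by the handlers below (a summary derived from this file, not a formal
// spec):
//
//   Partition: Missing → Building → Live
//                          ├→ Failed
//                          └→ Missing (reset on job dep-miss or cancel)
//
//   Want: Idle → Building → Successful
//                   ├→ Failed
//                   └→ UpstreamBuilding → Idle or Building (upstreams satisfied)
//                                       → UpstreamFailed   (an upstream want failed)
//   (Idle, Building, and UpstreamBuilding wants can also be Canceled.)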

impl BuildState {
    /// Reconstruct BuildState from a sequence of events (for read path in web server)
    /// This allows the web server to rebuild state from BEL storage without holding a lock
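    ///
    /// A minimal usage sketch (the storage handle and its API here are hypothetical):
    /// ```ignore
    /// let events: Vec<crate::DataBuildEvent> = bel_storage.read_all_events()?;
    /// let state = BuildState::from_events(&events);
    /// println!("job runs so far: {}", state.count_job_runs());
    /// ```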
    pub fn from_events(events: &[crate::DataBuildEvent]) -> Self {
        let mut state = BuildState::default();
        for event in events {
            if let Some(ref inner_event) = event.event {
                // handle_event returns Vec<Event> for cascading events, but we ignore them
                // since we're replaying from a complete event log
                state.handle_event(inner_event);
            }
        }
        state
    }

    pub fn count_job_runs(&self) -> usize {
        self.job_runs.len()
    }

    /// Add want_id to partition's want_ids list
    fn add_want_to_partition(&mut self, pref: &PartitionRef, want_id: &str) {
        // Create partition if it doesn't exist
        if !self.partitions.contains_key(&pref.r#ref) {
            let partition = Partition::new_missing(pref.clone());
            self.partitions.insert(pref.r#ref.clone(), partition);
        }

        // Add want_id
        if let Some(partition) = self.partitions.get_mut(&pref.r#ref) {
            let want_ids = partition.want_ids_mut();
            if !want_ids.contains(&want_id.to_string()) {
                want_ids.push(want_id.to_string());
            }
        }
    }

    /// Handle creation of a derivative want (created due to job dep miss)
    ///
    /// When a job reports missing dependencies, it returns WantCreateV1 events for those missing partitions.
    /// Those events get appended to the BEL and eventually processed by handle_want_create().
    ///
    /// This function is called when we detect a derivative want (has source.job_triggered) and transitions
    /// the impacted wants to UpstreamBuilding state, tracking the derivative want ID as an upstream dependency.
    ///
    /// KEY INSIGHT: We must use the actual want_id from the WantCreateV1 event, not synthetic UUIDs generated
    /// during event processing. This ensures replay works correctly - the same want IDs are used both during
    /// original execution and during replay from the BEL.
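    ///
    /// Rough shape of the flow (event payloads abbreviated; the sequence is illustrative):
    /// ```text
    /// JobRunMissingDepsV1 { job_run_id: A, missing: [data/alpha], impacted: [data/beta] }
    ///   └─ emits WantCreateV1 { want_id: W, source: job_triggered(A), partitions: [data/alpha] }
    ///        └─ handle_want_create() → handle_derivative_want_creation()
    ///             └─ wants building data/beta: Building → UpstreamBuilding (upstream = W)
    /// ```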
    fn handle_derivative_want_creation(
        &mut self,
        derivative_want_id: &str,
        derivative_want_partitions: &[PartitionRef],
        source_job_run_id: &str,
    ) {
        // Look up the job run that triggered this derivative want
        // This job run must be in DepMiss state because it reported missing dependencies
        let job_run = self.job_runs.get(source_job_run_id).expect(&format!(
            "BUG: Job run {} must exist when derivative want created",
            source_job_run_id
        ));

        // Extract the missing deps from the DepMiss job run
        let missing_deps = match job_run {
            JobRun::DepMiss(dep_miss) => dep_miss.get_missing_deps(),
            _ => {
                panic!(
                    "BUG: Job run {} must be in DepMiss state when derivative want created, found {:?}",
                    source_job_run_id, job_run
                );
            }
        };

        // Find which MissingDeps entry corresponds to this derivative want
        // The derivative want was created for a specific set of missing partitions,
        // and we need to find which downstream partitions are impacted by those missing partitions
        for md in missing_deps {
            // Check if this derivative want's partitions match the missing partitions in this entry
            // We need exact match because one dep miss event can create multiple derivative wants
            let partitions_match = md.missing.iter().all(|missing_ref| {
                derivative_want_partitions
                    .iter()
                    .any(|p| p.r#ref == missing_ref.r#ref)
            }) && derivative_want_partitions.len() == md.missing.len();

            if partitions_match {
                // Now we know which partitions are impacted by this missing dependency
                let impacted_partition_refs: Vec<String> =
                    md.impacted.iter().map(|p| p.r#ref.clone()).collect();

                tracing::debug!(
                    derivative_want_id = %derivative_want_id,
                    source_job_run_id = %source_job_run_id,
                    missing_partitions = ?derivative_want_partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
                    impacted_partitions = ?impacted_partition_refs,
                    "Processing derivative want creation"
                );

                // Find all wants that include these impacted partitions
                // These are the wants that need to wait for the derivative want to complete
                let mut impacted_want_ids: std::collections::HashSet<String> =
                    std::collections::HashSet::new();
                for partition_ref in &impacted_partition_refs {
                    if let Some(partition) = self.partitions.get(partition_ref) {
                        for want_id in partition.want_ids() {
                            impacted_want_ids.insert(want_id.clone());
                        }
                    }
                }

                // Transition each impacted want to UpstreamBuilding, tracking this derivative want as an upstream
                for want_id in impacted_want_ids {
                    let want = self.wants.remove(&want_id).expect(&format!(
                        "BUG: Want {} must exist when processing derivative want",
                        want_id
                    ));

                    let transitioned = match want {
                        Want::Building(building) => {
                            // First dep miss for this want: Building → UpstreamBuilding
                            tracing::info!(
                                want_id = %want_id,
                                derivative_want_id = %derivative_want_id,
                                "Want: Building → UpstreamBuilding (first missing dep detected)"
                            );
                            Want::UpstreamBuilding(
                                building.detect_missing_deps(vec![derivative_want_id.to_string()]),
                            )
                        }
                        Want::UpstreamBuilding(upstream) => {
                            // Additional dep miss: UpstreamBuilding → UpstreamBuilding (add another upstream)
                            // This can happen if multiple jobs report dep misses for different upstreams
                            tracing::info!(
                                want_id = %want_id,
                                derivative_want_id = %derivative_want_id,
                                "Want: UpstreamBuilding → UpstreamBuilding (additional upstream added)"
                            );
                            Want::UpstreamBuilding(
                                upstream.add_upstreams(vec![derivative_want_id.to_string()]),
                            )
                        }
                        _ => {
                            panic!(
                                "BUG: Want {} in invalid state {:?} when processing derivative want. Should be Building or UpstreamBuilding.",
                                want_id, want
                            );
                        }
                    };

                    self.wants.insert(want_id, transitioned);
                }
            }
        }
    }

    /// Transition partitions from Missing to Building state
    /// Used when a job run starts building partitions
    fn transition_partitions_to_building(
        &mut self,
        partition_refs: &[BuildingPartitionRef],
        job_run_id: &str,
    ) {
        for building_ref in partition_refs {
            if let Some(partition) = self.partitions.remove(&building_ref.0.r#ref) {
                // Partition exists - transition based on current state
                let transitioned = match partition {
                    // Valid: Missing -> Building
                    Partition::Missing(missing) => {
                        tracing::info!(
                            partition = %building_ref.0.r#ref,
                            job_run_id = %job_run_id,
                            "Partition: Missing → Building"
                        );
                        Partition::Building(missing.start_building(job_run_id.to_string()))
                    }
                    // Invalid state: partition should not already be Building, Live, Failed, or Tainted
                    _ => {
                        panic!(
                            "BUG: Invalid state - partition {} cannot start building from state {:?}",
                            building_ref.0.r#ref, partition
                        )
                    }
                };
                self.partitions
                    .insert(building_ref.0.r#ref.clone(), transitioned);
            } else {
                // Partition doesn't exist yet - create in Missing then transition to Building
                let missing = Partition::new_missing(building_ref.0.clone());
                if let Partition::Missing(m) = missing {
                    let building = m.start_building(job_run_id.to_string());
                    tracing::info!(
                        partition = %building_ref.0.r#ref,
                        job_run_id = %job_run_id,
                        "Partition: Missing → Building (created)"
                    );
                    self.partitions
                        .insert(building_ref.0.r#ref.clone(), Partition::Building(building));
                }
            }
        }
    }

    /// Transition partitions from Building to Live state
    /// Used when a job run successfully completes
    fn transition_partitions_to_live(
        &mut self,
        partition_refs: &[LivePartitionRef],
        job_run_id: &str,
        timestamp: u64,
    ) {
        for pref in partition_refs {
            let partition = self.partitions.remove(&pref.0.r#ref).expect(&format!(
                "BUG: Partition {} must exist and be in Building state before completion",
                pref.0.r#ref
            ));

            // ONLY valid transition: Building -> Live
            let transitioned = match partition {
                Partition::Building(building) => {
                    tracing::info!(
                        partition = %pref.0.r#ref,
                        job_run_id = %job_run_id,
                        "Partition: Building → Live"
                    );
                    Partition::Live(building.complete(job_run_id.to_string(), timestamp))
                }
                // All other states are invalid
                _ => {
                    panic!(
                        "BUG: Invalid state - partition {} must be Building to transition to Live, found {:?}",
                        pref.0.r#ref, partition
                    )
                }
            };
            self.partitions.insert(pref.0.r#ref.clone(), transitioned);
        }
    }

    /// Transition partitions from Building to Failed state
    /// Used when a job run fails
    fn transition_partitions_to_failed(
        &mut self,
        partition_refs: &[FailedPartitionRef],
        job_run_id: &str,
        timestamp: u64,
    ) {
        for pref in partition_refs {
            let partition = self.partitions.remove(&pref.0.r#ref).expect(&format!(
                "BUG: Partition {} must exist and be in Building state before failure",
                pref.0.r#ref
            ));

            // ONLY valid transition: Building -> Failed
            let transitioned = match partition {
                Partition::Building(building) => {
                    tracing::info!(
                        partition = %pref.0.r#ref,
                        job_run_id = %job_run_id,
                        "Partition: Building → Failed"
                    );
                    Partition::Failed(building.fail(job_run_id.to_string(), timestamp))
                }
                // All other states are invalid
                _ => {
                    panic!(
                        "BUG: Invalid state - partition {} must be Building to transition to Failed, found {:?}",
                        pref.0.r#ref, partition
                    )
                }
            };
            self.partitions.insert(pref.0.r#ref.clone(), transitioned);
        }
    }

    /// Reset partitions from Building back to Missing state
    /// Used when a job run encounters missing dependencies and cannot proceed
    fn reset_partitions_to_missing(&mut self, partition_refs: &[BuildingPartitionRef]) {
        for building_ref in partition_refs {
            let partition = self
                .partitions
                .remove(&building_ref.0.r#ref)
                .expect(&format!(
                    "BUG: Partition {} must exist and be in Building state during dep_miss",
                    building_ref.0.r#ref
                ));

            // Only valid transition: Building -> Missing
            let transitioned = match partition {
                Partition::Building(building) => {
                    tracing::info!(
                        partition = %building_ref.0.r#ref,
                        "Partition: Building → Missing (dep miss)"
                    );
                    Partition::Missing(building.reset_to_missing())
                }
                // All other states are invalid
                _ => {
                    panic!(
                        "BUG: Invalid state - partition {} must be Building during dep_miss, found {:?}",
                        building_ref.0.r#ref, partition
                    )
                }
            };
            self.partitions
                .insert(building_ref.0.r#ref.clone(), transitioned);
        }
    }

    /// Complete wants when all their partitions become Live
    /// Transitions Building → Successful, returns list of newly successful want IDs
    fn complete_successful_wants(
        &mut self,
        newly_live_partitions: &[LivePartitionRef],
        job_run_id: &str,
        timestamp: u64,
    ) -> Vec<SuccessfulWantId> {
        let mut newly_successful_wants: Vec<SuccessfulWantId> = Vec::new();

        for pref in newly_live_partitions {
            let want_ids = self
                .partitions
                .get(&pref.0.r#ref)
                .map(|p| p.want_ids().clone())
                .unwrap_or_default();

            for want_id in want_ids {
                let want = self.wants.remove(&want_id).expect(&format!(
                    "BUG: Want {} must exist when referenced by partition",
                    want_id
                ));

                let transitioned = match want {
                    Want::Building(building) => {
                        // Check if ALL partitions for this want are now Live
                        let all_partitions_live = building.want.partitions.iter().all(|p| {
                            self.partitions
                                .get(&p.r#ref)
                                .map(|partition| partition.is_live())
                                .unwrap_or(false)
                        });

                        if all_partitions_live {
                            let successful_want =
                                building.complete(job_run_id.to_string(), timestamp);
                            tracing::info!(
                                want_id = %want_id,
                                job_run_id = %job_run_id,
                                "Want: Building → Successful"
                            );
                            newly_successful_wants.push(successful_want.get_id());
                            Want::Successful(successful_want)
                        } else {
                            Want::Building(building) // Still building other partitions
                        }
                    }
                    _ => {
                        panic!(
                            "BUG: Want {} in invalid state {:?} when partition {} became Live. Should be Building.",
                            want_id, want, pref.0.r#ref
                        );
                    }
                };

                self.wants.insert(want_id.clone(), transitioned);
            }
        }

        newly_successful_wants
    }

    /// Fail wants when their partitions fail
    /// Transitions Building → Failed, and adds to already-failed wants
    /// Returns list of newly failed want IDs for downstream cascade
    fn fail_directly_affected_wants(
        &mut self,
        failed_partitions: &[FailedPartitionRef],
    ) -> Vec<FailedWantId> {
        let mut newly_failed_wants: Vec<FailedWantId> = Vec::new();

        for pref in failed_partitions {
            let want_ids = self
                .partitions
                .get(&pref.0.r#ref)
                .map(|p| p.want_ids().clone())
                .unwrap_or_default();

            for want_id in want_ids {
                let want = self.wants.remove(&want_id).expect(&format!(
                    "BUG: Want {} must exist when referenced by partition",
                    want_id
                ));

                let transitioned = match want {
                    Want::Building(building) => {
                        let failed = building
                            .fail(vec![pref.0.clone()], "Partition build failed".to_string());
                        newly_failed_wants.push(failed.get_id());
                        Want::Failed(failed)
                    }
                    // Failed → Failed: add new failed partition to existing failed state
                    Want::Failed(failed) => {
                        Want::Failed(failed.add_failed_partitions(vec![pref.clone()]))
                    }
                    _ => {
                        panic!(
                            "BUG: Want {} in invalid state {:?} when partition {} failed. Should be Building or Failed.",
                            want_id, want, pref.0.r#ref
                        );
                    }
                };

                self.wants.insert(want_id.clone(), transitioned);
            }
        }

        newly_failed_wants
    }

    /// Unblock downstream wants when their upstream dependencies succeed
    /// Transitions UpstreamBuilding → Idle (when ready) or Building (when partitions already building)
    fn unblock_downstream_wants(
        &mut self,
        newly_successful_wants: &[SuccessfulWantId],
        job_run_id: &str,
        timestamp: u64,
    ) {
        tracing::debug!(
            newly_successful_wants = ?newly_successful_wants
                .iter()
                .map(|w| &w.0)
                .collect::<Vec<_>>(),
            "Checking downstream wants for unblocking"
        );
        // Find downstream wants that are waiting for any of the newly successful wants
        // TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
        let downstream_wants_to_check: Vec<String> = self
            .wants
            .iter()
            .filter_map(|(id, want)| {
                match want {
                    Want::UpstreamBuilding(downstream_want) => {
                        // Is this downstream want waiting for any of the newly successful wants?
                        let is_affected =
                            downstream_want.state.upstream_want_ids.iter().any(|up_id| {
                                newly_successful_wants.iter().any(|swid| &swid.0 == up_id)
                            });
                        if is_affected { Some(id.clone()) } else { None }
                    }
                    _ => None,
                }
            })
            .collect();
        tracing::debug!(
            downstream_wants_to_check = ?downstream_wants_to_check,
            "Found downstream wants affected by upstream completion"
        );

        for want_id in downstream_wants_to_check {
            let want = self
                .wants
                .remove(&want_id)
                .expect(&format!("BUG: Want {} must exist", want_id));

            let transitioned = match want {
                Want::UpstreamBuilding(downstream_want) => {
                    tracing::debug!(
                        want_id = %want_id,
                        upstreams = ?downstream_want.state.upstream_want_ids,
                        "Checking if all upstreams are satisfied"
                    );
                    // Check if ALL of this downstream want's upstream dependencies are now Successful
                    let all_upstreams_successful = downstream_want
                        .state
                        .upstream_want_ids
                        .iter()
                        .all(|up_want_id| {
                            self.wants
                                .get(up_want_id)
                                .map(|w| matches!(w, Want::Successful(_)))
                                .unwrap_or(false)
                        });
                    tracing::debug!(
                        want_id = %want_id,
                        all_upstreams_successful = %all_upstreams_successful,
                        "Upstream satisfaction check complete"
                    );

                    if all_upstreams_successful {
                        // Check if any of this want's partitions are still being built
                        // If a job dep-missed, its partitions transitioned back to Missing
                        // But other jobs might still be building other partitions for this want
                        let any_partition_building =
                            downstream_want.want.partitions.iter().any(|p| {
                                self.partitions
                                    .get(&p.r#ref)
                                    .map(|partition| matches!(partition, Partition::Building(_)))
                                    .unwrap_or(false)
                            });
                        tracing::debug!(
                            want_id = %want_id,
                            any_partition_building = %any_partition_building,
                            "Partition building status check"
                        );

                        if any_partition_building {
                            // Some partitions still being built, continue in Building state
                            tracing::info!(
                                want_id = %want_id,
                                job_run_id = %job_run_id,
                                "Want: UpstreamBuilding → Building (upstreams satisfied, partitions building)"
                            );
                            Want::Building(
                                downstream_want
                                    .continue_building(job_run_id.to_string(), timestamp),
                            )
                        } else {
                            // No partitions being built, become schedulable again
                            tracing::info!(
                                want_id = %want_id,
                                "Want: UpstreamBuilding → Idle (upstreams satisfied, ready to schedule)"
                            );
                            Want::Idle(downstream_want.upstreams_satisfied())
                        }
                    } else {
                        // Upstreams not all satisfied yet, stay in UpstreamBuilding
                        tracing::debug!(
                            want_id = %want_id,
                            "Want remains in UpstreamBuilding state (upstreams not yet satisfied)"
                        );
                        Want::UpstreamBuilding(downstream_want)
                    }
                }
                _ => {
                    panic!("BUG: Want {} should be UpstreamBuilding here", want_id);
                }
            };

            self.wants.insert(want_id, transitioned);
        }
    }

    /// Cascade failures to downstream wants when their upstream dependencies fail
    /// Transitions UpstreamBuilding → UpstreamFailed
    fn cascade_failures_to_downstream_wants(
        &mut self,
        newly_failed_wants: &[FailedWantId],
        timestamp: u64,
    ) {
        // Find downstream wants that are waiting for any of the newly failed wants
        // TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
        let downstream_wants_to_fail: Vec<String> = self
            .wants
            .iter()
            .filter_map(|(id, want)| {
                match want {
                    Want::UpstreamBuilding(downstream_want) => {
                        // Is this downstream want waiting for any of the newly failed wants?
                        let is_affected =
                            downstream_want.state.upstream_want_ids.iter().any(|up_id| {
                                newly_failed_wants.iter().any(|fwid| &fwid.0 == up_id)
                            });
                        if is_affected { Some(id.clone()) } else { None }
                    }
                    _ => None,
                }
            })
            .collect();

        for want_id in downstream_wants_to_fail {
            let want = self
                .wants
                .remove(&want_id)
                .expect(&format!("BUG: Want {} must exist", want_id));

            let transitioned = match want {
                Want::UpstreamBuilding(downstream_want) => Want::UpstreamFailed(
                    downstream_want.upstream_failed(
                        newly_failed_wants
                            .iter()
                            .map(|fwid| fwid.0.clone())
                            .collect(),
                        timestamp,
                    ),
                ),
                _ => {
                    panic!("BUG: Want {} should be UpstreamBuilding here", want_id);
                }
            };

            self.wants.insert(want_id, transitioned);
        }
    }

    /// Handles reacting to events, updating state, and erroring if it's an invalid state transition.
    /// Event handlers can return vecs of events that will then be appended to the BEL.
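    ///
    /// A minimal sketch of the write path (the `bel` handle and call shape are illustrative, not
    /// this crate's exact API):
    /// ```ignore
    /// let follow_ups = state.handle_event(&event);
    /// for derived in follow_ups {
    ///     bel.append_event(derived)?; // e.g. derivative WantCreateV1 events from a dep miss
    /// }
    /// ```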
    pub fn handle_event(&mut self, event: &Event) -> Vec<Event> {
        match event {
            // JobRun events
            Event::JobRunBufferV1(e) => self.handle_job_run_buffer(e),
            Event::JobRunHeartbeatV1(e) => self.handle_job_run_heartbeat(e),
            Event::JobRunFailureV1(e) => self.handle_job_run_failure(e),
            Event::JobRunCancelV1(e) => self.handle_job_run_cancel(e),
            Event::JobRunSuccessV1(e) => self.handle_job_run_success(e),
            Event::JobRunMissingDepsV1(e) => self.handle_job_run_dep_miss(e),
            // Want events
            Event::WantCreateV1(e) => self.handle_want_create(e),
            Event::WantCancelV1(e) => self.handle_want_cancel(e),
            // Taint events
            Event::TaintCreateV1(e) => self.handle_taint_create(e),
            Event::TaintCancelV1(e) => self.handle_taint_delete(e),
            // Ruh roh!
            _ => panic!("Unhandled event type! {:?}", event),
        }
    }

    fn handle_want_create(&mut self, event: &WantCreateEventV1) -> Vec<Event> {
        // Use From impl to create want in Idle state
        let want_idle: WantWithState<WantIdleState> = event.clone().into();

        // Log creation with derivative vs user-created distinction
        if let Some(source) = &event.source {
            if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source {
                tracing::info!(
                    want_id = %event.want_id,
                    partitions = ?event.partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
                    source_job_run_id = %job_triggered.job_run_id,
                    "Want created (derivative - auto-created due to missing dependency)"
                );
            }
        } else {
            tracing::info!(
                want_id = %event.want_id,
                partitions = ?event.partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
                "Want created (user-requested)"
            );
        }

        self.wants
            .insert(event.want_id.clone(), Want::Idle(want_idle));

        // Register this want with all its partitions
        for pref in &event.partitions {
            self.add_want_to_partition(pref, &event.want_id);
        }

        // If this is a derivative want (triggered by a job's dep miss), transition impacted wants to UpstreamBuilding
        if let Some(source) = &event.source {
            if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source {
                self.handle_derivative_want_creation(
                    &event.want_id,
                    &event.partitions,
                    &job_triggered.job_run_id,
                );
            }
        }

        vec![]
    }

    fn handle_want_cancel(&mut self, event: &WantCancelEventV1) -> Vec<Event> {
        // TODO actually cancel in-progress job runs that no longer have a sponsoring want

        // Type-safe transition (API layer should prevent canceling terminal wants)
        let want = self.wants.remove(&event.want_id).expect(&format!(
            "BUG: Want {} must exist when cancel event received",
            event.want_id
        ));

        let canceled = match want {
            Want::Idle(idle) => {
                Want::Canceled(idle.cancel(event.source.clone(), event.comment.clone()))
            }
            Want::Building(building) => Want::Canceled(building.cancel(
                event.source.clone(),
                current_timestamp(),
                event.comment.clone(),
            )),
            Want::UpstreamBuilding(upstream) => Want::Canceled(upstream.cancel(
                event.source.clone(),
                current_timestamp(),
                event.comment.clone(),
            )),
            // Terminal states: panic because API should have prevented this
            Want::Successful(_) | Want::Failed(_) | Want::UpstreamFailed(_) | Want::Canceled(_) => {
                panic!(
                    "BUG: Received WantCancelEvent for want {} in terminal state {:?}. API layer should prevent this.",
                    event.want_id, want
                );
            }
        };
        self.wants.insert(event.want_id.clone(), canceled);

        vec![]
    }

    fn handle_job_run_buffer(&mut self, event: &JobRunBufferEventV1) -> Vec<Event> {
        // No job run should exist - if it does, that's a BUG in the orchestrator
        if self.job_runs.contains_key(&event.job_run_id) {
            panic!(
                "BUG: Job run ID collision on job run ID {}. Orchestrator should generate unique IDs.",
                event.job_run_id
            );
        }

        // Create job run in Queued state
        let queued: JobRunWithState<JobQueuedState> = event.clone().into();

        // Transition wants to Building
        // Valid states when job buffer event arrives:
        // - Idle: First job starting for this want (normal case)
        // - Building: Another job already started for this want (multiple jobs can service same want)
        // Invalid states (panic - indicates orchestrator bug):
        // - UpstreamBuilding: Not schedulable, waiting for dependencies
        // - Successful/Failed/UpstreamFailed/Canceled: Terminal states, not schedulable
        for wap in &queued.info.servicing_wants {
            let want = self.wants.remove(&wap.want_id).expect(&format!(
                "BUG: Want {} must exist when job buffer event received",
                wap.want_id
            ));

            let transitioned = match want {
                Want::Idle(idle) => {
                    // First job starting for this want
                    tracing::info!(
                        want_id = %wap.want_id,
                        job_run_id = %event.job_run_id,
                        "Want: Idle → Building (job scheduled)"
                    );
                    Want::Building(idle.start_building(current_timestamp()))
                }
                Want::Building(building) => {
                    // Another job already started, stay in Building (no-op)
                    Want::Building(building)
                }
                _ => {
                    panic!(
                        "BUG: Want {} in invalid state {:?} for job buffer. Only Idle or Building wants should be scheduled.",
                        wap.want_id, want
                    );
                }
            };

            self.wants.insert(wap.want_id.clone(), transitioned);
        }

        // Get building partition refs from queued job - job is source of truth for building partitions
        let building_refs: Vec<BuildingPartitionRef> = queued
            .info
            .building_partitions
            .iter()
            .map(|p| BuildingPartitionRef(p.clone()))
            .collect();

        // Transition partitions to Building state
        self.transition_partitions_to_building(&building_refs, &event.job_run_id);

        self.job_runs
            .insert(event.job_run_id.clone(), JobRun::Queued(queued));
        vec![]
    }

    fn handle_job_run_heartbeat(&mut self, event: &JobRunHeartbeatEventV1) -> Vec<Event> {
        let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
            "BUG: Job run {} must exist when heartbeat received",
            event.job_run_id
        ));

        let running = match job_run {
            // First heartbeat: Queued -> Running
            JobRun::Queued(queued) => {
                tracing::info!(
                    job_run_id = %event.job_run_id,
                    "JobRun: Queued → Running"
                );
                queued.start_running(current_timestamp())
            }
            // Subsequent heartbeat: update timestamp
            JobRun::Running(running) => running.heartbeat(current_timestamp()),
            _ => {
                panic!(
                    "BUG: Heartbeat received for job run {} in invalid state {:?}",
                    event.job_run_id, job_run
                );
            }
        };

        self.job_runs
            .insert(event.job_run_id.clone(), JobRun::Running(running));
        vec![]
    }

    fn handle_job_run_success(&mut self, event: &JobRunSuccessEventV1) -> Vec<Event> {
        let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
            "BUG: Job run {} must exist when success event received",
            event.job_run_id
        ));

        let succeeded = match job_run {
            JobRun::Running(running) => {
                tracing::info!(
                    job_run_id = %event.job_run_id,
                    "JobRun: Running → Succeeded"
                );
                running.succeed(current_timestamp())
            }
            _ => {
                panic!(
                    "BUG: Success event received for job run {} in invalid state {:?}. Job must be Running to succeed.",
                    event.job_run_id, job_run
                );
            }
        };

        // Job run success is SOURCE of truth that partitions are live
        let newly_live_partitions = succeeded.get_completed_partitions();

        // Update partitions being built by this job (strict type-safe transitions)
        self.transition_partitions_to_live(
            &newly_live_partitions,
            &event.job_run_id,
            current_timestamp(),
        );

        // Building → Successful (when all partitions Live)
        let newly_successful_wants: Vec<SuccessfulWantId> = self.complete_successful_wants(
            &newly_live_partitions,
            &event.job_run_id,
            current_timestamp(),
        );

        // UpstreamBuilding → Idle/Building (for downstream wants waiting on newly successful wants)
        self.unblock_downstream_wants(
            &newly_successful_wants,
            &event.job_run_id,
            current_timestamp(),
        );

        self.job_runs
            .insert(event.job_run_id.clone(), JobRun::Succeeded(succeeded));
        vec![]
    }

    fn handle_job_run_failure(&mut self, event: &JobRunFailureEventV1) -> Vec<Event> {
        let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
            "BUG: Job run {} must exist when failure event received",
            event.job_run_id
        ));

        let failed = match job_run {
            JobRun::Running(running) => {
                tracing::info!(
                    job_run_id = %event.job_run_id,
                    reason = %event.reason,
                    "JobRun: Running → Failed"
                );
                running.fail(current_timestamp(), event.reason.clone())
            }
            _ => {
                panic!(
                    "BUG: Failure event received for job run {} in invalid state {:?}. Job must be Running to fail.",
                    event.job_run_id, job_run
                );
            }
        };

        // Job run failure is SOURCE of truth that partitions failed
        let failed_partitions = failed.get_failed_partitions();

        // Transition partitions using strict type-safe methods
        self.transition_partitions_to_failed(
            &failed_partitions,
            &event.job_run_id,
            current_timestamp(),
        );

        // Building → Failed (for wants directly building failed partitions)
        let newly_failed_wants: Vec<FailedWantId> =
            self.fail_directly_affected_wants(&failed_partitions);

        // UpstreamBuilding → UpstreamFailed (for downstream wants waiting on newly failed wants)
        self.cascade_failures_to_downstream_wants(&newly_failed_wants, current_timestamp());

        self.job_runs
            .insert(event.job_run_id.clone(), JobRun::Failed(failed));
        vec![]
    }

    fn handle_job_run_cancel(&mut self, event: &JobRunCancelEventV1) -> Vec<Event> {
        let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
            "BUG: Job run {} must exist when cancel event received",
            event.job_run_id
        ));

        let canceled = match job_run {
            JobRun::Queued(queued) => queued.cancel(
                current_timestamp(),
                event.source.clone(),
                event.comment.clone().unwrap_or_default(),
            ),
            JobRun::Running(running) => running.cancel(
                current_timestamp(),
                event.source.clone(),
                event.comment.clone().unwrap_or_default(),
            ),
            _ => {
                panic!(
                    "BUG: Cancel event received for job run {} in invalid state {:?}",
                    event.job_run_id, job_run
                );
            }
        };

        // Canceled job means building partitions should reset to Missing
        let building_refs_to_reset = canceled.get_building_partitions_to_reset();
        self.reset_partitions_to_missing(&building_refs_to_reset);

        self.job_runs
            .insert(event.job_run_id.clone(), JobRun::Canceled(canceled));
        vec![]
    }

    pub fn handle_job_run_dep_miss(&mut self, event: &JobRunMissingDepsEventV1) -> Vec<Event> {
        let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
            "BUG: Job run {} must exist when dep miss event received",
            event.job_run_id
        ));

        let dep_miss = match job_run {
            JobRun::Running(running) => {
                tracing::info!(
                    job_run_id = %event.job_run_id,
                    missing_deps = ?event.missing_deps.iter()
                        .flat_map(|md| md.missing.iter().map(|p| &p.r#ref))
                        .collect::<Vec<_>>(),
                    "JobRun: Running → DepMiss (missing dependencies detected)"
                );
                running.dep_miss(
                    current_timestamp(),
                    event.missing_deps.clone(),
                    event.read_deps.clone(),
                )
            }
            _ => {
                panic!(
                    "BUG: DepMiss event received for job run {} in invalid state {:?}. Job must be Running to hit dep miss.",
                    event.job_run_id, job_run
                );
            }
        };

        // Infer data/SLA timestamps from servicing wants
        let want_timestamps: WantTimestamps = dep_miss
            .info
            .servicing_wants
            .iter()
            .flat_map(|wap| self.get_want(&wap.want_id).map(|w| w.into()))
            .reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b))
            .expect("BUG: No servicing wants found");

        // Transition partitions back to Missing since this job can't build them yet
        let building_refs_to_reset = dep_miss.get_building_partitions_to_reset();
        self.reset_partitions_to_missing(&building_refs_to_reset);

        // Generate WantCreateV1 events for the missing dependencies
        // These events will be returned and appended to the BEL by BuildEventLog.append_event()
        let want_events = missing_deps_to_want_events(
            dep_miss.get_missing_deps().to_vec(),
            &event.job_run_id,
            want_timestamps,
        );

        // Store the job run in DepMiss state so we can access the missing_deps later
        // When the derivative WantCreateV1 events get processed by handle_want_create(),
        // they will look up this job run and use handle_derivative_want_creation() to
        // transition impacted wants to UpstreamBuilding with the correct want IDs.
        //
        // KEY: We do NOT transition wants here because the want_events have randomly generated UUIDs
        // that won't match during replay. Instead, we transition wants when processing the actual
        // WantCreateV1 events that get written to and read from the BEL.
        self.job_runs
            .insert(event.job_run_id.clone(), JobRun::DepMiss(dep_miss));

        // Return derivative want events to be appended to the BEL
        want_events
    }

    fn handle_taint_create(&mut self, _event: &TaintCreateEventV1) -> Vec<Event> {
        todo!("...?")
    }

    fn handle_taint_delete(&mut self, _event: &TaintCancelEventV1) -> Vec<Event> {
        todo!("...?")
    }

    fn with_wants(self, wants: BTreeMap<String, Want>) -> Self {
        Self { wants, ..self }
    }

    #[cfg(test)]
    fn with_partitions(self, old_partitions: BTreeMap<String, PartitionDetail>) -> Self {
        // Convert PartitionDetail to Partition (for backfill scenarios)
        let partitions: BTreeMap<String, Partition> = old_partitions
            .into_iter()
            .map(|(key, detail)| {
                // For now, just create in Missing state - real migration would be more sophisticated
                let partition = Partition::new_missing(detail.r#ref.clone().unwrap_or_default());
                (key, partition)
            })
            .collect();
        Self { partitions, ..self }
    }

    pub fn get_want(&self, want_id: &str) -> Option<WantDetail> {
        self.wants.get(want_id).map(|w| w.to_detail())
    }
    pub fn get_taint(&self, taint_id: &str) -> Option<TaintDetail> {
        self.taints.get(taint_id).cloned()
    }
    pub fn get_partition(&self, partition_id: &str) -> Option<PartitionDetail> {
        self.partitions.get(partition_id).map(|p| p.to_detail())
    }
    pub fn get_job_run(&self, job_run_id: &str) -> Option<JobRunDetail> {
        self.job_runs.get(job_run_id).map(|jr| jr.to_detail())
    }

    pub fn list_wants(&self, request: &ListWantsRequest) -> ListWantsResponse {
        let page = request.page.unwrap_or(0);
        let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);

        let start = page * page_size;

        // Paginate first, then convert only the needed wants to WantDetail
        let data: Vec<WantDetail> = self
            .wants
            .values()
            .skip(start as usize)
            .take(page_size as usize)
            .map(|w| w.to_detail())
            .collect();

        ListWantsResponse {
            data,
            match_count: self.wants.len() as u64,
            page,
            page_size,
        }
    }

    pub fn list_taints(&self, request: &ListTaintsRequest) -> ListTaintsResponse {
        let page = request.page.unwrap_or(0);
        let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
        ListTaintsResponse {
            data: list_state_items(&self.taints, page, page_size),
            match_count: self.taints.len() as u64,
            page,
            page_size,
        }
    }

    pub fn list_partitions(&self, request: &ListPartitionsRequest) -> ListPartitionsResponse {
        let page = request.page.unwrap_or(0);
        let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
        // Convert Partition to PartitionDetail for API
        let partition_details: BTreeMap<String, PartitionDetail> = self
            .partitions
            .iter()
            .map(|(k, v)| (k.clone(), v.to_detail()))
            .collect();
        ListPartitionsResponse {
            data: list_state_items(&partition_details, page, page_size),
            match_count: self.partitions.len() as u64,
            page,
            page_size,
        }
    }

    pub fn list_job_runs(&self, request: &ListJobRunsRequest) -> ListJobRunsResponse {
        let page = request.page.unwrap_or(0);
        let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);

        let start = page * page_size;
        let data: Vec<JobRunDetail> = self
            .job_runs
            .values()
            .skip(start as usize)
            .take(page_size as usize)
            .map(|jr| jr.to_detail())
            .collect();

        ListJobRunsResponse {
            data,
            match_count: self.job_runs.len() as u64,
            page,
            page_size,
        }
    }

    /**
    Wants are schedulable when none of their upstream partitions are missing or tainted.
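
    A sketch of how an orchestrator loop might consume this (the scheduler side is hypothetical;
    only `wants_schedulability` and `schedulable_wants` come from this module):
    ```ignore
    for want in state.wants_schedulability().schedulable_wants() {
        scheduler.plan_job_for(&want); // hypothetical scheduler API
    }
    ```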
    */
    pub fn want_schedulability(&self, want: &WantDetail) -> WantSchedulability {
        // Use type-safe partition checks from partitions
        let mut live: Vec<LivePartitionRef> = Vec::new();
        let mut tainted: Vec<TaintedPartitionRef> = Vec::new();
        let mut missing: Vec<MissingPartitionRef> = Vec::new();

        for upstream_ref in &want.upstreams {
            match self.partitions.get(&upstream_ref.r#ref) {
                Some(partition) => {
                    match partition {
                        Partition::Live(p) => live.push(p.get_ref()),
                        Partition::Tainted(p) => tainted.push(p.get_ref()),
                        Partition::Missing(p) => missing.push(p.get_ref()),
                        _ => (), // Other states (Building, Failed) don't add to any list
                    }
                }
                None => {
                    // TODO this definitely feels dirty, but we can't take a mutable ref of self to
                    // insert the missing partition here, and it feels a little over the top to
                    // create a more elaborate way to mint a missing ref.
                    missing.push(MissingPartitionRef(upstream_ref.clone()));
                }
            }
        }

        WantSchedulability {
            want: want.clone(),
            status: WantUpstreamStatus {
                live,
                tainted,
                missing,
            },
        }
    }

    pub fn wants_schedulability(&self) -> WantsSchedulability {
        WantsSchedulability(
            self.wants
                .values()
                // Use type-safe is_schedulable() - only Idle wants are schedulable
                .filter(|w| w.is_schedulable())
                .map(|w| self.want_schedulability(&w.to_detail()))
                .collect(),
        )
    }
}

/// The status of partitions required by a want to build (sensed from dep miss job run)
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantUpstreamStatus {
    pub live: Vec<LivePartitionRef>,
    pub tainted: Vec<TaintedPartitionRef>,
    pub missing: Vec<MissingPartitionRef>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantSchedulability {
    pub want: WantDetail,
    pub status: WantUpstreamStatus,
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantsSchedulability(pub Vec<WantSchedulability>);

impl WantsSchedulability {
    pub fn schedulable_wants(self) -> Vec<WantDetail> {
        self.0
            .iter()
            .filter_map(|ws| match ws.is_schedulable() {
                false => None,
                true => Some(ws.want.clone()),
            })
            .collect()
    }
}

impl WantSchedulability {
    pub fn is_schedulable(&self) -> bool {
        self.status.missing.is_empty() && self.status.tainted.is_empty()
    }
}

fn list_state_items<T: Clone>(map: &BTreeMap<String, T>, page: u64, page_size: u64) -> Vec<T> {
    // TODO when we add filtering, can we add it generically via some trait or filter object that can be provided?
    let start = page * page_size;
    map.values()
        .skip(start as usize)
        // Take one page worth of items (taking `start + page_size` here would over-fetch on later pages)
        .take(page_size as usize)
        .cloned()
        .collect()
}

mod consts {
    pub const DEFAULT_PAGE_SIZE: u64 = 100;
}

#[cfg(test)]
mod tests {
    mod schedulable_wants {
        use crate::build_state::BuildState;
        use crate::want_state::{IdleState as WantIdleState, Want, WantInfo, WantWithState};
        use crate::{PartitionDetail, PartitionRef, PartitionStatus, WantDetail, WantStatus};
        use std::collections::BTreeMap;

        impl WantDetail {
            fn with_partitions(self, partitions: Vec<PartitionRef>) -> Self {
                Self { partitions, ..self }
            }
            fn with_upstreams(self, upstreams: Vec<PartitionRef>) -> Self {
                Self { upstreams, ..self }
            }
            fn with_status(self, status: Option<WantStatus>) -> Self {
                Self { status, ..self }
            }
        }

        impl PartitionDetail {
            fn with_status(self, status: Option<PartitionStatus>) -> Self {
                Self { status, ..self }
            }
            fn with_ref(self, r#ref: Option<PartitionRef>) -> Self {
                Self { r#ref, ..self }
            }
        }

        #[test]
        fn test_empty_wants_noop() {
            assert_eq!(BuildState::default().wants_schedulability().0.len(), 0);
        }

        // A want with satisfied upstreams (incl "none") should be schedulable
        #[test]
        fn test_simple_want_with_live_upstream_is_schedulable() {
            // Given...
            let test_partition = "test_partition";
            let state = BuildState::default()
                .with_wants(BTreeMap::from([(
                    "foo".to_string(),
                    Want::Idle(WantWithState {
                        want: WantInfo {
                            partitions: vec![test_partition.into()],
                            ..Default::default()
                        },
                        state: WantIdleState {},
                    }),
                )]))
                .with_partitions(BTreeMap::from([(
                    test_partition.to_string(),
                    PartitionDetail::default().with_ref(Some(test_partition.into())),
                )]));

            // Should...
            let schedulability = state.wants_schedulability();
            let ws = schedulability.0.first().unwrap();
            assert!(ws.is_schedulable());
        }
    }

    mod sqlite_build_state {
        mod want {
            use crate::build_state::BuildState;
            use crate::data_build_event::Event;
            use crate::{MissingDeps, WantCancelEventV1, WantCreateEventV1, WantDetail};

            #[test]
            fn test_should_create_want() {
                let mut e = WantCreateEventV1::default();
                e.want_id = "1234".to_string();
                e.partitions = vec!["mypart".into()];

                let mut state = BuildState::default();
                state.handle_event(&e.clone().into());
                let want = state.get_want("1234").unwrap();
                let mut expected: WantDetail = e.into();
                // Into will set this field as current timestamp
                expected.last_updated_timestamp = want.last_updated_timestamp;
                assert_eq!(want, expected);
            }

            #[test]
            fn test_should_cancel_want() {
                let mut e = WantCreateEventV1::default();
                e.want_id = "1234".to_string();
                e.partitions = vec!["mypart".into()];

                let mut state = BuildState::default();
                state.handle_event(&e.clone().into());

                // Should be able to cancel
                let mut e = WantCancelEventV1::default();
                e.want_id = "1234".to_string();
                state.handle_event(&e.clone().into());
                let want = state.get_want("1234").unwrap();

                assert_eq!(
                    want.status,
                    Some(crate::WantStatusCode::WantCanceled.into())
                );
            }

            #[test]
            fn test_multihop_dependency_replay() {
                use crate::data_build_event::Event;
                use crate::{
                    JobRunBufferEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1,
                    JobRunSuccessEventV1, MissingDeps, PartitionRef, WantAttributedPartitions,
                    WantCreateEventV1,
                };

                let mut state = BuildState::default();
                let mut events = vec![];

                // 1. Create want for data/beta
                let beta_want_id = "beta-want".to_string();
                let mut create_beta = WantCreateEventV1::default();
                create_beta.want_id = beta_want_id.clone();
                create_beta.partitions = vec![PartitionRef {
                    r#ref: "data/beta".to_string(),
                }];
                events.push(Event::WantCreateV1(create_beta));

                // 2. Queue beta job (first attempt)
                let beta_job_1_id = "beta-job-1".to_string();
                let mut buffer_beta_1 = JobRunBufferEventV1::default();
                buffer_beta_1.job_run_id = beta_job_1_id.clone();
                buffer_beta_1.job_label = "//job_beta".to_string();
                buffer_beta_1.want_attributed_partitions = vec![WantAttributedPartitions {
                    want_id: beta_want_id.clone(),
                    partitions: vec![PartitionRef {
                        r#ref: "data/beta".to_string(),
                    }],
                }];
                buffer_beta_1.building_partitions = vec![PartitionRef {
                    r#ref: "data/beta".to_string(),
                }];
                events.push(Event::JobRunBufferV1(buffer_beta_1));

                // 3. Beta job starts running
                let mut heartbeat_beta_1 = JobRunHeartbeatEventV1::default();
                heartbeat_beta_1.job_run_id = beta_job_1_id.clone();
                events.push(Event::JobRunHeartbeatV1(heartbeat_beta_1));

                // 4. Beta job reports missing dependency on data/alpha
                let mut dep_miss_beta_1 = JobRunMissingDepsEventV1::default();
                dep_miss_beta_1.job_run_id = beta_job_1_id.clone();
                dep_miss_beta_1.missing_deps = vec![MissingDeps {
                    impacted: vec![PartitionRef {
                        r#ref: "data/beta".to_string(),
                    }],
                    missing: vec![PartitionRef {
                        r#ref: "data/alpha".to_string(),
                    }],
                }];
                events.push(Event::JobRunMissingDepsV1(dep_miss_beta_1));

                // 5. Create derivative want for data/alpha
                let alpha_want_id = "alpha-want".to_string();
                let mut create_alpha = WantCreateEventV1::default();
                create_alpha.want_id = alpha_want_id.clone();
                create_alpha.partitions = vec![PartitionRef {
                    r#ref: "data/alpha".to_string(),
                }];
                events.push(Event::WantCreateV1(create_alpha));

                // 6. Queue alpha job
                let alpha_job_id = "alpha-job".to_string();
                let mut buffer_alpha = JobRunBufferEventV1::default();
                buffer_alpha.job_run_id = alpha_job_id.clone();
                buffer_alpha.job_label = "//job_alpha".to_string();
                buffer_alpha.want_attributed_partitions = vec![WantAttributedPartitions {
                    want_id: alpha_want_id.clone(),
                    partitions: vec![PartitionRef {
                        r#ref: "data/alpha".to_string(),
                    }],
                }];
                buffer_alpha.building_partitions = vec![PartitionRef {
                    r#ref: "data/alpha".to_string(),
                }];
                events.push(Event::JobRunBufferV1(buffer_alpha));

                // 7. Alpha job starts running
                let mut heartbeat_alpha = JobRunHeartbeatEventV1::default();
                heartbeat_alpha.job_run_id = alpha_job_id.clone();
                events.push(Event::JobRunHeartbeatV1(heartbeat_alpha));

                // 8. Alpha job succeeds
                let mut success_alpha = JobRunSuccessEventV1::default();
                success_alpha.job_run_id = alpha_job_id.clone();
                events.push(Event::JobRunSuccessV1(success_alpha));

                // 9. Queue beta job again (second attempt) - THIS IS THE CRITICAL MOMENT
                let beta_job_2_id = "beta-job-2".to_string();
                let mut buffer_beta_2 = JobRunBufferEventV1::default();
                buffer_beta_2.job_run_id = beta_job_2_id.clone();
                buffer_beta_2.job_label = "//job_beta".to_string();
                buffer_beta_2.want_attributed_partitions = vec![WantAttributedPartitions {
                    want_id: beta_want_id.clone(),
                    partitions: vec![PartitionRef {
                        r#ref: "data/beta".to_string(),
                    }],
                }];
                buffer_beta_2.building_partitions = vec![PartitionRef {
                    r#ref: "data/beta".to_string(),
                }];
                events.push(Event::JobRunBufferV1(buffer_beta_2));

                // 10. Beta job starts running
                let mut heartbeat_beta_2 = JobRunHeartbeatEventV1::default();
                heartbeat_beta_2.job_run_id = beta_job_2_id.clone();
                events.push(Event::JobRunHeartbeatV1(heartbeat_beta_2));

                // 11. Beta job succeeds
                let mut success_beta_2 = JobRunSuccessEventV1::default();
                success_beta_2.job_run_id = beta_job_2_id.clone();
                events.push(Event::JobRunSuccessV1(success_beta_2));

                // Process all events - this simulates replay
                for event in &events {
                    state.handle_event(event);
                }

                // Verify final state
                let beta_want = state.get_want(&beta_want_id).unwrap();
                assert_eq!(
                    beta_want.status,
                    Some(crate::WantStatusCode::WantSuccessful.into()),
                    "Beta want should be successful after multi-hop dependency resolution"
                );

                let alpha_want = state.get_want(&alpha_want_id).unwrap();
                assert_eq!(
                    alpha_want.status,
                    Some(crate::WantStatusCode::WantSuccessful.into()),
                    "Alpha want should be successful"
                );
            }
        }
    }
}