Implement partitions typestate state machine

Stuart Axelbrooke 2025-11-22 09:53:56 +08:00
parent a641822ead
commit a9b68bfa6a
6 changed files with 521 additions and 88 deletions

.gitignore

@@ -19,4 +19,4 @@ logs/databuild/
 # DSL generated code
 **/generated/
-!/databuild/databuild.rs
+/databuild/databuild.rs

build_state.rs

@@ -1,5 +1,6 @@
 use crate::data_build_event::Event;
 use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
+use crate::partition_state::{Partition, PartitionWithState, MissingState, BuildingState, LiveState, FailedState, TaintedState};
 use crate::util::{DatabuildError, current_timestamp};
 use crate::{
     JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1,
@@ -33,11 +34,13 @@ This means no boxing or "query phase", and means we can have all state updates h
 and updates, which is exceptionally fast.
 */
+/// Tracks all application state, defines valid state transitions, and manages cross-state-machine
+/// transitions (e.g. a job run success moving a partition from Building to Live)
 #[derive(Debug, Clone)]
 pub struct BuildState {
     wants: BTreeMap<String, WantDetail>,
     taints: BTreeMap<String, TaintDetail>,
-    partitions: BTreeMap<String, PartitionDetail>,
+    partitions: BTreeMap<String, Partition>, // Type-safe partition storage
     job_runs: BTreeMap<String, JobRunDetail>,
 }
@@ -57,6 +60,23 @@ impl BuildState {
         self.job_runs.len()
     }

+    /// Add want_id to partition's want_ids list
+    fn add_want_to_partition(&mut self, pref: &PartitionRef, want_id: &str) {
+        // Create partition if it doesn't exist
+        if !self.partitions.contains_key(&pref.r#ref) {
+            let partition = Partition::new_missing(pref.clone());
+            self.partitions.insert(pref.r#ref.clone(), partition);
+        }
+        // Add want_id
+        if let Some(partition) = self.partitions.get_mut(&pref.r#ref) {
+            let want_ids = partition.want_ids_mut();
+            if !want_ids.contains(&want_id.to_string()) {
+                want_ids.push(want_id.to_string());
+            }
+        }
+    }
+
     /// Handles reacting to events, updating state, and erroring if it's an invalid state transition
     /// Event handlers can return vecs of events that will then be appended to the BEL
     pub fn handle_event(&mut self, event: &Event) -> Result<Vec<Event>, DatabuildError> {
@@ -119,6 +139,34 @@ impl BuildState {
             }
         }

+        // Transition partitions to Building state
+        for pref in &job_run.building_partitions {
+            if let Some(partition) = self.partitions.remove(&pref.r#ref) {
+                // Partition exists - transition based on current state
+                let transitioned = match partition {
+                    // Valid: Missing -> Building
+                    Partition::Missing(missing) => {
+                        Partition::Building(missing.start_building(event.job_run_id.clone()))
+                    }
+                    // Invalid state: partition should not already be Building, Live, Failed, or Tainted
+                    _ => {
+                        return Err(format!(
+                            "Invalid state: partition {} cannot start building from state {:?}",
+                            pref.r#ref, partition
+                        ).into())
+                    }
+                };
+                self.partitions.insert(pref.r#ref.clone(), transitioned);
+            } else {
+                // Partition doesn't exist yet - create in Missing then transition to Building
+                let missing = Partition::new_missing(pref.clone());
+                if let Partition::Missing(m) = missing {
+                    let building = m.start_building(event.job_run_id.clone());
+                    self.partitions.insert(pref.r#ref.clone(), Partition::Building(building));
+                }
+            }
+        }
+
         self.job_runs
             .insert(event.job_run_id.clone(), job_run.clone());
         println!("Inserted job run: {:?}", job_run);
@@ -140,51 +188,6 @@ impl BuildState {
         }
     }

-    fn update_partition_status(
-        &mut self,
-        pref: &PartitionRef,
-        status: PartitionStatusCode,
-        job_run_id: Option<&str>,
-    ) -> Result<(), DatabuildError> {
-        if let Some(partition) = self.partitions.get_mut(&pref.r#ref) {
-            partition.status = Some(status.clone().into());
-            partition.last_updated_timestamp = Some(current_timestamp());
-            if let Some(job_run_id) = job_run_id.map(str::to_string) {
-                if !partition.job_run_ids.contains(&job_run_id) {
-                    partition.job_run_ids.push(job_run_id);
-                }
-            }
-        } else {
-            // Partition doesn't exist yet, needs to be inserted
-            let want_ids = if let Some(jrid) = job_run_id {
-                let job_run = self
-                    .get_job_run(jrid)
-                    .expect("Job run must exist for partition");
-                job_run
-                    .servicing_wants
-                    .iter()
-                    .map(|wap| wap.want_id.clone())
-                    .collect()
-            } else {
-                vec![]
-            };
-            let partition = PartitionDetail {
-                r#ref: Some(pref.clone()),
-                status: Some(status.into()),
-                last_updated_timestamp: Some(current_timestamp()),
-                job_run_ids: job_run_id
-                    .map(|jrid| vec![jrid.to_string()])
-                    .unwrap_or(vec![]),
-                want_ids,
-                ..PartitionDetail::default()
-            };
-            self.partitions.insert(pref.r#ref.clone(), partition);
-        };
-        self.update_wants_for_partition(&pref)
-    }

     /// Walks the state from this want ID to update its status.
     fn update_want_status(&mut self, want_id: &str) -> Result<(), DatabuildError> {
         if let Some(want) = self.wants.get(want_id) {
@@ -223,13 +226,35 @@ impl BuildState {
         // Clone building_partitions before we use it multiple times
         let newly_live_partitions: Vec<PartitionRef> = job_run.building_partitions.clone();

-        // Update partitions being build by this job
+        // Update partitions being built by this job (strict type-safe transitions)
         for pref in &newly_live_partitions {
-            self.update_partition_status(
-                pref,
-                PartitionStatusCode::PartitionLive,
-                Some(&event.job_run_id),
-            )?;
+            let partition = self.partitions.remove(&pref.r#ref).ok_or_else(|| {
+                format!(
+                    "Partition {} must exist and be in Building state before completion",
+                    pref.r#ref
+                )
+            })?;
+
+            // ONLY valid transition: Building -> Live
+            let transitioned = match partition {
+                Partition::Building(building) => {
+                    Partition::Live(building.complete(
+                        event.job_run_id.clone(),
+                        current_timestamp()
+                    ))
+                }
+                // All other states are invalid
+                _ => {
+                    return Err(format!(
+                        "Invalid state: partition {} must be Building to transition to Live, found {:?}",
+                        pref.r#ref, partition
+                    ).into())
+                }
+            };
+            self.partitions.insert(pref.r#ref.clone(), transitioned);
+
+            // Update wants that reference this partition
+            self.update_wants_for_partition(pref)?;
         }

         // Check all wants in WantUpstreamBuilding status to see if their dependencies are now satisfied
@@ -253,12 +278,11 @@ impl BuildState {
         for want_id in wants_to_update {
             if let Some(want) = self.wants.get_mut(&want_id) {
-                // Check if all upstreams are now satisfied
+                // Check if all upstreams are now satisfied (using type-safe check)
                 let all_upstreams_satisfied = want.upstreams.iter().all(|upstream| {
                     self.partitions
                         .get(&upstream.r#ref)
-                        .and_then(|p| p.status.as_ref())
-                        .map(|s| s.code == PartitionStatusCode::PartitionLive as i32)
+                        .map(|p| p.is_live())
                         .unwrap_or(false)
                 });
@@ -274,11 +298,11 @@ impl BuildState {
     }

     fn update_wants_for_partition(&mut self, pref: &PartitionRef) -> Result<(), DatabuildError> {
-        // todo!("Go to every want that references this partition and update its status")
+        // Use type-safe partitions storage
         let want_ids = self
             .partitions
             .get(&pref.r#ref)
-            .map(|p| p.want_ids.clone())
+            .map(|p| p.want_ids().clone())
             .ok_or(format!("Partition for ref {} not found", pref.r#ref))?;
         for want_id in want_ids.iter() {
             self.update_want_status(want_id)?;
@@ -296,12 +320,35 @@ impl BuildState {
         // Clone building_partitions before we use it multiple times
         let failed_partitions: Vec<PartitionRef> = job_run.building_partitions.clone();

+        // Transition partitions using strict type-safe methods
         for pref in &failed_partitions {
-            self.update_partition_status(
-                pref,
-                PartitionStatusCode::PartitionFailed,
-                Some(&event.job_run_id),
-            )?;
+            let partition = self.partitions.remove(&pref.r#ref).ok_or_else(|| {
+                format!(
+                    "Partition {} must exist and be in Building state before failure",
+                    pref.r#ref
+                )
+            })?;
+
+            // ONLY valid transition: Building -> Failed
+            let transitioned = match partition {
+                Partition::Building(building) => {
+                    Partition::Failed(building.fail(
+                        event.job_run_id.clone(),
+                        current_timestamp()
+                    ))
+                }
+                // All other states are invalid
+                _ => {
+                    return Err(format!(
+                        "Invalid state: partition {} must be Building to transition to Failed, found {:?}",
+                        pref.r#ref, partition
+                    ).into())
+                }
+            };
+            self.partitions.insert(pref.r#ref.clone(), transitioned);
+
+            // Update wants that reference this partition
+            self.update_wants_for_partition(pref)?;
         }

         // Check all wants in WantUpstreamBuilding status to see if they were waiting for the failed partitions
@@ -388,6 +435,31 @@ impl BuildState {
             }
         }

+        // Transition partitions back to Missing since this job can't build them yet
+        for pref in &job_run_detail.building_partitions {
+            let partition = self.partitions.remove(&pref.r#ref).ok_or_else(|| {
+                format!(
+                    "Partition {} must exist and be in Building state during dep_miss",
+                    pref.r#ref
+                )
+            })?;
+
+            // Only valid transition: Building -> Missing
+            let transitioned = match partition {
+                Partition::Building(building) => {
+                    Partition::Missing(building.reset_to_missing())
+                }
+                // All other states are invalid
+                _ => {
+                    return Err(format!(
+                        "Invalid state: partition {} must be Building during dep_miss, found {:?}",
+                        pref.r#ref, partition
+                    ).into())
+                }
+            };
+            self.partitions.insert(pref.r#ref.clone(), transitioned);
+        }
+
         // Create wants from dep misses
         let want_events = missing_deps_to_want_events(
             event.missing_deps.clone(),
@@ -416,7 +488,17 @@ impl BuildState {
         Self { wants, ..self }
     }

-    fn with_partitions(self, partitions: BTreeMap<String, PartitionDetail>) -> Self {
+    #[cfg(test)]
+    fn with_partitions(self, old_partitions: BTreeMap<String, PartitionDetail>) -> Self {
+        // Convert PartitionDetail to Partition (for backfill scenarios)
+        let partitions: BTreeMap<String, Partition> = old_partitions
+            .into_iter()
+            .map(|(key, detail)| {
+                // For now, just create in Missing state - real migration would be more sophisticated
+                let partition = Partition::new_missing(detail.r#ref.clone().unwrap_or_default());
+                (key, partition)
+            })
+            .collect();
         Self { partitions, ..self }
     }
@@ -427,7 +509,7 @@ impl BuildState {
         self.taints.get(taint_id).cloned()
     }

     pub fn get_partition(&self, partition_id: &str) -> Option<PartitionDetail> {
-        self.partitions.get(partition_id).cloned()
+        self.partitions.get(partition_id).map(|p| p.to_detail())
     }

     pub fn get_job_run(&self, job_run_id: &str) -> Option<JobRunDetail> {
         self.job_runs.get(job_run_id).cloned()
@@ -458,8 +540,14 @@ impl BuildState {
     pub fn list_partitions(&self, request: &ListPartitionsRequest) -> ListPartitionsResponse {
         let page = request.page.unwrap_or(0);
         let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
+
+        // Convert Partition to PartitionDetail for API
+        let partition_details: BTreeMap<String, PartitionDetail> = self
+            .partitions
+            .iter()
+            .map(|(k, v)| (k.clone(), v.to_detail()))
+            .collect();
         ListPartitionsResponse {
-            data: list_state_items(&self.partitions, page, page_size),
+            data: list_state_items(&partition_details, page, page_size),
             match_count: self.wants.len() as u64,
             page,
             page_size,
@@ -481,27 +569,27 @@ impl BuildState {
     Wants are schedulable when their partition is live and not tainted
     */
     pub fn want_schedulability(&self, want: &WantDetail) -> WantSchedulability {
-        let live_details: Vec<&PartitionDetail> = want
-            .upstreams
-            .iter()
-            .map(|pref| self.partitions.get(&pref.r#ref))
-            .flatten()
-            .collect();
-        let live: Vec<PartitionRef> = live_details
-            .iter()
-            .map(|pd| pd.r#ref.clone().expect("pref must have ref"))
-            .collect();
-        let missing: Vec<PartitionRef> = want
-            .upstreams
-            .iter()
-            .filter(|pref| self.partitions.get(&pref.r#ref).is_none())
-            .cloned()
-            .collect();
-        let tainted: Vec<PartitionRef> = live_details
-            .iter()
-            .filter(|p| p.status == Some(PartitionStatusCode::PartitionTainted.into()))
-            .map(|pref| pref.r#ref.clone().unwrap())
-            .collect();
+        // Use type-safe partition checks from partitions
+        let mut live: Vec<PartitionRef> = Vec::new();
+        let mut tainted: Vec<PartitionRef> = Vec::new();
+        let mut missing: Vec<PartitionRef> = Vec::new();
+
+        for upstream_ref in &want.upstreams {
+            match self.partitions.get(&upstream_ref.r#ref) {
+                Some(partition) => {
+                    if partition.is_live() {
+                        live.push(upstream_ref.clone());
+                    } else if matches!(partition, Partition::Tainted(_)) {
+                        tainted.push(upstream_ref.clone());
+                    }
+                    // Other states (Missing, Building, Failed) don't add to any list
+                }
+                None => {
+                    missing.push(upstream_ref.clone());
+                }
+            }
+        }
+
         WantSchedulability {
             want: want.clone(),
             status: WantUpstreamStatus {

(crate module declarations)

@@ -2,6 +2,7 @@ mod build_event_log;
 mod orchestrator;
 mod job_run;
 mod job;
+mod partition_state;
 mod util;
 mod build_state;
 mod event_transforms;

orchestrator.rs

@@ -554,7 +554,7 @@ mod tests {
     mod orchestration {
         use crate::data_build_event::Event;
         use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
-        use crate::{PartitionStatusCode, WantCreateEventV1};
+        use crate::{PartitionStatusCode, WantCreateEventV1, WantStatusCode};
         use std::thread;
         use std::time::Duration;
@@ -584,6 +584,19 @@
         orchestrator.step().expect("should start run");
         assert_eq!(orchestrator.count_running_jobs(), 1);
         assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
+
+        // Partition should be in Building state after job starts
+        assert_eq!(
+            orchestrator
+                .bel
+                .state
+                .get_partition(partition)
+                .unwrap()
+                .status,
+            Some(PartitionStatusCode::PartitionBuilding.into()),
+            "partition should be in Building state after job starts"
+        );
+
         thread::sleep(Duration::from_millis(1));
         // Should still be running after 1ms
         orchestrator
@@ -726,6 +739,18 @@ echo 'Beta succeeded'
             "beta job should be running"
         );

+        // Beta partition should be in Building state after job starts
+        assert_eq!(
+            orchestrator
+                .bel
+                .state
+                .get_partition(partition_beta)
+                .unwrap()
+                .status,
+            Some(PartitionStatusCode::PartitionBuilding.into()),
+            "beta partition should be in Building state after job starts"
+        );
+
         // Step 3: Beta job detects missing alpha dep and creates want
         wait_for_jobs_to_complete(&mut orchestrator, 10).expect("beta job should complete");
         // (Beta should now be in dep_miss state, and a want for alpha should be created)
@@ -735,6 +760,24 @@ echo 'Beta succeeded'
             "beta should have dep miss"
         );

+        // Beta want should be in UpstreamBuilding state waiting for alpha
+        // (Check that at least one want referencing beta is in UpstreamBuilding)
+        let wants_response = orchestrator
+            .bel
+            .state
+            .list_wants(&crate::ListWantsRequest::default());
+        let beta_wants: Vec<_> = wants_response
+            .data
+            .iter()
+            .filter(|w| w.partitions.iter().any(|p| p.r#ref == partition_beta))
+            .collect();
+        assert!(
+            beta_wants.iter().any(|w| w.status.as_ref().map(|s| s.code) == Some(WantStatusCode::WantUpstreamBuilding as i32)),
+            "At least one beta want should be in UpstreamBuilding state, found: {:?}",
+            beta_wants.iter().map(|w| &w.status).collect::<Vec<_>>()
+        );
+
         // Step 4: Should schedule and start alpha job
         // (dep miss handler created the alpha want, which will be picked up by poll_wants)
         orchestrator.step().expect("step 4");
@@ -744,6 +787,18 @@ echo 'Beta succeeded'
             "alpha job should be running"
         );

+        // Alpha partition should be in Building state after job starts
+        assert_eq!(
+            orchestrator
+                .bel
+                .state
+                .get_partition(partition_alpha)
+                .unwrap()
+                .status,
+            Some(PartitionStatusCode::PartitionBuilding.into()),
+            "alpha partition should be in Building state after job starts"
+        );
+
         // Step 6: Alpha completes successfully
         wait_for_jobs_to_complete(&mut orchestrator, 10).expect("alpha job should complete");
         assert_eq!(

partition_state.rs (new file)

@@ -0,0 +1,268 @@
use crate::{PartitionRef, PartitionDetail, PartitionStatus, PartitionStatusCode};

/// State: Partition has been referenced but not yet built
#[derive(Debug, Clone)]
pub struct MissingState {}

/// State: Partition is currently being built by one or more jobs
#[derive(Debug, Clone)]
pub struct BuildingState {
    pub building_by: Vec<String>, // job_run_ids
}

/// State: Partition has been successfully built
#[derive(Debug, Clone)]
pub struct LiveState {
    pub built_at: u64,
    pub built_by: String, // job_run_id
}

/// State: Partition build failed
#[derive(Debug, Clone)]
pub struct FailedState {
    pub failed_at: u64,
    pub failed_by: String, // job_run_id
}

/// State: Partition has been marked as invalid/tainted
#[derive(Debug, Clone)]
pub struct TaintedState {
    pub tainted_at: u64,
    pub taint_ids: Vec<String>,
}

/// Generic partition struct parameterized by state
#[derive(Debug, Clone)]
pub struct PartitionWithState<S> {
    pub partition_ref: PartitionRef,
    pub want_ids: Vec<String>,
    pub state: S,
}

/// Wrapper enum for storing partitions in collections
#[derive(Debug, Clone)]
pub enum Partition {
    Missing(PartitionWithState<MissingState>),
    Building(PartitionWithState<BuildingState>),
    Live(PartitionWithState<LiveState>),
    Failed(PartitionWithState<FailedState>),
    Tainted(PartitionWithState<TaintedState>),
}

// Type-safe transition methods for MissingState
impl PartitionWithState<MissingState> {
    /// Transition from Missing to Building when a job starts building this partition
    pub fn start_building(self, job_run_id: String) -> PartitionWithState<BuildingState> {
        PartitionWithState {
            partition_ref: self.partition_ref,
            want_ids: self.want_ids,
            state: BuildingState {
                building_by: vec![job_run_id],
            },
        }
    }
}

// Type-safe transition methods for BuildingState
impl PartitionWithState<BuildingState> {
    /// Transition from Building to Live when a job successfully completes
    pub fn complete(self, job_run_id: String, timestamp: u64) -> PartitionWithState<LiveState> {
        PartitionWithState {
            partition_ref: self.partition_ref,
            want_ids: self.want_ids,
            state: LiveState {
                built_at: timestamp,
                built_by: job_run_id,
            },
        }
    }

    /// Transition from Building to Failed when a job fails
    pub fn fail(self, job_run_id: String, timestamp: u64) -> PartitionWithState<FailedState> {
        PartitionWithState {
            partition_ref: self.partition_ref,
            want_ids: self.want_ids,
            state: FailedState {
                failed_at: timestamp,
                failed_by: job_run_id,
            },
        }
    }

    /// Add another job to the list of jobs building this partition
    pub fn add_building_job(mut self, job_run_id: String) -> Self {
        if !self.state.building_by.contains(&job_run_id) {
            self.state.building_by.push(job_run_id);
        }
        self
    }

    /// Transition from Building back to Missing when a job discovers missing dependencies
    pub fn reset_to_missing(self) -> PartitionWithState<MissingState> {
        PartitionWithState {
            partition_ref: self.partition_ref,
            want_ids: self.want_ids,
            state: MissingState {},
        }
    }
}

// Type-safe transition methods for LiveState
impl PartitionWithState<LiveState> {
    /// Transition from Live to Tainted when a taint is applied
    pub fn taint(self, taint_id: String, timestamp: u64) -> PartitionWithState<TaintedState> {
        PartitionWithState {
            partition_ref: self.partition_ref,
            want_ids: self.want_ids,
            state: TaintedState {
                tainted_at: timestamp,
                taint_ids: vec![taint_id],
            },
        }
    }
}

// Type-safe transition methods for TaintedState
impl PartitionWithState<TaintedState> {
    /// Add another taint to an already-tainted partition
    pub fn add_taint(mut self, taint_id: String) -> Self {
        if !self.state.taint_ids.contains(&taint_id) {
            self.state.taint_ids.push(taint_id);
        }
        self
    }
}

// Helper methods on the Partition enum
impl Partition {
    /// Create a new partition in the Missing state
    pub fn new_missing(partition_ref: PartitionRef) -> Self {
        Partition::Missing(PartitionWithState {
            partition_ref,
            want_ids: vec![],
            state: MissingState {},
        })
    }

    /// Get the partition reference from any state
    pub fn partition_ref(&self) -> &PartitionRef {
        match self {
            Partition::Missing(p) => &p.partition_ref,
            Partition::Building(p) => &p.partition_ref,
            Partition::Live(p) => &p.partition_ref,
            Partition::Failed(p) => &p.partition_ref,
            Partition::Tainted(p) => &p.partition_ref,
        }
    }

    /// Get want_ids from any state
    pub fn want_ids(&self) -> &Vec<String> {
        match self {
            Partition::Missing(p) => &p.want_ids,
            Partition::Building(p) => &p.want_ids,
            Partition::Live(p) => &p.want_ids,
            Partition::Failed(p) => &p.want_ids,
            Partition::Tainted(p) => &p.want_ids,
        }
    }

    /// Get mutable want_ids from any state
    pub fn want_ids_mut(&mut self) -> &mut Vec<String> {
        match self {
            Partition::Missing(p) => &mut p.want_ids,
            Partition::Building(p) => &mut p.want_ids,
            Partition::Live(p) => &mut p.want_ids,
            Partition::Failed(p) => &mut p.want_ids,
            Partition::Tainted(p) => &mut p.want_ids,
        }
    }

    /// Check if partition is in Live state
    pub fn is_live(&self) -> bool {
        matches!(self, Partition::Live(_))
    }

    /// Check if partition is satisfied (Live or Tainted both count as "available")
    pub fn is_satisfied(&self) -> bool {
        matches!(self, Partition::Live(_) | Partition::Tainted(_))
    }

    /// Check if partition is in a terminal state (Live, Failed, or Tainted)
    pub fn is_terminal(&self) -> bool {
        matches!(
            self,
            Partition::Live(_) | Partition::Failed(_) | Partition::Tainted(_)
        )
    }

    /// Check if partition is currently being built
    pub fn is_building(&self) -> bool {
        matches!(self, Partition::Building(_))
    }

    /// Check if partition is missing (referenced but not built)
    pub fn is_missing(&self) -> bool {
        matches!(self, Partition::Missing(_))
    }

    /// Convert to PartitionDetail for API responses and queries
    pub fn to_detail(&self) -> PartitionDetail {
        match self {
            Partition::Missing(p) => PartitionDetail {
                r#ref: Some(p.partition_ref.clone()),
                status: Some(PartitionStatus {
                    code: PartitionStatusCode::PartitionWanted as i32,
                    name: "PartitionWanted".to_string(),
                }),
                want_ids: p.want_ids.clone(),
                job_run_ids: vec![],
                taint_ids: vec![],
                last_updated_timestamp: None,
            },
            Partition::Building(p) => PartitionDetail {
                r#ref: Some(p.partition_ref.clone()),
                status: Some(PartitionStatus {
                    code: PartitionStatusCode::PartitionBuilding as i32,
                    name: "PartitionBuilding".to_string(),
                }),
                want_ids: p.want_ids.clone(),
                job_run_ids: p.state.building_by.clone(),
                taint_ids: vec![],
                last_updated_timestamp: None,
            },
            Partition::Live(p) => PartitionDetail {
                r#ref: Some(p.partition_ref.clone()),
                status: Some(PartitionStatus {
                    code: PartitionStatusCode::PartitionLive as i32,
                    name: "PartitionLive".to_string(),
                }),
                want_ids: p.want_ids.clone(),
                job_run_ids: vec![p.state.built_by.clone()],
                taint_ids: vec![],
                last_updated_timestamp: Some(p.state.built_at),
            },
            Partition::Failed(p) => PartitionDetail {
                r#ref: Some(p.partition_ref.clone()),
                status: Some(PartitionStatus {
                    code: PartitionStatusCode::PartitionFailed as i32,
                    name: "PartitionFailed".to_string(),
                }),
                want_ids: p.want_ids.clone(),
                job_run_ids: vec![p.state.failed_by.clone()],
                taint_ids: vec![],
                last_updated_timestamp: Some(p.state.failed_at),
            },
            Partition::Tainted(p) => PartitionDetail {
                r#ref: Some(p.partition_ref.clone()),
                status: Some(PartitionStatus {
                    code: PartitionStatusCode::PartitionTainted as i32,
                    name: "PartitionTainted".to_string(),
                }),
                want_ids: p.want_ids.clone(),
                job_run_ids: vec![],
                taint_ids: p.state.taint_ids.clone(),
                last_updated_timestamp: Some(p.state.tainted_at),
            },
        }
    }
}
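
To make the typestate pattern above concrete, here is a minimal usage sketch (not part of the commit; it assumes `PartitionRef` is a prost-generated message with a `r#ref: String` field and a `Default` impl). Transitions consume the typed value, so an invalid move, such as completing a partition that was never started, fails at compile time:

```rust
use crate::partition_state::Partition;
use crate::PartitionRef;

fn typestate_demo() {
    // Hypothetical ref; real PartitionRef construction may differ.
    let pref = PartitionRef { r#ref: "a/b/c".to_string(), ..Default::default() };

    // Every partition starts Missing; unwrap the enum to reach the typed value.
    let Partition::Missing(missing) = Partition::new_missing(pref) else { unreachable!() };

    // Missing -> Building consumes `missing`; it cannot be reused afterwards.
    let building = missing.start_building("job-1".to_string());

    // Building -> Live is the only success path; `complete` exists solely on
    // PartitionWithState<BuildingState>, so calling it in any other state
    // simply does not compile.
    let live = building.complete("job-1".to_string(), 1_700_000_000);

    // Re-wrap for storage in BuildState's BTreeMap<String, Partition>.
    let stored = Partition::Live(live);
    assert!(stored.is_live() && stored.is_terminal());
}
```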

(design notes, new file)

@@ -0,0 +1,21 @@
If you look at the BEL definition, you'll see there are two components to it: the literal serialized event stream, and the build state, a projection of the events into objects (e.g. via a reducer):
```rust
pub struct BuildEventLog<S: BELStorage + Debug> {
pub storage: S,
pub state: BuildState,
}
```
`storage` is the literal events that happened: a job run being launched, a want being requested, a job run finishing and producing some number of partitions, etc. `state` answers questions about the state of the world as a result of the serial occurrence of the recorded events, like "is the partition x/y/z live?" and "why hasn't partition a/b/c been built yet?" `state` is essentially the thing responsible for system consistency.
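As a sketch of that split (names like `iter_events` are hypothetical; the real `BELStorage` trait may expose something different), rebuilding `state` is just folding `handle_event` over `storage`:

```rust
use std::fmt::Debug;

// Hypothetical replay: fold the recorded events into a fresh BuildState.
// `iter_events()` and `BuildState::default()` are assumptions for illustration.
fn replay<S: BELStorage + Debug>(storage: &S) -> Result<BuildState, DatabuildError> {
    let mut state = BuildState::default();
    for event in storage.iter_events() {
        // During replay, any follow-up events handle_event returns are already
        // in the log, so they are intentionally dropped here.
        let _ = state.handle_event(&event)?;
    }
    Ok(state)
}
```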
Most of the code in this project goes into calculating next states for build state objects: determining wants that can have jobs run to satisfy them, updating partitions to live after a job run succeeds, etc. Can we formalize this into a composition of state machines to simplify the codebase, achieve more compile-time safety, and potentially unlock greater concurrency as a byproduct?
CPN (colored Petri net) concurrency can be described succinctly: if two workloads touch disjoint places, they can run concurrently. This seems to overwhelmingly be the case in the domain databuild is interested in, where a single "data service" is traditionally responsible for producing the partitions of a given dataset. Another huge benefit of a CPN framing for databuild is separating concerns between state updates/consistency and everything that connects to them. The sketch below illustrates the scheduling idea.
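A toy illustration of that claim (all names hypothetical; real BEL events can touch multiple places and would need conflict detection, not just sharding): shard events by the place they touch and apply shards in parallel, preserving per-place ordering:

```rust
use std::collections::BTreeMap;
use std::thread;

// Toy CPN-style scheduler: each event is keyed by the single "place"
// (partition ref) it touches. Disjoint places => shards run concurrently;
// within a place, order is preserved. Entirely illustrative.
fn apply_by_place(events: Vec<(String, String)>) {
    // Group (place, event) pairs into per-place shards.
    let mut shards: BTreeMap<String, Vec<String>> = BTreeMap::new();
    for (place, event) in events {
        shards.entry(place).or_default().push(event);
    }
    // Scoped threads: one worker per place, joined automatically at scope end.
    thread::scope(|scope| {
        for (place, shard) in &shards {
            scope.spawn(move || {
                for event in shard {
                    // Stand-in for state.handle_event(...)
                    println!("place {place}: applying {event}");
                }
            });
        }
    });
}
```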
# Appendix
## Partition Collisions?
Random thought: there is also the lingering question of what happens if unrelated wants collide in the partition space, specifically in a paradigm where job runs produce multiple partitions based on their parameterization. The typestate rules may give us the confidence to simply cancel the later of the colliding jobs and have it reschedule (how would the produced partitions differ?). Alternatively, since we update partition status to Building at job-schedule time, we may never reach that situation at the later want-grouping stage (pre job scheduling): it would see the conflicting partition as Building thanks to the earlier job having started. Probably worth constructing a concrete scenario to war-game this, or writing a literal integration test, e.g. along the lines of the sketch below.
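A hedged sketch of such a test (the `job_run_buffer_event` helper is made up, and `BuildState::default()` plus `Display` on `DatabuildError` are assumptions; only the error text matches the new transition code in `build_state.rs`):

```rust
#[test]
fn colliding_jobs_second_claim_is_rejected() {
    // Hypothetical helper that builds a JobRunBufferEventV1 claiming the
    // given partition refs for the given job run id.
    let mut state = BuildState::default();
    let job_a = job_run_buffer_event("job-a", &["reviews/2025-11-22"]);
    let job_b = job_run_buffer_event("job-b", &["reviews/2025-11-22"]);

    // First claim: Missing -> Building succeeds.
    state.handle_event(&job_a).expect("first claim should transition to Building");

    // Second claim on the same place: the typestate transition rejects it,
    // which is the signal to cancel/reschedule the later job.
    let err = state.handle_event(&job_b).expect_err("second claim should error");
    assert!(err.to_string().contains("cannot start building"));
}
```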