From a9b68bfa6a62c1786e1ebbf9ffde57f04b7cc3a6 Mon Sep 17 00:00:00 2001
From: Stuart Axelbrooke
Date: Sat, 22 Nov 2025 09:53:56 +0800
Subject: [PATCH] Implement partitions typestate state machine

---
 .gitignore                    |   2 +-
 databuild/build_state.rs      | 260 ++++++++++++++++++++++-----------
 databuild/lib.rs              |   1 +
 databuild/orchestrator.rs     |  57 +++++++-
 databuild/partition_state.rs  | 268 ++++++++++++++++++++++++++++++++++
 docs/ideas/cpn-build-state.md |  21 +++
 6 files changed, 521 insertions(+), 88 deletions(-)
 create mode 100644 databuild/partition_state.rs
 create mode 100644 docs/ideas/cpn-build-state.md

diff --git a/.gitignore b/.gitignore
index 0af2eb6..1b0fdd0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,4 +19,4 @@ logs/databuild/
 # DSL generated code
 **/generated/
-!/databuild/databuild.rs
+/databuild/databuild.rs
diff --git a/databuild/build_state.rs b/databuild/build_state.rs
index 4e7701e..b350084 100644
--- a/databuild/build_state.rs
+++ b/databuild/build_state.rs
@@ -1,5 +1,6 @@
 use crate::data_build_event::Event;
 use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
+use crate::partition_state::{Partition, PartitionWithState, MissingState, BuildingState, LiveState, FailedState, TaintedState};
 use crate::util::{DatabuildError, current_timestamp};
 use crate::{
     JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1,
@@ -33,11 +34,13 @@ This means no boxing or "query phase", and means we can have all state updates h
 and updates, which is exceptionally fast.
 */

+/// Tracks all application state, defines valid state transitions, and manages cross-state machine
+/// state transitions (e.g. job run success resulting in partition going from Building to Live)
 #[derive(Debug, Clone)]
 pub struct BuildState {
     wants: BTreeMap<String, WantDetail>,
     taints: BTreeMap<String, TaintDetail>,
-    partitions: BTreeMap<String, PartitionDetail>,
+    partitions: BTreeMap<String, Partition>, // Type-safe partition storage
     job_runs: BTreeMap<String, JobRunDetail>,
 }

@@ -57,6 +60,23 @@ impl BuildState {
         self.job_runs.len()
     }

+    /// Add want_id to partition's want_ids list
+    fn add_want_to_partition(&mut self, pref: &PartitionRef, want_id: &str) {
+        // Create partition if it doesn't exist
+        if !self.partitions.contains_key(&pref.r#ref) {
+            let partition = Partition::new_missing(pref.clone());
+            self.partitions.insert(pref.r#ref.clone(), partition);
+        }
+
+        // Add want_id
+        if let Some(partition) = self.partitions.get_mut(&pref.r#ref) {
+            let want_ids = partition.want_ids_mut();
+            if !want_ids.contains(&want_id.to_string()) {
+                want_ids.push(want_id.to_string());
+            }
+        }
+    }
+
     /// Handles reacting to events, updating state, and erroring if it's an invalid state transition
     /// Event handlers can return vecs of events that will then be appended to the BEL
     pub fn handle_event(&mut self, event: &Event) -> Result<Vec<Event>, DatabuildError> {
@@ -119,6 +139,34 @@ impl BuildState {
             }
         }

+        // Transition partitions to Building state
+        for pref in &job_run.building_partitions {
+            if let Some(partition) = self.partitions.remove(&pref.r#ref) {
+                // Partition exists - transition based on current state
+                let transitioned = match partition {
+                    // Valid: Missing -> Building
+                    Partition::Missing(missing) => {
+                        Partition::Building(missing.start_building(event.job_run_id.clone()))
+                    }
+                    // Invalid state: partition should not already be Building, Live, Failed, or Tainted
+                    _ => {
+                        return Err(format!(
+                            "Invalid state: partition {} cannot start building from state {:?}",
+                            pref.r#ref, partition
+                        ).into())
+                    }
+                };
+                self.partitions.insert(pref.r#ref.clone(), transitioned);
+            } else {
+                // Partition doesn't exist yet - create in Missing then transition to Building
+                let missing = Partition::new_missing(pref.clone());
+                if let Partition::Missing(m) = missing {
+                    let building = m.start_building(event.job_run_id.clone());
+                    self.partitions.insert(pref.r#ref.clone(), Partition::Building(building));
+                }
+            }
+        }
+
         self.job_runs
             .insert(event.job_run_id.clone(), job_run.clone());
         println!("Inserted job run: {:?}", job_run);
@@ -140,51 +188,6 @@ impl BuildState {
         }
     }

-    fn update_partition_status(
-        &mut self,
-        pref: &PartitionRef,
-        status: PartitionStatusCode,
-        job_run_id: Option<&str>,
-    ) -> Result<(), DatabuildError> {
-        if let Some(partition) = self.partitions.get_mut(&pref.r#ref) {
-            partition.status = Some(status.clone().into());
-            partition.last_updated_timestamp = Some(current_timestamp());
-            if let Some(job_run_id) = job_run_id.map(str::to_string) {
-                if !partition.job_run_ids.contains(&job_run_id) {
-                    partition.job_run_ids.push(job_run_id);
-                }
-            }
-        } else {
-            // Partition doesn't exist yet, needs to be inserted
-            let want_ids = if let Some(jrid) = job_run_id {
-                let job_run = self
-                    .get_job_run(jrid)
-                    .expect("Job run must exist for partition");
-                job_run
-                    .servicing_wants
-                    .iter()
-                    .map(|wap| wap.want_id.clone())
-                    .collect()
-            } else {
-                vec![]
-            };
-
-            let partition = PartitionDetail {
-                r#ref: Some(pref.clone()),
-                status: Some(status.into()),
-                last_updated_timestamp: Some(current_timestamp()),
-                job_run_ids: job_run_id
-                    .map(|jrid| vec![jrid.to_string()])
-                    .unwrap_or(vec![]),
-                want_ids,
-                ..PartitionDetail::default()
-            };
-            self.partitions.insert(pref.r#ref.clone(), partition);
-        };
-
-        self.update_wants_for_partition(&pref)
-    }
-
     /// Walks the state from this want ID to update its status.
     fn update_want_status(&mut self, want_id: &str) -> Result<(), DatabuildError> {
         if let Some(want) = self.wants.get(want_id) {
@@ -223,13 +226,35 @@ impl BuildState {
         // Clone building_partitions before we use it multiple times
         let newly_live_partitions: Vec<PartitionRef> = job_run.building_partitions.clone();

-        // Update partitions being build by this job
+        // Update partitions being built by this job (strict type-safe transitions)
         for pref in &newly_live_partitions {
-            self.update_partition_status(
-                pref,
-                PartitionStatusCode::PartitionLive,
-                Some(&event.job_run_id),
-            )?;
+            let partition = self.partitions.remove(&pref.r#ref).ok_or_else(|| {
+                format!(
+                    "Partition {} must exist and be in Building state before completion",
+                    pref.r#ref
+                )
+            })?;
+
+            // ONLY valid transition: Building -> Live
+            let transitioned = match partition {
+                Partition::Building(building) => {
+                    Partition::Live(building.complete(
+                        event.job_run_id.clone(),
+                        current_timestamp()
+                    ))
+                }
+                // All other states are invalid
+                _ => {
+                    return Err(format!(
+                        "Invalid state: partition {} must be Building to transition to Live, found {:?}",
+                        pref.r#ref, partition
+                    ).into())
+                }
+            };
+            self.partitions.insert(pref.r#ref.clone(), transitioned);
+
+            // Update wants that reference this partition
+            self.update_wants_for_partition(pref)?;
         }

         // Check all wants in WantUpstreamBuilding status to see if their dependencies are now satisfied
@@ -253,12 +278,11 @@ impl BuildState {

         for want_id in wants_to_update {
             if let Some(want) = self.wants.get_mut(&want_id) {
-                // Check if all upstreams are now satisfied
+                // Check if all upstreams are now satisfied (using type-safe check)
                 let all_upstreams_satisfied = want.upstreams.iter().all(|upstream| {
                     self.partitions
                         .get(&upstream.r#ref)
-                        .and_then(|p| p.status.as_ref())
-                        .map(|s| s.code == PartitionStatusCode::PartitionLive as i32)
+                        .map(|p| p.is_live())
                         .unwrap_or(false)
                 });

@@ -274,11 +298,11 @@ impl BuildState {
     }

     fn update_wants_for_partition(&mut self, pref: &PartitionRef) -> Result<(), DatabuildError> {
-        // todo!("Go to every want that references this partition and update its status")
+        // Use type-safe partitions storage
         let want_ids = self
             .partitions
             .get(&pref.r#ref)
-            .map(|p| p.want_ids.clone())
+            .map(|p| p.want_ids().clone())
             .ok_or(format!("Partition for ref {} not found", pref.r#ref))?;
         for want_id in want_ids.iter() {
             self.update_want_status(want_id)?;
@@ -296,12 +320,35 @@ impl BuildState {
         // Clone building_partitions before we use it multiple times
         let failed_partitions: Vec<PartitionRef> = job_run.building_partitions.clone();

+        // Transition partitions using strict type-safe methods
         for pref in &failed_partitions {
-            self.update_partition_status(
-                pref,
-                PartitionStatusCode::PartitionFailed,
-                Some(&event.job_run_id),
-            )?;
+            let partition = self.partitions.remove(&pref.r#ref).ok_or_else(|| {
+                format!(
+                    "Partition {} must exist and be in Building state before failure",
+                    pref.r#ref
+                )
+            })?;
+
+            // ONLY valid transition: Building -> Failed
+            let transitioned = match partition {
+                Partition::Building(building) => {
+                    Partition::Failed(building.fail(
+                        event.job_run_id.clone(),
+                        current_timestamp()
+                    ))
+                }
+                // All other states are invalid
+                _ => {
+                    return Err(format!(
+                        "Invalid state: partition {} must be Building to transition to Failed, found {:?}",
+                        pref.r#ref, partition
+                    ).into())
+                }
+            };
+            self.partitions.insert(pref.r#ref.clone(), transitioned);
+
+            // Update wants that reference this partition
+            self.update_wants_for_partition(pref)?;
         }

         // Check all wants in WantUpstreamBuilding status to see if they were waiting for the failed partitions
@@ -388,6 +435,31 @@ impl BuildState {
         }
     }

+        // Transition partitions back to Missing since this job can't build them yet
+        for pref in &job_run_detail.building_partitions {
+            let partition = self.partitions.remove(&pref.r#ref).ok_or_else(|| {
+                format!(
+                    "Partition {} must exist and be in Building state during dep_miss",
+                    pref.r#ref
+                )
+            })?;
+
+            // Only valid transition: Building -> Missing
+            let transitioned = match partition {
+                Partition::Building(building) => {
+                    Partition::Missing(building.reset_to_missing())
+                }
+                // All other states are invalid
+                _ => {
+                    return Err(format!(
+                        "Invalid state: partition {} must be Building during dep_miss, found {:?}",
+                        pref.r#ref, partition
+                    ).into())
+                }
+            };
+            self.partitions.insert(pref.r#ref.clone(), transitioned);
+        }
+
         // Create wants from dep misses
         let want_events = missing_deps_to_want_events(
             event.missing_deps.clone(),
@@ -416,7 +488,17 @@ impl BuildState {
         Self { wants, ..self }
     }

-    fn with_partitions(self, partitions: BTreeMap<String, PartitionDetail>) -> Self {
+    #[cfg(test)]
+    fn with_partitions(self, old_partitions: BTreeMap<String, PartitionDetail>) -> Self {
+        // Convert PartitionDetail to Partition (for backfill scenarios)
+        let partitions: BTreeMap<String, Partition> = old_partitions
+            .into_iter()
+            .map(|(key, detail)| {
+                // For now, just create in Missing state - real migration would be more sophisticated
+                let partition = Partition::new_missing(detail.r#ref.clone().unwrap_or_default());
+                (key, partition)
+            })
+            .collect();
         Self { partitions, ..self }
     }

@@ -427,7 +509,7 @@ impl BuildState {
         self.taints.get(taint_id).cloned()
     }
     pub fn get_partition(&self, partition_id: &str) -> Option<PartitionDetail> {
-        self.partitions.get(partition_id).cloned()
+        self.partitions.get(partition_id).map(|p| p.to_detail())
     }
    pub fn get_job_run(&self, job_run_id: &str) -> Option<JobRunDetail> {
        self.job_runs.get(job_run_id).cloned()
    }
@@ -458,8 +540,14 @@ impl BuildState {
     pub fn list_partitions(&self, request: &ListPartitionsRequest) -> ListPartitionsResponse {
         let page = request.page.unwrap_or(0);
         let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
+        // Convert Partition to PartitionDetail for API
+        let partition_details: BTreeMap<String, PartitionDetail> = self
+            .partitions
+            .iter()
+            .map(|(k, v)| (k.clone(), v.to_detail()))
+            .collect();
         ListPartitionsResponse {
-            data: list_state_items(&self.partitions, page, page_size),
+            data: list_state_items(&partition_details, page, page_size),
             match_count: self.wants.len() as u64,
             page,
             page_size,
@@ -481,27 +569,27 @@ impl BuildState {
     Wants are schedulable when their partition is live and not tainted
     */
     pub fn want_schedulability(&self, want: &WantDetail) -> WantSchedulability {
-        let live_details: Vec<&PartitionDetail> = want
-            .upstreams
-            .iter()
-            .map(|pref| self.partitions.get(&pref.r#ref))
-            .flatten()
-            .collect();
-        let live: Vec<PartitionRef> = live_details
-            .iter()
-            .map(|pd| pd.r#ref.clone().expect("pref must have ref"))
-            .collect();
-        let missing: Vec<PartitionRef> = want
-            .upstreams
-            .iter()
-            .filter(|pref| self.partitions.get(&pref.r#ref).is_none())
-            .cloned()
-            .collect();
-        let tainted: Vec<PartitionRef> = live_details
-            .iter()
-            .filter(|p| p.status == Some(PartitionStatusCode::PartitionTainted.into()))
-            .map(|pref| pref.r#ref.clone().unwrap())
-            .collect();
+        // Use type-safe partition checks from partitions
+        let mut live: Vec<PartitionRef> = Vec::new();
+        let mut tainted: Vec<PartitionRef> = Vec::new();
+        let mut missing: Vec<PartitionRef> = Vec::new();
+
+        for upstream_ref in &want.upstreams {
+            match self.partitions.get(&upstream_ref.r#ref) {
+                Some(partition) => {
+                    if partition.is_live() {
+                        live.push(upstream_ref.clone());
+                    } else if matches!(partition, Partition::Tainted(_)) {
+                        tainted.push(upstream_ref.clone());
+                    }
+                    // Other states (Missing, Building, Failed) don't add to any list
+                }
+                None => {
+                    missing.push(upstream_ref.clone());
+                }
+            }
+        }
+
         WantSchedulability {
             want: want.clone(),
             status: WantUpstreamStatus {
diff --git a/databuild/lib.rs b/databuild/lib.rs
index 0479561..5d669dc 100644
--- a/databuild/lib.rs
+++ b/databuild/lib.rs
@@ -2,6 +2,7 @@ mod build_event_log;
 mod orchestrator;
 mod job_run;
 mod job;
+mod partition_state;
 mod util;
 mod build_state;
 mod event_transforms;
diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs
index ef31745..434d038 100644
--- a/databuild/orchestrator.rs
+++ b/databuild/orchestrator.rs
@@ -554,7 +554,7 @@ mod tests {
 mod orchestration {
     use crate::data_build_event::Event;
     use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
-    use crate::{PartitionStatusCode, WantCreateEventV1};
+    use crate::{PartitionStatusCode, WantCreateEventV1, WantStatusCode};
     use std::thread;
     use std::time::Duration;

@@ -584,6 +584,19 @@
         orchestrator.step().expect("should start run");
         assert_eq!(orchestrator.count_running_jobs(), 1);
         assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
+
+        // Partition should be in Building state after job starts
+        assert_eq!(
+            orchestrator
+                .bel
+                .state
+                .get_partition(partition)
+                .unwrap()
+                .status,
+            Some(PartitionStatusCode::PartitionBuilding.into()),
+            "partition should be in Building state after job starts"
+        );
+
         thread::sleep(Duration::from_millis(1));
         // Should still be running after 1ms
         orchestrator
@@ -726,6 +739,18 @@ echo 'Beta succeeded'
             "beta job should be running"
         );

+        // Beta partition should be in Building state after job starts
+        assert_eq!(
+            orchestrator
+                .bel
+                .state
+                .get_partition(partition_beta)
+                .unwrap()
+                .status,
+            Some(PartitionStatusCode::PartitionBuilding.into()),
+            "beta partition should be in Building state after job starts"
+        );
+
         // Step 3: Beta job detects missing alpha dep and creates want
         wait_for_jobs_to_complete(&mut orchestrator, 10).expect("beta job should complete");
         // (Beta should now be in dep_miss state, and a want for alpha should be created)

             "beta should have dep miss"
         );

+        // Beta want should be in UpstreamBuilding state waiting for alpha
+        // (Check that at least one want referencing beta is in UpstreamBuilding)
+        let wants_response = orchestrator
+            .bel
+            .state
+            .list_wants(&crate::ListWantsRequest::default());
+        let beta_wants: Vec<_> = wants_response
+            .data
+            .iter()
+            .filter(|w| w.partitions.iter().any(|p| p.r#ref == partition_beta))
+            .collect();
+
+        assert!(
+            beta_wants.iter().any(|w| w.status.as_ref().map(|s| s.code) == Some(WantStatusCode::WantUpstreamBuilding as i32)),
+            "At least one beta want should be in UpstreamBuilding state, found: {:?}",
+            beta_wants.iter().map(|w| &w.status).collect::<Vec<_>>()
+        );
+
         // Step 4: Should schedule and start alpha job
         // (dep miss handler created the alpha want, which will be picked up by poll_wants)
         orchestrator.step().expect("step 4");

             "alpha job should be running"
         );

+        // Alpha partition should be in Building state after job starts
+        assert_eq!(
+            orchestrator
+                .bel
+                .state
+                .get_partition(partition_alpha)
+                .unwrap()
+                .status,
+            Some(PartitionStatusCode::PartitionBuilding.into()),
+            "alpha partition should be in Building state after job starts"
+        );
+
         // Step 6: Alpha completes successfully
         wait_for_jobs_to_complete(&mut orchestrator, 10).expect("alpha job should complete");
         assert_eq!(
diff --git a/databuild/partition_state.rs b/databuild/partition_state.rs
new file mode 100644
index 0000000..21f4d35
--- /dev/null
+++ b/databuild/partition_state.rs
@@ -0,0 +1,268 @@
+use crate::{PartitionRef, PartitionDetail, PartitionStatus, PartitionStatusCode};
+
+/// State: Partition has been referenced but not yet built
+#[derive(Debug, Clone)]
+pub struct MissingState {}
+
+/// State: Partition is currently being built by one or more jobs
+#[derive(Debug, Clone)]
+pub struct BuildingState {
+    pub building_by: Vec<String>, // job_run_ids
+}
+
+/// State: Partition has been successfully built
+#[derive(Debug, Clone)]
+pub struct LiveState {
+    pub built_at: u64,
+    pub built_by: String, // job_run_id
+}
+
+/// State: Partition build failed
+#[derive(Debug, Clone)]
+pub struct FailedState {
+    pub failed_at: u64,
+    pub failed_by: String, // job_run_id
+}
+
+/// State: Partition has been marked as invalid/tainted
+#[derive(Debug, Clone)]
+pub struct TaintedState {
+    pub tainted_at: u64,
+    pub taint_ids: Vec<String>,
+}
+
+/// Generic partition struct parameterized by state
+#[derive(Debug, Clone)]
+pub struct PartitionWithState<S> {
+    pub partition_ref: PartitionRef,
+    pub want_ids: Vec<String>,
+    pub state: S,
+}
+
+/// Wrapper enum for storing partitions in collections
+#[derive(Debug, Clone)]
+pub enum Partition {
+    Missing(PartitionWithState<MissingState>),
+    Building(PartitionWithState<BuildingState>),
+    Live(PartitionWithState<LiveState>),
+    Failed(PartitionWithState<FailedState>),
+    Tainted(PartitionWithState<TaintedState>),
+}
+
+// Type-safe transition methods for MissingState
+impl PartitionWithState<MissingState> {
+    /// Transition from Missing to Building when a job starts building this partition
+    pub fn start_building(self, job_run_id: String) -> PartitionWithState<BuildingState> {
+        PartitionWithState {
+            partition_ref: self.partition_ref,
+            want_ids: self.want_ids,
+            state: BuildingState {
+                building_by: vec![job_run_id],
+            },
+        }
+    }
+}
+
+// Type-safe transition methods for BuildingState
+impl PartitionWithState<BuildingState> {
+    /// Transition from Building to Live when a job successfully completes
+    pub fn complete(self, job_run_id: String, timestamp: u64) -> PartitionWithState<LiveState> {
+        PartitionWithState {
+            partition_ref: self.partition_ref,
+            want_ids: self.want_ids,
+            state: LiveState {
+                built_at: timestamp,
+                built_by: job_run_id,
+            },
+        }
+    }
+
+    /// Transition from Building to Failed when a job fails
+    pub fn fail(self, job_run_id: String, timestamp: u64) -> PartitionWithState<FailedState> {
+        PartitionWithState {
+            partition_ref: self.partition_ref,
+            want_ids: self.want_ids,
+            state: FailedState {
+                failed_at: timestamp,
+                failed_by: job_run_id,
+            },
+        }
+    }
+
+    /// Add another job to the list of jobs building this partition
+    pub fn add_building_job(mut self, job_run_id: String) -> Self {
+        if !self.state.building_by.contains(&job_run_id) {
+            self.state.building_by.push(job_run_id);
+        }
+        self
+    }
+
+    /// Transition from Building back to Missing when a job discovers missing dependencies
+    pub fn reset_to_missing(self) -> PartitionWithState<MissingState> {
+        PartitionWithState {
+            partition_ref: self.partition_ref,
+            want_ids: self.want_ids,
+            state: MissingState {},
+        }
+    }
+}
+
+// Type-safe transition methods for LiveState
+impl PartitionWithState<LiveState> {
+    /// Transition from Live to Tainted when a taint is applied
+    pub fn taint(self, taint_id: String, timestamp: u64) -> PartitionWithState<TaintedState> {
+        PartitionWithState {
+            partition_ref: self.partition_ref,
+            want_ids: self.want_ids,
+            state: TaintedState {
+                tainted_at: timestamp,
+                taint_ids: vec![taint_id],
+            },
+        }
+    }
+}
+
+// Type-safe transition methods for TaintedState
+impl PartitionWithState<TaintedState> {
+    /// Add another taint to an already-tainted partition
+    pub fn add_taint(mut self, taint_id: String) -> Self {
+        if !self.state.taint_ids.contains(&taint_id) {
+            self.state.taint_ids.push(taint_id);
+        }
+        self
+    }
+}
+
+// Helper methods on the Partition enum
+impl Partition {
+    /// Create a new partition in the Missing state
+    pub fn new_missing(partition_ref: PartitionRef) -> Self {
+        Partition::Missing(PartitionWithState {
+            partition_ref,
+            want_ids: vec![],
+            state: MissingState {},
+        })
+    }
+
+    /// Get the partition reference from any state
+    pub fn partition_ref(&self) -> &PartitionRef {
+        match self {
+            Partition::Missing(p) => &p.partition_ref,
+            Partition::Building(p) => &p.partition_ref,
+            Partition::Live(p) => &p.partition_ref,
+            Partition::Failed(p) => &p.partition_ref,
+            Partition::Tainted(p) => &p.partition_ref,
+        }
+    }
+
+    /// Get want_ids from any state
+    pub fn want_ids(&self) -> &Vec<String> {
+        match self {
+            Partition::Missing(p) => &p.want_ids,
+            Partition::Building(p) => &p.want_ids,
+            Partition::Live(p) => &p.want_ids,
+            Partition::Failed(p) => &p.want_ids,
+            Partition::Tainted(p) => &p.want_ids,
+        }
+    }
+
+    /// Get mutable want_ids from any state
+    pub fn want_ids_mut(&mut self) -> &mut Vec<String> {
+        match self {
+            Partition::Missing(p) => &mut p.want_ids,
+            Partition::Building(p) => &mut p.want_ids,
+            Partition::Live(p) => &mut p.want_ids,
+            Partition::Failed(p) => &mut p.want_ids,
+            Partition::Tainted(p) => &mut p.want_ids,
+        }
+    }
+
+    /// Check if partition is in Live state
+    pub fn is_live(&self) -> bool {
+        matches!(self, Partition::Live(_))
+    }
+
+    /// Check if partition is satisfied (Live or Tainted both count as "available")
+    pub fn is_satisfied(&self) -> bool {
+        matches!(self, Partition::Live(_) | Partition::Tainted(_))
+    }
+
+    /// Check if partition is in a terminal state (Live, Failed, or Tainted)
+    pub fn is_terminal(&self) -> bool {
+        matches!(
+            self,
+            Partition::Live(_) | Partition::Failed(_) | Partition::Tainted(_)
+        )
+    }
+
+    /// Check if partition is currently being built
+    pub fn is_building(&self) -> bool {
+        matches!(self, Partition::Building(_))
+    }
+
+    /// Check if partition is missing (referenced but not built)
+    pub fn is_missing(&self) -> bool {
+        matches!(self, Partition::Missing(_))
+    }
+
+    /// Convert to PartitionDetail for API responses and queries
+    pub fn to_detail(&self) -> PartitionDetail {
+        match self {
+            Partition::Missing(p) => PartitionDetail {
+                r#ref: Some(p.partition_ref.clone()),
+                status: Some(PartitionStatus {
+                    code: PartitionStatusCode::PartitionWanted as i32,
+                    name: "PartitionWanted".to_string(),
+                }),
+                want_ids: p.want_ids.clone(),
+                job_run_ids: vec![],
+                taint_ids: vec![],
+                last_updated_timestamp: None,
+            },
+            Partition::Building(p) => PartitionDetail {
+                r#ref: Some(p.partition_ref.clone()),
+                status: Some(PartitionStatus {
+                    code: PartitionStatusCode::PartitionBuilding as i32,
+                    name: "PartitionBuilding".to_string(),
+                }),
+                want_ids: p.want_ids.clone(),
+                job_run_ids: p.state.building_by.clone(),
+                taint_ids: vec![],
+                last_updated_timestamp: None,
+            },
+            Partition::Live(p) => PartitionDetail {
+                r#ref: Some(p.partition_ref.clone()),
+                status: Some(PartitionStatus {
+                    code: PartitionStatusCode::PartitionLive as i32,
+                    name: "PartitionLive".to_string(),
+                }),
+                want_ids: p.want_ids.clone(),
+                job_run_ids: vec![p.state.built_by.clone()],
+                taint_ids: vec![],
+                last_updated_timestamp: Some(p.state.built_at),
+            },
+            Partition::Failed(p) => PartitionDetail {
+                r#ref: Some(p.partition_ref.clone()),
+                status: Some(PartitionStatus {
+                    code: PartitionStatusCode::PartitionFailed as i32,
+                    name: "PartitionFailed".to_string(),
+                }),
+                want_ids: p.want_ids.clone(),
+                job_run_ids: vec![p.state.failed_by.clone()],
+                taint_ids: vec![],
+                last_updated_timestamp: Some(p.state.failed_at),
+            },
+            Partition::Tainted(p) => PartitionDetail {
+                r#ref: Some(p.partition_ref.clone()),
+                status: Some(PartitionStatus {
+                    code: PartitionStatusCode::PartitionTainted as i32,
+                    name: "PartitionTainted".to_string(),
+                }),
+                want_ids: p.want_ids.clone(),
+                job_run_ids: vec![],
+                taint_ids: p.state.taint_ids.clone(),
+                last_updated_timestamp: Some(p.state.tainted_at),
+            },
+        }
+    }
+}
diff --git a/docs/ideas/cpn-build-state.md b/docs/ideas/cpn-build-state.md
new file mode 100644
index 0000000..2591149
--- /dev/null
+++ b/docs/ideas/cpn-build-state.md
@@ -0,0 +1,21 @@

If you look at the BEL definition, you'll see that there are two components to it: the literal serialized event stream, and the build state, a projection of the events into objects (e.g. via a reducer):

```rust
pub struct BuildEventLog<S> {
    pub storage: S,
    pub state: BuildState,
}
```

`storage` is the literal events that happened: a job run being launched, a want being requested, a job run finishing and producing some number of partitions, etc. `state` answers questions about the state of the world as a result of the serial occurrence of the recorded events, like "is partition x/y/z live?" and "why hasn't partition a/b/c been built yet?". `state` is essentially the thing responsible for system consistency.
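To make the storage/state split concrete, here is a minimal sketch of the projection idea, with hypothetical event variants and state fields standing in for the real databuild types: the state is just a left fold of the event stream.

```rust
use std::collections::BTreeSet;

// Hypothetical, simplified stand-ins for the real BEL event and state types.
#[derive(Debug, Clone)]
enum Event {
    JobRunStarted { partition: String },
    JobRunSucceeded { partition: String },
}

#[derive(Debug, Default)]
struct BuildState {
    live_partitions: BTreeSet<String>,
}

impl BuildState {
    // The reducer: state only ever changes by applying an event.
    fn apply(&mut self, event: &Event) {
        match event {
            Event::JobRunStarted { .. } => {} // building, not yet live
            Event::JobRunSucceeded { partition } => {
                self.live_partitions.insert(partition.clone());
            }
        }
    }
}

fn main() {
    // `storage`: the serial record of what happened.
    let storage = vec![
        Event::JobRunStarted { partition: "x/y/z".into() },
        Event::JobRunSucceeded { partition: "x/y/z".into() },
    ];
    // `state`: the projection, rebuilt by folding over the events.
    let mut state = BuildState::default();
    for event in &storage {
        state.apply(event);
    }
    assert!(state.live_partitions.contains("x/y/z")); // "is partition x/y/z live?"
}
```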
Most of the code in this project is in calculating next states for build state objects: determining wants that can have jobs run to satisfy them, updating partitions to live after a job run succeeds, etc. Can we formalize this into a composition of state machines to simplify the codebase, achieve more compile-time safety, and potentially unlock greater concurrency as a byproduct?

CPN (colored Petri net) concurrency can be described succinctly: if the workloads touch disjoint places, they can be run concurrently. This seems to overwhelmingly be the case for the domain databuild is interested in, where a single "data service" is traditionally responsible for producing the partitions in a given dataset. Another huge benefit of using a CPN framing for databuild is that it separates concerns between state updates/consistency and all the machinery that connects to it.

# Appendix

## Partition Collisions?

Random thought: we also have this lingering question of "what if unrelated wants collide in the partition space?", specifically in a paradigm where job runs produce multiple partitions based on their parameterization. This framing may also give us the confidence to simply cancel the later of the colliding jobs and have it reschedule (how would its partitions differ?). Alternatively, given that we update partition building status when a job is scheduled, we could be confident that we never get into that situation at the later want-grouping stage (pre job scheduling): it would see the conflicting partition as Building thanks to the earlier job having been started. Probably worth constructing a concrete scenario to war-game this, or implementing it as an integration test, as sketched below.
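For reference, here is what a minimal version of that test might look like against the typestate API from `partition_state.rs`. The `would_collide` helper is a hypothetical stand-in for the want-grouping check, and constructing `PartitionRef` via `Default` is an assumption about the generated type:

```rust
use std::collections::BTreeMap;
use crate::partition_state::Partition;
use crate::PartitionRef;

// Hypothetical pre-scheduling check: a want collides if any partition it
// would build is already Building.
fn would_collide(partitions: &BTreeMap<String, Partition>, pref: &PartitionRef) -> bool {
    partitions
        .get(&pref.r#ref)
        .map(|p| p.is_building())
        .unwrap_or(false)
}

#[test]
fn later_want_sees_earlier_job_building() {
    let pref = PartitionRef {
        r#ref: "dataset/x".to_string(),
        ..Default::default()
    };
    let mut partitions: BTreeMap<String, Partition> = BTreeMap::new();

    // Earlier job is scheduled: the partition transitions Missing -> Building.
    let missing = match Partition::new_missing(pref.clone()) {
        Partition::Missing(m) => m,
        _ => unreachable!(),
    };
    partitions.insert(
        pref.r#ref.clone(),
        Partition::Building(missing.start_building("job-1".to_string())),
    );

    // The later want's grouping step (pre job scheduling) sees the conflicting
    // partition as Building and must not schedule a second job for it.
    assert!(would_collide(&partitions, &pref));
}
```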