From eb443508652d15087db8b96025374c8a1741454f Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Sat, 22 Nov 2025 19:31:08 +0800 Subject: [PATCH] Implement typed IDs (and reformat) --- databuild/build_event_log.rs | 42 +++++-- databuild/build_state.rs | 230 +++++++++++++++++----------------- databuild/event_transforms.rs | 53 ++++---- databuild/job.rs | 16 ++- databuild/job_run.rs | 2 +- databuild/lib.rs | 12 +- databuild/mock_job_run.rs | 16 +-- databuild/orchestrator.rs | 101 +++++++++------ databuild/partition_state.rs | 47 ++++++- databuild/util.rs | 12 +- databuild/want_state.rs | 71 ++++++++++- 11 files changed, 381 insertions(+), 221 deletions(-) diff --git a/databuild/build_event_log.rs b/databuild/build_event_log.rs index b46384a..450f9dd 100644 --- a/databuild/build_event_log.rs +++ b/databuild/build_event_log.rs @@ -1,7 +1,13 @@ use crate::build_state::BuildState; use crate::data_build_event::Event; -use crate::util::{current_timestamp, DatabuildError}; -use crate::{CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, CreateWantRequest, CreateWantResponse, DataBuildEvent, GetTaintRequest, GetTaintResponse, GetWantRequest, GetWantResponse, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, TaintCreateEventV1, WantCancelEventV1, WantCreateEventV1}; +use crate::util::{DatabuildError, current_timestamp}; +use crate::{ + CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, + CreateWantRequest, CreateWantResponse, DataBuildEvent, GetTaintRequest, GetTaintResponse, + GetWantRequest, GetWantResponse, ListJobRunsRequest, ListJobRunsResponse, + ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, + ListWantsRequest, ListWantsResponse, TaintCreateEventV1, WantCancelEventV1, WantCreateEventV1, +}; use prost::Message; use rusqlite::Connection; use std::fmt::Debug; @@ -173,7 +179,6 @@ impl BuildEventLog { Ok(idx) } - // API methods pub fn api_handle_list_wants(&self, req: ListWantsRequest) -> ListWantsResponse { self.state.list_wants(&req) @@ -191,7 +196,10 @@ impl BuildEventLog { self.state.list_job_runs(&req) } - pub fn api_handle_want_create(&mut self, req: CreateWantRequest) -> Result { + pub fn api_handle_want_create( + &mut self, + req: CreateWantRequest, + ) -> Result { let ev: WantCreateEventV1 = req.into(); self.append_event(&ev.clone().into())?; Ok(self.state.get_want(&ev.want_id).into()) @@ -201,13 +209,19 @@ impl BuildEventLog { self.state.get_want(&req.want_id).into() } - pub fn api_handle_want_cancel(&mut self, req: CancelWantRequest) -> Result { + pub fn api_handle_want_cancel( + &mut self, + req: CancelWantRequest, + ) -> Result { let ev: WantCancelEventV1 = req.into(); self.append_event(&ev.clone().into())?; Ok(self.state.get_want(&ev.want_id).into()) } - pub fn api_handle_taint_create(&mut self, req: CreateTaintRequest) -> Result { + pub fn api_handle_taint_create( + &mut self, + req: CreateTaintRequest, + ) -> Result { // TODO Need to do this hierarchically? 
A taint will impact downstream partitions also todo!(); let ev: TaintCreateEventV1 = req.into(); @@ -264,7 +278,9 @@ mod tests { // Append an event let mut e = WantCreateEventV1::default(); e.want_id = want_id.clone(); - e.partitions = vec!(PartitionRef { r#ref: "sqlite_partition_1234".to_string() }); + e.partitions = vec![PartitionRef { + r#ref: "sqlite_partition_1234".to_string(), + }]; let event_id = log .append_event(&Event::WantCreateV1(e)) .expect("append_event failed"); @@ -298,7 +314,8 @@ mod tests { "want_id not found in state" ); assert_eq!( - log.state.get_want(&want_id) + log.state + .get_want(&want_id) .map(|want| want.want_id.clone()) .expect("state.wants want_id not found"), want_id, @@ -307,13 +324,16 @@ mod tests { let mut e2 = WantCreateEventV1::default(); e2.want_id = Uuid::new_v4().into(); - log.append_event(&Event::WantCreateV1(e2)).expect("append_event failed"); + log.append_event(&Event::WantCreateV1(e2)) + .expect("append_event failed"); let mut e3 = WantCreateEventV1::default(); e3.want_id = Uuid::new_v4().into(); - log.append_event(&Event::WantCreateV1(e3)).expect("append_event failed"); + log.append_event(&Event::WantCreateV1(e3)) + .expect("append_event failed"); let mut e4 = WantCreateEventV1::default(); e4.want_id = Uuid::new_v4().into(); - log.append_event(&Event::WantCreateV1(e4)).expect("append_event failed"); + log.append_event(&Event::WantCreateV1(e4)) + .expect("append_event failed"); let events = log .storage diff --git a/databuild/build_state.rs b/databuild/build_state.rs index 3f0c6b0..b3edb1e 100644 --- a/databuild/build_state.rs +++ b/databuild/build_state.rs @@ -1,8 +1,13 @@ use crate::data_build_event::Event; use crate::data_deps::{WantTimestamps, missing_deps_to_want_events}; -use crate::partition_state::Partition; +use crate::partition_state::{ + FailedPartitionRef, LivePartitionRef, MissingPartitionRef, MissingState, Partition, + PartitionWithState, TaintedPartitionRef, +}; use crate::util::current_timestamp; -use crate::want_state::{IdleState as WantIdleState, Want, WantWithState}; +use crate::want_state::{ + FailedWantId, IdleState as WantIdleState, SuccessfulWantId, Want, WantWithState, +}; use crate::{ JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1, @@ -120,14 +125,14 @@ impl BuildState { /// Used when a job run successfully completes fn transition_partitions_to_live( &mut self, - partition_refs: &[PartitionRef], + partition_refs: &[LivePartitionRef], job_run_id: &str, timestamp: u64, ) { for pref in partition_refs { - let partition = self.partitions.remove(&pref.r#ref).expect(&format!( + let partition = self.partitions.remove(&pref.0.r#ref).expect(&format!( "BUG: Partition {} must exist and be in Building state before completion", - pref.r#ref + pref.0.r#ref )); // ONLY valid transition: Building -> Live @@ -139,11 +144,11 @@ impl BuildState { _ => { panic!( "BUG: Invalid state - partition {} must be Building to transition to Live, found {:?}", - pref.r#ref, partition + pref.0.r#ref, partition ) } }; - self.partitions.insert(pref.r#ref.clone(), transitioned); + self.partitions.insert(pref.0.r#ref.clone(), transitioned); } } @@ -151,14 +156,14 @@ impl BuildState { /// Used when a job run fails fn transition_partitions_to_failed( &mut self, - partition_refs: &[PartitionRef], + partition_refs: &[FailedPartitionRef], job_run_id: &str, timestamp: u64, ) { for pref in partition_refs { - let partition = 
self.partitions.remove(&pref.r#ref).expect(&format!( + let partition = self.partitions.remove(&pref.0.r#ref).expect(&format!( "BUG: Partition {} must exist and be in Building state before failure", - pref.r#ref + pref.0.r#ref )); // ONLY valid transition: Building -> Failed @@ -170,20 +175,17 @@ impl BuildState { _ => { panic!( "BUG: Invalid state - partition {} must be Building to transition to Failed, found {:?}", - pref.r#ref, partition + pref.0.r#ref, partition ) } }; - self.partitions.insert(pref.r#ref.clone(), transitioned); + self.partitions.insert(pref.0.r#ref.clone(), transitioned); } } /// Reset partitions from Building back to Missing state /// Used when a job run encounters missing dependencies and cannot proceed - fn reset_partitions_to_missing( - &mut self, - partition_refs: &[PartitionRef], - ) { + fn reset_partitions_to_missing(&mut self, partition_refs: &[PartitionRef]) { for pref in partition_refs { let partition = self.partitions.remove(&pref.r#ref).expect(&format!( "BUG: Partition {} must exist and be in Building state during dep_miss", @@ -209,16 +211,16 @@ impl BuildState { /// Transitions Building → Successful, returns list of newly successful want IDs fn complete_successful_wants( &mut self, - newly_live_partitions: &[PartitionRef], + newly_live_partitions: &[LivePartitionRef], job_run_id: &str, timestamp: u64, - ) -> Vec { - let mut newly_successful_wants: Vec = Vec::new(); + ) -> Vec { + let mut newly_successful_wants: Vec = Vec::new(); for pref in newly_live_partitions { let want_ids = self .partitions - .get(&pref.r#ref) + .get(&pref.0.r#ref) .map(|p| p.want_ids().clone()) .unwrap_or_default(); @@ -239,10 +241,10 @@ impl BuildState { }); if all_partitions_live { - newly_successful_wants.push(want_id.clone()); - Want::Successful( - building.complete(job_run_id.to_string(), timestamp), - ) + let successful_want = + building.complete(job_run_id.to_string(), timestamp); + newly_successful_wants.push(successful_want.get_id()); + Want::Successful(successful_want) } else { Want::Building(building) // Still building other partitions } @@ -250,7 +252,7 @@ impl BuildState { _ => { panic!( "BUG: Want {} in invalid state {:?} when partition {} became Live. Should be Building.", - want_id, want, pref.r#ref + want_id, want, pref.0.r#ref ); } }; @@ -267,14 +269,14 @@ impl BuildState { /// Returns list of newly failed want IDs for downstream cascade fn fail_directly_affected_wants( &mut self, - failed_partitions: &[PartitionRef], - ) -> Vec { - let mut newly_failed_wants: Vec = Vec::new(); + failed_partitions: &[FailedPartitionRef], + ) -> Vec { + let mut newly_failed_wants: Vec = Vec::new(); for pref in failed_partitions { let want_ids = self .partitions - .get(&pref.r#ref) + .get(&pref.0.r#ref) .map(|p| p.want_ids().clone()) .unwrap_or_default(); @@ -286,10 +288,10 @@ impl BuildState { let transitioned = match want { Want::Building(building) => { - newly_failed_wants.push(want_id.clone()); - Want::Failed( - building.fail(vec![pref.clone()], "Partition build failed".to_string()), - ) + let failed = building + .fail(vec![pref.0.clone()], "Partition build failed".to_string()); + newly_failed_wants.push(failed.get_id()); + Want::Failed(failed) } // Failed → Failed: add new failed partition to existing failed state Want::Failed(failed) => { @@ -298,7 +300,7 @@ impl BuildState { _ => { panic!( "BUG: Want {} in invalid state {:?} when partition {} failed. 
Should be Building or Failed.", - want_id, want, pref.r#ref + want_id, want, pref.0.r#ref ); } }; @@ -314,7 +316,7 @@ impl BuildState { /// Transitions UpstreamBuilding → Idle (when ready) or Building (when partitions already building) fn unblock_downstream_wants( &mut self, - newly_successful_wants: &[String], + newly_successful_wants: &[SuccessfulWantId], job_run_id: &str, timestamp: u64, ) { @@ -327,11 +329,10 @@ impl BuildState { match want { Want::UpstreamBuilding(downstream_want) => { // Is this downstream want waiting for any of the newly successful wants? - let is_affected = downstream_want - .state - .upstream_want_ids - .iter() - .any(|up_id| newly_successful_wants.contains(up_id)); + let is_affected = + downstream_want.state.upstream_want_ids.iter().any(|up_id| { + newly_successful_wants.iter().any(|swid| &swid.0 == up_id) + }); if is_affected { Some(id.clone()) } else { None } } _ => None, @@ -374,10 +375,8 @@ impl BuildState { if any_partition_building { // Some partitions still being built, continue in Building state Want::Building( - downstream_want.continue_building( - job_run_id.to_string(), - timestamp, - ), + downstream_want + .continue_building(job_run_id.to_string(), timestamp), ) } else { // No partitions being built, become schedulable again @@ -401,7 +400,7 @@ impl BuildState { /// Transitions UpstreamBuilding → UpstreamFailed fn cascade_failures_to_downstream_wants( &mut self, - newly_failed_wants: &[String], + newly_failed_wants: &[FailedWantId], timestamp: u64, ) { // Find downstream wants that are waiting for any of the newly failed wants @@ -413,11 +412,10 @@ impl BuildState { match want { Want::UpstreamBuilding(downstream_want) => { // Is this downstream want waiting for any of the newly failed wants? - let is_affected = downstream_want - .state - .upstream_want_ids - .iter() - .any(|up_id| newly_failed_wants.contains(up_id)); + let is_affected = + downstream_want.state.upstream_want_ids.iter().any(|up_id| { + newly_failed_wants.iter().any(|fwid| &fwid.0 == up_id) + }); if is_affected { Some(id.clone()) } else { None } } _ => None, @@ -433,8 +431,13 @@ impl BuildState { let transitioned = match want { Want::UpstreamBuilding(downstream_want) => Want::UpstreamFailed( - downstream_want - .upstream_failed(newly_failed_wants.to_vec(), timestamp), + downstream_want.upstream_failed( + newly_failed_wants + .iter() + .map(|fwid| fwid.0.clone()) + .collect(), + timestamp, + ), ), _ => { panic!("BUG: Want {} should be UpstreamBuilding here", want_id); @@ -570,10 +573,7 @@ impl BuildState { } } - fn handle_want_create( - &mut self, - event: &WantCreateEventV1, - ) -> Vec { + fn handle_want_create(&mut self, event: &WantCreateEventV1) -> Vec { // Use From impl to create want in Idle state let want_idle: WantWithState = event.clone().into(); self.wants @@ -587,10 +587,7 @@ impl BuildState { vec![] } - fn handle_want_cancel( - &mut self, - event: &WantCancelEventV1, - ) -> Vec { + fn handle_want_cancel(&mut self, event: &WantCancelEventV1) -> Vec { // TODO actually cancel in-progress job runs that no longer have a sponsoring want // Type-safe transition (API layer should prevent canceling terminal wants) @@ -626,10 +623,7 @@ impl BuildState { vec![] } - fn handle_job_run_buffer( - &mut self, - event: &JobRunBufferEventV1, - ) -> Vec { + fn handle_job_run_buffer(&mut self, event: &JobRunBufferEventV1) -> Vec { // No job run should exist - if it does, that's a BUG in the orchestrator if self.job_runs.get(&event.job_run_id).is_some() { panic!( @@ -683,11 +677,7 @@ impl 
BuildState { vec![] } - fn update_job_run_status( - &mut self, - job_run_id: &str, - status: JobRunStatusCode, - ) { + fn update_job_run_status(&mut self, job_run_id: &str, status: JobRunStatusCode) { let job_run = self.job_runs.get_mut(job_run_id).expect(&format!( "BUG: Job run ID {} must exist to update status", job_run_id @@ -696,52 +686,69 @@ impl BuildState { job_run.status = Some(status.into()); } - fn handle_job_run_heartbeat( - &mut self, - event: &JobRunHeartbeatEventV1, - ) -> Vec { + fn handle_job_run_heartbeat(&mut self, event: &JobRunHeartbeatEventV1) -> Vec { self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunRunning); vec![] } - fn handle_job_run_success( - &mut self, - event: &JobRunSuccessEventV1, - ) -> Vec { + fn handle_job_run_success(&mut self, event: &JobRunSuccessEventV1) -> Vec { println!("Job run success event: {:?}", event); self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunSucceeded); let job_run = self.get_job_run(&event.job_run_id).unwrap(); // Clone building_partitions before we use it multiple times - let newly_live_partitions: Vec = job_run.building_partitions.clone(); + // TODO correct this explicit upcasting of partition ref type + let newly_live_partitions: Vec = job_run + .building_partitions + .iter() + .map(|pref| LivePartitionRef(pref.clone())) + .collect(); // Update partitions being built by this job (strict type-safe transitions) - self.transition_partitions_to_live(&newly_live_partitions, &event.job_run_id, current_timestamp()); + self.transition_partitions_to_live( + &newly_live_partitions, + &event.job_run_id, + current_timestamp(), + ); // Building → Successful (when all partitions Live) - let newly_successful_wants = self.complete_successful_wants(&newly_live_partitions, &event.job_run_id, current_timestamp()); + let newly_successful_wants: Vec = self.complete_successful_wants( + &newly_live_partitions, + &event.job_run_id, + current_timestamp(), + ); // UpstreamBuilding → Idle/Building (for downstream wants waiting on newly successful wants) - self.unblock_downstream_wants(&newly_successful_wants, &event.job_run_id, current_timestamp()); + self.unblock_downstream_wants( + &newly_successful_wants, + &event.job_run_id, + current_timestamp(), + ); vec![] } - fn handle_job_run_failure( - &mut self, - event: &JobRunFailureEventV1, - ) -> Vec { + fn handle_job_run_failure(&mut self, event: &JobRunFailureEventV1) -> Vec { self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunFailed); let job_run = self.get_job_run(&event.job_run_id).unwrap(); // Clone building_partitions before we use it multiple times - let failed_partitions: Vec = job_run.building_partitions.clone(); + let failed_partitions: Vec = job_run + .building_partitions + .iter() + .map(|pref| FailedPartitionRef(pref.clone())) + .collect(); // Transition partitions using strict type-safe methods - self.transition_partitions_to_failed(&failed_partitions, &event.job_run_id, current_timestamp()); + self.transition_partitions_to_failed( + &failed_partitions, + &event.job_run_id, + current_timestamp(), + ); // Building → Failed (for wants directly building failed partitions) - let newly_failed_wants = self.fail_directly_affected_wants(&failed_partitions); + let newly_failed_wants: Vec = + self.fail_directly_affected_wants(&failed_partitions); // UpstreamBuilding → UpstreamFailed (for downstream wants waiting on newly failed wants) self.cascade_failures_to_downstream_wants(&newly_failed_wants, current_timestamp()); @@ -749,17 +756,11 @@ impl 
BuildState { vec![] } - fn handle_job_run_cancel( - &mut self, - _event: &JobRunCancelEventV1, - ) -> Vec { + fn handle_job_run_cancel(&mut self, _event: &JobRunCancelEventV1) -> Vec { todo!("should update already inserted job run, partition status, want status") } - pub fn handle_job_run_dep_miss( - &mut self, - event: &JobRunMissingDepsEventV1, - ) -> Vec { + pub fn handle_job_run_dep_miss(&mut self, event: &JobRunMissingDepsEventV1) -> Vec { let job_run_detail = self.get_job_run(&event.job_run_id).expect(&format!( "BUG: Unable to find job run with id `{}`", event.job_run_id @@ -801,17 +802,11 @@ impl BuildState { want_events } - fn handle_taint_create( - &mut self, - _event: &TaintCreateEventV1, - ) -> Vec { + fn handle_taint_create(&mut self, _event: &TaintCreateEventV1) -> Vec { todo!("...?") } - fn handle_taint_delete( - &mut self, - _event: &TaintCancelEventV1, - ) -> Vec { + fn handle_taint_delete(&mut self, _event: &TaintCancelEventV1) -> Vec { todo!("...?") } @@ -913,22 +908,25 @@ impl BuildState { */ pub fn want_schedulability(&self, want: &WantDetail) -> WantSchedulability { // Use type-safe partition checks from partitions - let mut live: Vec = Vec::new(); - let mut tainted: Vec = Vec::new(); - let mut missing: Vec = Vec::new(); + let mut live: Vec = Vec::new(); + let mut tainted: Vec = Vec::new(); + let mut missing: Vec = Vec::new(); for upstream_ref in &want.upstreams { match self.partitions.get(&upstream_ref.r#ref) { Some(partition) => { - if partition.is_live() { - live.push(upstream_ref.clone()); - } else if matches!(partition, Partition::Tainted(_)) { - tainted.push(upstream_ref.clone()); + match partition { + Partition::Live(p) => live.push(p.get_ref()), + Partition::Tainted(p) => tainted.push(p.get_ref()), + Partition::Missing(p) => missing.push(p.get_ref()), + _ => (), // Other states (Missing, Building, Failed) don't add to any list } - // Other states (Missing, Building, Failed) don't add to any list } None => { - missing.push(upstream_ref.clone()); + // TODO this definitely feels dirty, but we can't take a mutable ref of self to + // insert the missing partition here, and it feels a little over the top to + // create a more elaborate way to mint a missing ref. 
+ missing.push(MissingPartitionRef(upstream_ref.clone())); } } } @@ -958,9 +956,9 @@ impl BuildState { /// The status of partitions required by a want to build (sensed from dep miss job run) #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct WantUpstreamStatus { - pub live: Vec, - pub tainted: Vec, - pub missing: Vec, + pub live: Vec, + pub tainted: Vec, + pub missing: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, Default)] diff --git a/databuild/event_transforms.rs b/databuild/event_transforms.rs index 6d1340d..cb91ccb 100644 --- a/databuild/event_transforms.rs +++ b/databuild/event_transforms.rs @@ -1,8 +1,15 @@ -use uuid::Uuid; +use crate::PartitionStatusCode::{PartitionFailed, PartitionLive}; use crate::data_build_event::Event; use crate::util::current_timestamp; -use crate::{event_source, CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, CreateWantRequest, CreateWantResponse, EventSource, GetWantResponse, JobRunBufferEventV1, JobRunDetail, JobRunStatus, JobRunStatusCode, JobTriggeredEvent, ManuallyTriggeredEvent, PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantAttributedPartitions, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode}; -use crate::PartitionStatusCode::{PartitionFailed, PartitionLive}; +use crate::{ + CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, + CreateWantRequest, CreateWantResponse, EventSource, GetWantResponse, JobRunBufferEventV1, + JobRunDetail, JobRunStatus, JobRunStatusCode, JobTriggeredEvent, ManuallyTriggeredEvent, + PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, TaintCancelEventV1, + TaintCreateEventV1, TaintDetail, WantAttributedPartitions, WantCancelEventV1, + WantCreateEventV1, WantDetail, WantStatus, WantStatusCode, event_source, +}; +use uuid::Uuid; impl From<&WantCreateEventV1> for WantDetail { fn from(e: &WantCreateEventV1) -> Self { @@ -76,25 +83,31 @@ impl From for JobRunDetail { } } - -pub fn want_status_matches_any(pds: &Vec>, status: PartitionStatusCode) -> bool { - pds.iter() - .any(|pd| pd.clone() +pub fn want_status_matches_any( + pds: &Vec>, + status: PartitionStatusCode, +) -> bool { + pds.iter().any(|pd| { + pd.clone() .map(|pd| pd.status == Some(status.into())) - .unwrap_or(false)) + .unwrap_or(false) + }) } -pub fn want_status_matches_all(pds: &Vec>, status: PartitionStatusCode) -> bool { - pds.iter() - .all(|pd| pd.clone() +pub fn want_status_matches_all( + pds: &Vec>, + status: PartitionStatusCode, +) -> bool { + pds.iter().all(|pd| { + pd.clone() .map(|pd| pd.status == Some(status.into())) - .unwrap_or(false)) + .unwrap_or(false) + }) } /// Merges a list of partition details into a single status code. /// Takes the lowest state as the want status. 
impl Into for Vec> { - fn into(self) -> WantStatusCode { if want_status_matches_any(&self, PartitionFailed) { WantStatusCode::WantFailed @@ -175,17 +188,13 @@ impl From for WantCreateEventV1 { impl Into for Option { fn into(self) -> CreateWantResponse { - CreateWantResponse { - data: self, - } + CreateWantResponse { data: self } } } impl Into for Option { fn into(self) -> GetWantResponse { - GetWantResponse { - data: self, - } + GetWantResponse { data: self } } } @@ -201,9 +210,7 @@ impl From for WantCancelEventV1 { impl Into for Option { fn into(self) -> CancelWantResponse { - CancelWantResponse { - data: self, - } + CancelWantResponse { data: self } } } @@ -219,4 +226,4 @@ impl Into for Option { // TODO } } -} \ No newline at end of file +} diff --git a/databuild/job.rs b/databuild/job.rs index 759f1a4..feabb8d 100644 --- a/databuild/job.rs +++ b/databuild/job.rs @@ -1,7 +1,7 @@ use crate::job_run::{JobRun, SubProcessBackend}; +use crate::util::DatabuildError; use crate::{JobConfig, PartitionRef, WantDetail}; use regex::Regex; -use crate::util::DatabuildError; #[derive(Debug, Clone)] pub struct JobConfiguration { @@ -12,17 +12,21 @@ pub struct JobConfiguration { impl JobConfiguration { /** Launch job to build the partitions specified by the provided wants. */ - pub fn spawn(&self, wants: Vec) -> Result, std::io::Error> { - let wanted_refs: Vec = - wants.iter().flat_map(|want| want.partitions.clone()).collect(); + pub fn spawn( + &self, + wants: Vec, + ) -> Result, std::io::Error> { + let wanted_refs: Vec = wants + .iter() + .flat_map(|want| want.partitions.clone()) + .collect(); let args: Vec = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect(); Ok(JobRun::spawn(self.entry_point.clone(), args)) } pub fn matches(&self, refs: &PartitionRef) -> bool { self.patterns.iter().any(|pattern| { - let regex = - Regex::new(&pattern).expect(&format!("Invalid regex pattern: {}", pattern)); + let regex = Regex::new(&pattern).expect(&format!("Invalid regex pattern: {}", pattern)); regex.is_match(&refs.r#ref) }) } diff --git a/databuild/job_run.rs b/databuild/job_run.rs index 08c8299..055acd0 100644 --- a/databuild/job_run.rs +++ b/databuild/job_run.rs @@ -417,7 +417,7 @@ impl ToEvent for SubProcessDepMiss { mod tests { use crate::data_build_event::Event; use crate::data_deps::DATABUILD_MISSING_DEPS_JSON; - use crate::job_run::{JobRun, JobRunBackend, VisitResult, SubProcessBackend}; + use crate::job_run::{JobRun, JobRunBackend, SubProcessBackend, VisitResult}; use crate::mock_job_run::MockJobRun; use crate::{JobRunMissingDeps, MissingDeps}; diff --git a/databuild/lib.rs b/databuild/lib.rs index ac9339c..311bba7 100644 --- a/databuild/lib.rs +++ b/databuild/lib.rs @@ -1,13 +1,13 @@ mod build_event_log; -mod orchestrator; -mod job_run; +mod build_state; +mod data_deps; +mod event_transforms; mod job; +mod job_run; +mod mock_job_run; +mod orchestrator; mod partition_state; mod util; -mod build_state; -mod event_transforms; -mod data_deps; -mod mock_job_run; mod want_state; // Include generated protobuf code diff --git a/databuild/mock_job_run.rs b/databuild/mock_job_run.rs index 4089836..c69c1a3 100644 --- a/databuild/mock_job_run.rs +++ b/databuild/mock_job_run.rs @@ -1,6 +1,6 @@ -use std::collections::HashMap; use crate::data_deps::DataDepLogLine; use crate::{JobRunMissingDeps, MissingDeps}; +use std::collections::HashMap; pub struct MockJobRun { sleep_ms: u64, @@ -54,13 +54,13 @@ impl MockJobRun { } pub fn dep_miss(self, missing_deps: Vec) -> Self { - self.exit_code(1) - .stdout_msg( - 
&DataDepLogLine::DepMiss(JobRunMissingDeps { - version: "1".to_string(), - missing_deps, - }).into() - ) + self.exit_code(1).stdout_msg( + &DataDepLogLine::DepMiss(JobRunMissingDeps { + version: "1".to_string(), + missing_deps, + }) + .into(), + ) } pub fn to_env(&self) -> HashMap { diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs index 434d038..f57e465 100644 --- a/databuild/orchestrator.rs +++ b/databuild/orchestrator.rs @@ -97,7 +97,6 @@ struct WantGroup { wants: Vec, } - #[derive(Debug, Clone)] struct GroupedWants { want_groups: Vec, @@ -151,32 +150,30 @@ impl Orchestrator { let mut new_jobs = Vec::new(); for job in self.job_runs.drain(..) { let transitioned = match job { - JobRun::Running(running) => { - match running.visit()? { - VisitResult::StillRunning(still_running) => { - println!("Still running job: {:?}", still_running.job_run_id); - JobRun::Running(still_running) - } - VisitResult::Completed(completed) => { - println!("Completed job: {:?}", completed.job_run_id); - let event = completed.state.to_event(&completed.job_run_id); - self.bel.append_event(&event)?; - JobRun::Completed(completed) - } - VisitResult::Failed(failed) => { - println!("Failed job: {:?}", failed.job_run_id); - let event = failed.state.to_event(&failed.job_run_id); - self.bel.append_event(&event)?; - JobRun::Failed(failed) - } - VisitResult::DepMiss(dep_miss) => { - println!("Dep miss job: {:?}", dep_miss.job_run_id); - let event = dep_miss.state.to_event(&dep_miss.job_run_id); - self.bel.append_event(&event)?; - JobRun::DepMiss(dep_miss) - } + JobRun::Running(running) => match running.visit()? { + VisitResult::StillRunning(still_running) => { + println!("Still running job: {:?}", still_running.job_run_id); + JobRun::Running(still_running) } - } + VisitResult::Completed(completed) => { + println!("Completed job: {:?}", completed.job_run_id); + let event = completed.state.to_event(&completed.job_run_id); + self.bel.append_event(&event)?; + JobRun::Completed(completed) + } + VisitResult::Failed(failed) => { + println!("Failed job: {:?}", failed.job_run_id); + let event = failed.state.to_event(&failed.job_run_id); + self.bel.append_event(&event)?; + JobRun::Failed(failed) + } + VisitResult::DepMiss(dep_miss) => { + println!("Dep miss job: {:?}", dep_miss.job_run_id); + let event = dep_miss.state.to_event(&dep_miss.job_run_id); + self.bel.append_event(&event)?; + JobRun::DepMiss(dep_miss) + } + }, other => other, // Pass through all non-running states unchanged }; new_jobs.push(transitioned); @@ -231,8 +228,11 @@ impl Orchestrator { use crate::job_run::JobRun; // Compute args from wants the same way JobConfiguration::spawn() does - let wanted_refs: Vec = - wg.wants.iter().flat_map(|want| want.partitions.clone()).collect(); + let wanted_refs: Vec = wg + .wants + .iter() + .flat_map(|want| want.partitions.clone()) + .collect(); let args: Vec = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect(); let job_run = JobRun::spawn(wg.job.entry_point.clone(), args); @@ -264,7 +264,10 @@ impl Orchestrator { #[cfg(test)] fn count_running_jobs(&self) -> usize { use crate::job_run::JobRun; - self.job_runs.iter().filter(|j| matches!(j, JobRun::Running(_))).count() + self.job_runs + .iter() + .filter(|j| matches!(j, JobRun::Running(_))) + .count() } #[cfg(test)] @@ -275,19 +278,28 @@ impl Orchestrator { #[cfg(test)] fn count_not_started_jobs(&self) -> usize { use crate::job_run::JobRun; - self.job_runs.iter().filter(|j| matches!(j, JobRun::NotStarted(_))).count() + self.job_runs + .iter() + .filter(|j| 
matches!(j, JobRun::NotStarted(_))) + .count() } #[cfg(test)] fn count_dep_miss_jobs(&self) -> usize { use crate::job_run::JobRun; - self.job_runs.iter().filter(|j| matches!(j, JobRun::DepMiss(_))).count() + self.job_runs + .iter() + .filter(|j| matches!(j, JobRun::DepMiss(_))) + .count() } #[cfg(test)] fn count_completed_jobs(&self) -> usize { use crate::job_run::JobRun; - self.job_runs.iter().filter(|j| matches!(j, JobRun::Completed(_))).count() + self.job_runs + .iter() + .filter(|j| matches!(j, JobRun::Completed(_))) + .count() } /** Entrypoint for running jobs */ @@ -433,9 +445,17 @@ mod tests { assert_eq!(orchestrator.count_not_started_jobs(), 1); // Verify the job has the right args by checking the first NotStarted job use crate::job_run::JobRun; - let not_started_job = orchestrator.job_runs.iter().find(|j| matches!(j, JobRun::NotStarted(_))).unwrap(); + let not_started_job = orchestrator + .job_runs + .iter() + .find(|j| matches!(j, JobRun::NotStarted(_))) + .unwrap(); if let JobRun::NotStarted(job) = not_started_job { - assert_eq!(job.state.args, vec!["data/alpha"], "should have scheduled alpha job"); + assert_eq!( + job.state.args, + vec!["data/alpha"], + "should have scheduled alpha job" + ); } assert_eq!(orchestrator.bel.state.count_job_runs(), 1); } @@ -599,9 +619,7 @@ mod tests { thread::sleep(Duration::from_millis(1)); // Should still be running after 1ms - orchestrator - .step() - .expect("should still be running"); + orchestrator.step().expect("should still be running"); assert_eq!(orchestrator.count_running_jobs(), 1); assert_eq!(orchestrator.bel.state.count_job_runs(), 1); println!("STATE: {:?}", orchestrator.bel.state); @@ -773,7 +791,8 @@ echo 'Beta succeeded' .collect(); assert!( - beta_wants.iter().any(|w| w.status.as_ref().map(|s| s.code) == Some(WantStatusCode::WantUpstreamBuilding as i32)), + beta_wants.iter().any(|w| w.status.as_ref().map(|s| s.code) + == Some(WantStatusCode::WantUpstreamBuilding as i32)), "At least one beta want should be in UpstreamBuilding state, found: {:?}", beta_wants.iter().map(|w| &w.status).collect::>() ); @@ -819,7 +838,11 @@ echo 'Beta succeeded' // Step 7: Beta is rescheduled and started (want -> running_jobs) orchestrator.step().expect("step 7"); - assert_eq!(orchestrator.count_running_jobs(), 1, "beta should be running"); + assert_eq!( + orchestrator.count_running_jobs(), + 1, + "beta should be running" + ); // Step 8: Beta completes successfully wait_for_jobs_to_complete(&mut orchestrator, 10).expect("beta job should complete"); diff --git a/databuild/partition_state.rs b/databuild/partition_state.rs index 21f4d35..05e2f92 100644 --- a/databuild/partition_state.rs +++ b/databuild/partition_state.rs @@ -1,4 +1,5 @@ -use crate::{PartitionRef, PartitionDetail, PartitionStatus, PartitionStatusCode}; +use crate::{PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode}; +use serde::{Deserialize, Serialize}; /// State: Partition has been referenced but not yet built #[derive(Debug, Clone)] @@ -49,6 +50,50 @@ pub enum Partition { Tainted(PartitionWithState), } +/// Type-safe partition reference wrappers that encode state expectations in function signatures. It +/// is critical that these be treated with respect, not just summoned because it's convenient. +/// These should be created ephemerally from typestate objects via .get_ref() and used +/// immediately — never stored long-term, as partition state can change. 
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct MissingPartitionRef(pub PartitionRef);
+impl PartitionWithState<MissingState> {
+    pub fn get_ref(&self) -> MissingPartitionRef {
+        MissingPartitionRef(self.partition_ref.clone())
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct BuildingPartitionRef(pub PartitionRef);
+impl PartitionWithState<BuildingState> {
+    pub fn get_ref(&self) -> BuildingPartitionRef {
+        BuildingPartitionRef(self.partition_ref.clone())
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct LivePartitionRef(pub PartitionRef);
+impl PartitionWithState<LiveState> {
+    pub fn get_ref(&self) -> LivePartitionRef {
+        LivePartitionRef(self.partition_ref.clone())
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct FailedPartitionRef(pub PartitionRef);
+impl PartitionWithState<FailedState> {
+    pub fn get_ref(&self) -> FailedPartitionRef {
+        FailedPartitionRef(self.partition_ref.clone())
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct TaintedPartitionRef(pub PartitionRef);
+impl PartitionWithState<TaintedState> {
+    pub fn get_ref(&self) -> TaintedPartitionRef {
+        TaintedPartitionRef(self.partition_ref.clone())
+    }
+}
+
 // Type-safe transition methods for MissingState
 impl PartitionWithState<MissingState> {
     /// Transition from Missing to Building when a job starts building this partition
diff --git a/databuild/util.rs b/databuild/util.rs
index 826eaec..cfd5738 100644
--- a/databuild/util.rs
+++ b/databuild/util.rs
@@ -1,5 +1,5 @@
-use std::time::{SystemTime, UNIX_EPOCH};
 use std::backtrace::Backtrace;
+use std::time::{SystemTime, UNIX_EPOCH};
 
 pub fn current_timestamp() -> u64 {
     let now = SystemTime::now();
@@ -27,7 +27,7 @@ impl DatabuildError {
         Self {
             msg: msg.into(),
             source: None,
-            backtrace: maybe_backtrace()
+            backtrace: maybe_backtrace(),
         }
     }
 }
@@ -37,7 +37,7 @@ impl From for DatabuildError {
         Self {
             msg: err.to_string(),
             source: Some(Box::new(err)),
-            backtrace: maybe_backtrace()
+            backtrace: maybe_backtrace(),
         }
     }
 }
@@ -47,7 +47,7 @@ impl From for DatabuildError {
         Self {
             msg: err.to_string(),
             source: Some(Box::new(err)),
-            backtrace: maybe_backtrace()
+            backtrace: maybe_backtrace(),
         }
     }
 }
@@ -57,7 +57,7 @@ impl From for DatabuildError {
         Self {
             msg: err.to_string(),
             source: Some(Box::new(err)),
-            backtrace: maybe_backtrace()
+            backtrace: maybe_backtrace(),
         }
     }
 }
@@ -67,7 +67,7 @@ impl From for DatabuildError {
         Self {
             msg: err.to_string(),
             source: Some(Box::new(err)),
-            backtrace: maybe_backtrace()
+            backtrace: maybe_backtrace(),
         }
     }
 }
diff --git a/databuild/want_state.rs b/databuild/want_state.rs
index d2f2adc..299409c 100644
--- a/databuild/want_state.rs
+++ b/databuild/want_state.rs
@@ -1,5 +1,8 @@
+use crate::partition_state::FailedPartitionRef;
 use crate::util::current_timestamp;
 use crate::{EventSource, PartitionRef, WantCreateEventV1, WantDetail, WantStatusCode};
+use serde::{Deserialize, Serialize};
+use uuid::Uuid;
 
 /// State: Want has been created and is ready to be scheduled
 #[derive(Debug, Clone)]
@@ -101,6 +104,66 @@ pub enum Want {
     Canceled(WantWithState<CanceledState>),
 }
 
+/// Type-safe want ID wrappers that encode state expectations in function signatures. It
+/// is critical that these be treated with respect, not just summoned because it's convenient.
+/// These should be created ephemerally from typestate objects via .get_id() and used
+/// immediately — never stored long-term, as want state can change.
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct IdleWantId(pub String);
+impl WantWithState<IdleState> {
+    pub fn get_id(&self) -> IdleWantId {
+        IdleWantId(self.want.want_id.clone())
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct BuildingWantId(pub String);
+impl WantWithState<BuildingState> {
+    pub fn get_id(&self) -> BuildingWantId {
+        BuildingWantId(self.want.want_id.clone())
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct UpstreamBuildingWantId(pub String);
+impl WantWithState<UpstreamBuildingState> {
+    pub fn get_id(&self) -> UpstreamBuildingWantId {
+        UpstreamBuildingWantId(self.want.want_id.clone())
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct SuccessfulWantId(pub String);
+impl WantWithState<SuccessfulState> {
+    pub fn get_id(&self) -> SuccessfulWantId {
+        SuccessfulWantId(self.want.want_id.clone())
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct FailedWantId(pub String);
+impl WantWithState<FailedState> {
+    pub fn get_id(&self) -> FailedWantId {
+        FailedWantId(self.want.want_id.clone())
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct UpstreamFailedWantId(pub String);
+impl WantWithState<UpstreamFailedState> {
+    pub fn get_id(&self) -> UpstreamFailedWantId {
+        UpstreamFailedWantId(self.want.want_id.clone())
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct CanceledWantId(pub String);
+impl WantWithState<CanceledState> {
+    pub fn get_id(&self) -> CanceledWantId {
+        CanceledWantId(self.want.want_id.clone())
+    }
+}
+
 // From impl for creating want from event
 impl From<WantCreateEventV1> for WantWithState<IdleState> {
     fn from(event: WantCreateEventV1) -> Self {
@@ -207,20 +270,20 @@ impl WantWithState {
 // Type-safe transition methods for FailedState
 impl WantWithState<FailedState> {
     /// Add more failed partitions to an already-failed want (self-transition)
-    pub fn add_failed_partitions(mut self, partition_refs: Vec<PartitionRef>) -> Self {
+    pub fn add_failed_partitions(mut self, partition_refs: Vec<FailedPartitionRef>) -> Self {
         for partition_ref in partition_refs {
             if self
                 .state
                 .failed_partition_refs
                 .iter()
-                .any(|p| p.r#ref == partition_ref.r#ref)
+                .any(|p| p.r#ref == partition_ref.0.r#ref)
             {
                 panic!(
                     "BUG: Attempted to add failed partition {} that already exists in want {}",
-                    partition_ref.r#ref, self.want.want_id
+                    partition_ref.0.r#ref, self.want.want_id
                 );
             }
-            self.state.failed_partition_refs.push(partition_ref);
+            self.state.failed_partition_refs.push(partition_ref.0);
        }

        WantWithState {
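Reviewer note on the pattern introduced above: the want ID wrappers, together with the partition ref wrappers in partition_state.rs, are a typed-ID variant of the typestate pattern. An ID can only be minted from an object that is provably in the named state, so signatures like fail_directly_affected_wants taking &[FailedPartitionRef] and returning FailedWantId values carry their preconditions in the types. A rough, self-contained sketch of the same idea follows; the Draft, Published, Post, PublishedId, and notify_subscribers names are hypothetical and are not part of the databuild API.

use std::marker::PhantomData;

struct Draft;
struct Published;

// Typestate wrapper: the type parameter records which state the value is in.
struct Post<S> {
    id: String,
    _state: PhantomData<S>,
}

// Typed ID: can only be minted from Post<Published>, never from a raw string.
struct PublishedId(String);

impl Post<Draft> {
    fn new(id: &str) -> Self {
        Post { id: id.to_string(), _state: PhantomData }
    }
    // Consuming transition (Draft -> Published), in the style of the patch's complete().
    fn publish(self) -> Post<Published> {
        Post { id: self.id, _state: PhantomData }
    }
}

impl Post<Published> {
    // Ephemeral typed ID, analogous to get_id()/get_ref() in the patch.
    fn get_id(&self) -> PublishedId {
        PublishedId(self.id.clone())
    }
}

// The signature states the precondition; callers cannot pass an arbitrary String.
fn notify_subscribers(id: PublishedId) {
    println!("notifying subscribers of {}", id.0);
}

fn main() {
    let post = Post::<Draft>::new("post-42").publish();
    notify_subscribers(post.get_id());
    // notify_subscribers("post-42".to_string()); // does not compile: expected PublishedId
}

As with the wrappers in this patch, a PublishedId is only as trustworthy as the moment it was minted, which is why the doc comments insist these IDs be created ephemerally and never stored.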