From cf163b294d8cf81240ff89a8531b6a0997bc613d Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Sat, 22 Nov 2025 20:27:37 +0800 Subject: [PATCH] Refactor job run into type-state state machines + typed IDs --- databuild/build_state.rs | 231 ++++++++++++++++------- databuild/databuild.proto | 1 + databuild/event_transforms.rs | 16 ++ databuild/job.rs | 6 +- databuild/job_run.rs | 93 ++++----- databuild/job_run_state.rs | 342 ++++++++++++++++++++++++++++++++++ databuild/lib.rs | 1 + databuild/orchestrator.rs | 62 +++--- 8 files changed, 617 insertions(+), 135 deletions(-) create mode 100644 databuild/job_run_state.rs diff --git a/databuild/build_state.rs b/databuild/build_state.rs index b3edb1e..d693366 100644 --- a/databuild/build_state.rs +++ b/databuild/build_state.rs @@ -1,8 +1,9 @@ use crate::data_build_event::Event; use crate::data_deps::{WantTimestamps, missing_deps_to_want_events}; +use crate::job_run_state::{JobRun, JobRunWithState, QueuedState as JobQueuedState}; use crate::partition_state::{ - FailedPartitionRef, LivePartitionRef, MissingPartitionRef, MissingState, Partition, - PartitionWithState, TaintedPartitionRef, + BuildingPartitionRef, FailedPartitionRef, LivePartitionRef, MissingPartitionRef, MissingState, + Partition, PartitionWithState, TaintedPartitionRef, }; use crate::util::current_timestamp; use crate::want_state::{ @@ -49,7 +50,7 @@ pub struct BuildState { wants: BTreeMap, // Type-safe want storage taints: BTreeMap, partitions: BTreeMap, // Type-safe partition storage - job_runs: BTreeMap, + job_runs: BTreeMap, // Type-safe job run storage } impl Default for BuildState { @@ -89,11 +90,11 @@ impl BuildState { /// Used when a job run starts building partitions fn transition_partitions_to_building( &mut self, - partition_refs: &[PartitionRef], + partition_refs: &[BuildingPartitionRef], job_run_id: &str, ) { - for pref in partition_refs { - if let Some(partition) = self.partitions.remove(&pref.r#ref) { + for building_ref in partition_refs { + if let Some(partition) = self.partitions.remove(&building_ref.0.r#ref) { // Partition exists - transition based on current state let transitioned = match partition { // Valid: Missing -> Building @@ -104,18 +105,19 @@ impl BuildState { _ => { panic!( "BUG: Invalid state - partition {} cannot start building from state {:?}", - pref.r#ref, partition + building_ref.0.r#ref, partition ) } }; - self.partitions.insert(pref.r#ref.clone(), transitioned); + self.partitions + .insert(building_ref.0.r#ref.clone(), transitioned); } else { // Partition doesn't exist yet - create in Missing then transition to Building - let missing = Partition::new_missing(pref.clone()); + let missing = Partition::new_missing(building_ref.0.clone()); if let Partition::Missing(m) = missing { let building = m.start_building(job_run_id.to_string()); self.partitions - .insert(pref.r#ref.clone(), Partition::Building(building)); + .insert(building_ref.0.r#ref.clone(), Partition::Building(building)); } } } @@ -185,12 +187,15 @@ impl BuildState { /// Reset partitions from Building back to Missing state /// Used when a job run encounters missing dependencies and cannot proceed - fn reset_partitions_to_missing(&mut self, partition_refs: &[PartitionRef]) { - for pref in partition_refs { - let partition = self.partitions.remove(&pref.r#ref).expect(&format!( - "BUG: Partition {} must exist and be in Building state during dep_miss", - pref.r#ref - )); + fn reset_partitions_to_missing(&mut self, partition_refs: &[BuildingPartitionRef]) { + for building_ref in partition_refs 
{ + let partition = self + .partitions + .remove(&building_ref.0.r#ref) + .expect(&format!( + "BUG: Partition {} must exist and be in Building state during dep_miss", + building_ref.0.r#ref + )); // Only valid transition: Building -> Missing let transitioned = match partition { @@ -199,11 +204,12 @@ impl BuildState { _ => { panic!( "BUG: Invalid state - partition {} must be Building during dep_miss, found {:?}", - pref.r#ref, partition + building_ref.0.r#ref, partition ) } }; - self.partitions.insert(pref.r#ref.clone(), transitioned); + self.partitions + .insert(building_ref.0.r#ref.clone(), transitioned); } } @@ -632,8 +638,8 @@ impl BuildState { ); } - // Create job run to be inserted - let job_run: JobRunDetail = event.clone().into(); + // Create job run in Queued state + let queued: JobRunWithState = event.clone().into(); // Transition wants to Building // Valid states when job buffer event arrives: @@ -642,7 +648,7 @@ impl BuildState { // Invalid states (panic - indicates orchestrator bug): // - UpstreamBuilding: Not schedulable, waiting for dependencies // - Successful/Failed/UpstreamFailed/Canceled: Terminal states, not schedulable - for wap in &job_run.servicing_wants { + for wap in &queued.info.servicing_wants { let want = self.wants.remove(&wap.want_id).expect(&format!( "BUG: Want {} must exist when job buffer event received", wap.want_id @@ -668,41 +674,64 @@ impl BuildState { self.wants.insert(wap.want_id.clone(), transitioned); } + // Get building partition refs from queued job - job is source of truth for building partitions + let building_refs: Vec = queued + .info + .building_partitions + .iter() + .map(|p| BuildingPartitionRef(p.clone())) + .collect(); + // Transition partitions to Building state - self.transition_partitions_to_building(&job_run.building_partitions, &event.job_run_id); + self.transition_partitions_to_building(&building_refs, &event.job_run_id); self.job_runs - .insert(event.job_run_id.clone(), job_run.clone()); - println!("Inserted job run: {:?}", job_run); + .insert(event.job_run_id.clone(), JobRun::Queued(queued)); vec![] } - fn update_job_run_status(&mut self, job_run_id: &str, status: JobRunStatusCode) { - let job_run = self.job_runs.get_mut(job_run_id).expect(&format!( - "BUG: Job run ID {} must exist to update status", - job_run_id - )); - job_run.last_heartbeat_at = Some(current_timestamp()); - job_run.status = Some(status.into()); - } - fn handle_job_run_heartbeat(&mut self, event: &JobRunHeartbeatEventV1) -> Vec { - self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunRunning); + let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!( + "BUG: Job run {} must exist when heartbeat received", + event.job_run_id + )); + + let running = match job_run { + // First heartbeat: Queued -> Running + JobRun::Queued(queued) => queued.start_running(current_timestamp()), + // Subsequent heartbeat: update timestamp + JobRun::Running(running) => running.heartbeat(current_timestamp()), + _ => { + panic!( + "BUG: Heartbeat received for job run {} in invalid state {:?}", + event.job_run_id, job_run + ); + } + }; + + self.job_runs + .insert(event.job_run_id.clone(), JobRun::Running(running)); vec![] } fn handle_job_run_success(&mut self, event: &JobRunSuccessEventV1) -> Vec { - println!("Job run success event: {:?}", event); - self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunSucceeded); - let job_run = self.get_job_run(&event.job_run_id).unwrap(); + let job_run = 
self.job_runs.remove(&event.job_run_id).expect(&format!( + "BUG: Job run {} must exist when success event received", + event.job_run_id + )); - // Clone building_partitions before we use it multiple times - // TODO correct this explicit upcasting of partition ref type - let newly_live_partitions: Vec = job_run - .building_partitions - .iter() - .map(|pref| LivePartitionRef(pref.clone())) - .collect(); + let succeeded = match job_run { + JobRun::Running(running) => running.succeed(current_timestamp()), + _ => { + panic!( + "BUG: Success event received for job run {} in invalid state {:?}. Job must be Running to succeed.", + event.job_run_id, job_run + ); + } + }; + + // Job run success is SOURCE of truth that partitions are live + let newly_live_partitions = succeeded.get_completed_partitions(); // Update partitions being built by this job (strict type-safe transitions) self.transition_partitions_to_live( @@ -725,19 +754,29 @@ impl BuildState { current_timestamp(), ); + self.job_runs + .insert(event.job_run_id.clone(), JobRun::Succeeded(succeeded)); vec![] } fn handle_job_run_failure(&mut self, event: &JobRunFailureEventV1) -> Vec { - self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunFailed); - let job_run = self.get_job_run(&event.job_run_id).unwrap(); + let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!( + "BUG: Job run {} must exist when failure event received", + event.job_run_id + )); - // Clone building_partitions before we use it multiple times - let failed_partitions: Vec = job_run - .building_partitions - .iter() - .map(|pref| FailedPartitionRef(pref.clone())) - .collect(); + let failed = match job_run { + JobRun::Running(running) => running.fail(current_timestamp(), event.reason.clone()), + _ => { + panic!( + "BUG: Failure event received for job run {} in invalid state {:?}. 
Job must be Running to fail.", + event.job_run_id, job_run + ); + } + }; + + // Job run failure is SOURCE of truth that partitions failed + let failed_partitions = failed.get_failed_partitions(); // Transition partitions using strict type-safe methods self.transition_partitions_to_failed( @@ -753,20 +792,68 @@ impl BuildState { // UpstreamBuilding → UpstreamFailed (for downstream wants waiting on newly failed wants) self.cascade_failures_to_downstream_wants(&newly_failed_wants, current_timestamp()); + self.job_runs + .insert(event.job_run_id.clone(), JobRun::Failed(failed)); vec![] } - fn handle_job_run_cancel(&mut self, _event: &JobRunCancelEventV1) -> Vec { - todo!("should update already inserted job run, partition status, want status") + fn handle_job_run_cancel(&mut self, event: &JobRunCancelEventV1) -> Vec { + let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!( + "BUG: Job run {} must exist when cancel event received", + event.job_run_id + )); + + let canceled = match job_run { + JobRun::Queued(queued) => queued.cancel( + current_timestamp(), + event.source.clone(), + event.comment.clone().unwrap_or_default(), + ), + JobRun::Running(running) => running.cancel( + current_timestamp(), + event.source.clone(), + event.comment.clone().unwrap_or_default(), + ), + _ => { + panic!( + "BUG: Cancel event received for job run {} in invalid state {:?}", + event.job_run_id, job_run + ); + } + }; + + // Canceled job means building partitions should reset to Missing + let building_refs_to_reset = canceled.get_building_partitions_to_reset(); + self.reset_partitions_to_missing(&building_refs_to_reset); + + self.job_runs + .insert(event.job_run_id.clone(), JobRun::Canceled(canceled)); + vec![] } pub fn handle_job_run_dep_miss(&mut self, event: &JobRunMissingDepsEventV1) -> Vec { - let job_run_detail = self.get_job_run(&event.job_run_id).expect(&format!( - "BUG: Unable to find job run with id `{}`", + let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!( + "BUG: Job run {} must exist when dep miss event received", event.job_run_id )); - // Infer data/SLA timestamps from upstream want - let want_timestamps: WantTimestamps = job_run_detail + + let dep_miss = match job_run { + JobRun::Running(running) => running.dep_miss( + current_timestamp(), + event.missing_deps.clone(), + event.read_deps.clone(), + ), + _ => { + panic!( + "BUG: DepMiss event received for job run {} in invalid state {:?}. 
Job must be Running to hit dep miss.", + event.job_run_id, job_run + ); + } + }; + + // Infer data/SLA timestamps from servicing wants + let want_timestamps: WantTimestamps = dep_miss + .info .servicing_wants .iter() .flat_map(|wap| self.get_want(&wap.want_id).map(|w| w.into())) @@ -774,11 +861,12 @@ impl BuildState { .expect("BUG: No servicing wants found"); // Transition partitions back to Missing since this job can't build them yet - self.reset_partitions_to_missing(&job_run_detail.building_partitions); + let building_refs_to_reset = dep_miss.get_building_partitions_to_reset(); + self.reset_partitions_to_missing(&building_refs_to_reset); // Create wants from dep misses let want_events = missing_deps_to_want_events( - event.missing_deps.clone(), + dep_miss.get_missing_deps().to_vec(), &event.job_run_id, want_timestamps, ); @@ -794,11 +882,14 @@ impl BuildState { // Transition servicing wants to UpstreamBuilding when they have missing dependencies self.transition_wants_to_upstream_building( - &job_run_detail.servicing_wants, - &event.missing_deps, + &dep_miss.info.servicing_wants, + dep_miss.get_missing_deps(), &partition_to_want_map, ); + self.job_runs + .insert(event.job_run_id.clone(), JobRun::DepMiss(dep_miss)); + want_events } @@ -838,7 +929,7 @@ impl BuildState { self.partitions.get(partition_id).map(|p| p.to_detail()) } pub fn get_job_run(&self, job_run_id: &str) -> Option { - self.job_runs.get(job_run_id).cloned() + self.job_runs.get(job_run_id).map(|jr| jr.to_detail()) } pub fn list_wants(&self, request: &ListWantsRequest) -> ListWantsResponse { @@ -895,9 +986,19 @@ impl BuildState { pub fn list_job_runs(&self, request: &ListJobRunsRequest) -> ListJobRunsResponse { let page = request.page.unwrap_or(0); let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE); + + let start = page * page_size; + let data: Vec = self + .job_runs + .values() + .skip(start as usize) + .take(page_size as usize) + .map(|jr| jr.to_detail()) + .collect(); + ListJobRunsResponse { - data: list_state_items(&self.job_runs, page, page_size), - match_count: self.wants.len() as u64, + data, + match_count: self.job_runs.len() as u64, page, page_size, } diff --git a/databuild/databuild.proto b/databuild/databuild.proto index 62f87d7..c7fffc1 100644 --- a/databuild/databuild.proto +++ b/databuild/databuild.proto @@ -242,6 +242,7 @@ enum JobRunStatusCode { JobRunFailed = 2; JobRunCanceled = 3; JobRunSucceeded = 4; + JobRunDepMiss = 5; } message JobRunDetail { string id = 1; diff --git a/databuild/event_transforms.rs b/databuild/event_transforms.rs index cb91ccb..db32b8a 100644 --- a/databuild/event_transforms.rs +++ b/databuild/event_transforms.rs @@ -1,5 +1,6 @@ use crate::PartitionStatusCode::{PartitionFailed, PartitionLive}; use crate::data_build_event::Event; +use crate::job_run_state::{JobInfo, JobRunWithState, QueuedState}; use crate::util::current_timestamp; use crate::{ CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, @@ -83,6 +84,21 @@ impl From for JobRunDetail { } } +impl From for JobRunWithState { + fn from(event: JobRunBufferEventV1) -> Self { + JobRunWithState { + info: JobInfo { + id: event.job_run_id, + building_partitions: event.building_partitions, + servicing_wants: event.want_attributed_partitions, + }, + state: QueuedState { + queued_at: current_timestamp(), + }, + } + } +} + pub fn want_status_matches_any( pds: &Vec>, status: PartitionStatusCode, diff --git a/databuild/job.rs b/databuild/job.rs index feabb8d..9d9dbb9 100644 --- a/databuild/job.rs 
+++ b/databuild/job.rs @@ -1,4 +1,4 @@ -use crate::job_run::{JobRun, SubProcessBackend}; +use crate::job_run::{JobRunHandle, SubProcessBackend}; use crate::util::DatabuildError; use crate::{JobConfig, PartitionRef, WantDetail}; use regex::Regex; @@ -15,13 +15,13 @@ impl JobConfiguration { pub fn spawn( &self, wants: Vec, - ) -> Result, std::io::Error> { + ) -> Result, std::io::Error> { let wanted_refs: Vec = wants .iter() .flat_map(|want| want.partitions.clone()) .collect(); let args: Vec = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect(); - Ok(JobRun::spawn(self.entry_point.clone(), args)) + Ok(JobRunHandle::spawn(self.entry_point.clone(), args)) } pub fn matches(&self, refs: &PartitionRef) -> bool { diff --git a/databuild/job_run.rs b/databuild/job_run.rs index 055acd0..ded201e 100644 --- a/databuild/job_run.rs +++ b/databuild/job_run.rs @@ -55,34 +55,34 @@ pub enum PollResult { } // ===== TYPE-SAFE STATE MACHINE PATTERN ===== -// Uses parameterized JobRunWithState wrapped in JobRun enum for storage +// Uses parameterized JobRunHandleWithState wrapped in JobRun enum for storage -/// New JobRun with embedded state enum -/// Type-safe job run struct, parameterized by backend and state -/// This struct can only perform operations valid for its current state type -pub struct JobRunWithState { +/// JobRunHandle with embedded state enum +/// Type-safe job run handle struct, parameterized by backend and state +/// This struct manages the actual running process/execution and can only perform operations valid for its current state type +pub struct JobRunHandleWithState { pub job_run_id: Uuid, pub state: S, pub _backend: PhantomData, } -/// Wrapper enum for storing job runs in a single collection +/// Wrapper enum for storing job run handles in a single collection /// This allows us to store jobs in different states together while maintaining type safety -pub enum JobRun { - NotStarted(JobRunWithState), - Running(JobRunWithState), - Completed(JobRunWithState), - Failed(JobRunWithState), - Canceled(JobRunWithState), - DepMiss(JobRunWithState), +pub enum JobRunHandle { + NotStarted(JobRunHandleWithState), + Running(JobRunHandleWithState), + Completed(JobRunHandleWithState), + Failed(JobRunHandleWithState), + Canceled(JobRunHandleWithState), + DepMiss(JobRunHandleWithState), } /// Result of visiting a running job - returns the typed states pub enum VisitResult { - StillRunning(JobRunWithState), - Completed(JobRunWithState), - Failed(JobRunWithState), - DepMiss(JobRunWithState), + StillRunning(JobRunHandleWithState), + Completed(JobRunHandleWithState), + Failed(JobRunHandleWithState), + DepMiss(JobRunHandleWithState), } pub enum JobRunConfig { @@ -297,11 +297,11 @@ pub struct JobRunPollResult { // ===== Type-Safe State Transition Implementation ===== -// Factory and helper methods on the JobRun enum -impl JobRun { +// Factory and helper methods on the JobRunHandle enum +impl JobRunHandle { /// Create a new job run in the NotStarted state pub fn spawn(entry_point: String, args: Vec) -> Self { - JobRun::NotStarted(JobRunWithState { + JobRunHandle::NotStarted(JobRunHandleWithState { job_run_id: Uuid::new_v4(), state: B::create(entry_point, args), _backend: PhantomData, @@ -311,12 +311,12 @@ impl JobRun { /// Get the job run ID regardless of state pub fn job_run_id(&self) -> &Uuid { match self { - JobRun::NotStarted(j) => &j.job_run_id, - JobRun::Running(j) => &j.job_run_id, - JobRun::Completed(j) => &j.job_run_id, - JobRun::Failed(j) => &j.job_run_id, - JobRun::Canceled(j) => &j.job_run_id, - 
JobRun::DepMiss(j) => &j.job_run_id, + JobRunHandle::NotStarted(j) => &j.job_run_id, + JobRunHandle::Running(j) => &j.job_run_id, + JobRunHandle::Completed(j) => &j.job_run_id, + JobRunHandle::Failed(j) => &j.job_run_id, + JobRunHandle::Canceled(j) => &j.job_run_id, + JobRunHandle::DepMiss(j) => &j.job_run_id, } } @@ -324,20 +324,23 @@ impl JobRun { pub fn is_terminal(&self) -> bool { matches!( self, - JobRun::Completed(_) | JobRun::Failed(_) | JobRun::Canceled(_) | JobRun::DepMiss(_) + JobRunHandle::Completed(_) + | JobRunHandle::Failed(_) + | JobRunHandle::Canceled(_) + | JobRunHandle::DepMiss(_) ) } } // Type-safe transition: NotStarted -> Running // This method can ONLY be called on NotStarted jobs - compile error otherwise! -impl JobRunWithState { +impl JobRunHandleWithState { pub fn run( self, env: Option>, - ) -> Result, DatabuildError> { + ) -> Result, DatabuildError> { let running = B::start(self.state, env)?; - Ok(JobRunWithState { + Ok(JobRunHandleWithState { job_run_id: self.job_run_id, state: running, _backend: PhantomData, @@ -347,21 +350,21 @@ impl JobRunWithState { // Type-safe transition: Running -> (Running | Completed | Failed | DepMiss) // This method can ONLY be called on Running jobs - compile error otherwise! -impl JobRunWithState { +impl JobRunHandleWithState { pub fn visit(mut self) -> Result, DatabuildError> { match B::poll(&mut self.state)? { PollResult::StillRunning => Ok(VisitResult::StillRunning(self)), - PollResult::Completed(completed) => Ok(VisitResult::Completed(JobRunWithState { + PollResult::Completed(completed) => Ok(VisitResult::Completed(JobRunHandleWithState { job_run_id: self.job_run_id, state: completed, _backend: PhantomData, })), - PollResult::Failed(failed) => Ok(VisitResult::Failed(JobRunWithState { + PollResult::Failed(failed) => Ok(VisitResult::Failed(JobRunHandleWithState { job_run_id: self.job_run_id, state: failed, _backend: PhantomData, })), - PollResult::DepMiss(dep_miss) => Ok(VisitResult::DepMiss(JobRunWithState { + PollResult::DepMiss(dep_miss) => Ok(VisitResult::DepMiss(JobRunHandleWithState { job_run_id: self.job_run_id, state: dep_miss, _backend: PhantomData, @@ -372,9 +375,9 @@ impl JobRunWithState { pub fn cancel( self, source: EventSource, - ) -> Result, DatabuildError> { + ) -> Result, DatabuildError> { let canceled = B::cancel_job(self.state, source)?; - Ok(JobRunWithState { + Ok(JobRunHandleWithState { job_run_id: self.job_run_id, state: canceled, _backend: PhantomData, @@ -417,7 +420,7 @@ impl ToEvent for SubProcessDepMiss { mod tests { use crate::data_build_event::Event; use crate::data_deps::DATABUILD_MISSING_DEPS_JSON; - use crate::job_run::{JobRun, JobRunBackend, SubProcessBackend, VisitResult}; + use crate::job_run::{JobRunBackend, JobRunHandle, SubProcessBackend, VisitResult}; use crate::mock_job_run::MockJobRun; use crate::{JobRunMissingDeps, MissingDeps}; @@ -425,11 +428,11 @@ mod tests { #[test] fn test_job_run_success_returns_job_run_success_event() { // Spawn a job run that will succeed (exit code 0) - let job_run = JobRun::::spawn(MockJobRun::bin_path(), vec![]); + let job_run = JobRunHandle::::spawn(MockJobRun::bin_path(), vec![]); // Start the job - this consumes the NotStarted and returns Running let running_job = match job_run { - JobRun::NotStarted(not_started) => not_started.run(None).unwrap(), + JobRunHandle::NotStarted(not_started) => not_started.run(None).unwrap(), _ => panic!("Expected NotStarted job"), }; @@ -463,12 +466,12 @@ mod tests { #[test] fn test_job_run_failure_returns_job_run_failure_event() 
{ // Spawn a job run - let job_run = JobRun::::spawn(MockJobRun::bin_path(), vec![]); + let job_run = JobRunHandle::::spawn(MockJobRun::bin_path(), vec![]); // Start the job with an exit code that indicates failure (non-zero) let env = MockJobRun::new().exit_code(1).to_env(); let running_job = match job_run { - JobRun::NotStarted(not_started) => not_started.run(Some(env)).unwrap(), + JobRunHandle::NotStarted(not_started) => not_started.run(Some(env)).unwrap(), _ => panic!("Expected NotStarted job"), }; @@ -511,7 +514,7 @@ mod tests { let temp_file = format!("/tmp/databuild_test_cancel_{}", Uuid::new_v4()); // Spawn a job run that will sleep for 1 second and write a file - let job_run = JobRun::::spawn(MockJobRun::bin_path(), vec![]); + let job_run = JobRunHandle::::spawn(MockJobRun::bin_path(), vec![]); let env = MockJobRun::new() .sleep_ms(1000) @@ -519,7 +522,7 @@ mod tests { .exit_code(0) .to_env(); let running_job = match job_run { - JobRun::NotStarted(not_started) => not_started.run(Some(env)).unwrap(), + JobRunHandle::NotStarted(not_started) => not_started.run(Some(env)).unwrap(), _ => panic!("Expected NotStarted job"), }; @@ -558,7 +561,7 @@ mod tests { #[test] fn test_job_run_fail_on_missing_deps_should_emit_missing_deps_event() { // Spawn a job run that will sleep for 1 second and write a file - let job_run = JobRun::::spawn(MockJobRun::bin_path(), vec![]); + let job_run = JobRunHandle::::spawn(MockJobRun::bin_path(), vec![]); let expected_dep_miss = JobRunMissingDeps { version: "1".into(), @@ -575,7 +578,7 @@ mod tests { .exit_code(1) .to_env(); let running_job = match job_run { - JobRun::NotStarted(not_started) => not_started.run(Some(env)).unwrap(), + JobRunHandle::NotStarted(not_started) => not_started.run(Some(env)).unwrap(), _ => panic!("Expected NotStarted job"), }; diff --git a/databuild/job_run_state.rs b/databuild/job_run_state.rs new file mode 100644 index 0000000..c50b896 --- /dev/null +++ b/databuild/job_run_state.rs @@ -0,0 +1,342 @@ +use crate::partition_state::{BuildingPartitionRef, FailedPartitionRef, LivePartitionRef}; +use crate::util::current_timestamp; +use crate::{ + EventSource, JobRunDetail, JobRunStatusCode, MissingDeps, PartitionRef, ReadDeps, + WantAttributedPartitions, +}; + +/// State: Job has been queued but not yet started +#[derive(Debug, Clone)] +pub struct QueuedState { + pub queued_at: u64, +} + +/// State: Job is currently running +#[derive(Debug, Clone)] +pub struct RunningState { + pub started_at: u64, + pub last_heartbeat_at: u64, // NOT optional, defaults to started_at +} + +/// State: Job completed successfully +#[derive(Debug, Clone)] +pub struct SucceededState { + pub completed_at: u64, +} + +/// State: Job failed during execution +#[derive(Debug, Clone)] +pub struct FailedState { + pub failed_at: u64, + pub failure_reason: String, +} + +/// State: Job detected missing dependencies +#[derive(Debug, Clone)] +pub struct DepMissState { + pub detected_at: u64, + pub missing_deps: Vec, + pub read_deps: Vec, +} + +/// State: Job was explicitly canceled +#[derive(Debug, Clone)] +pub struct CanceledState { + pub canceled_at: u64, + pub source: Option, + pub comment: String, +} + +/// Shared information across all job run states +#[derive(Debug, Clone)] +pub struct JobInfo { + pub id: String, + pub building_partitions: Vec, + pub servicing_wants: Vec, +} + +/// Generic job run struct parameterized by state +#[derive(Debug, Clone)] +pub struct JobRunWithState { + pub info: JobInfo, + pub state: S, +} + +/// Wrapper enum for storing job runs in 
collections +#[derive(Debug, Clone)] +pub enum JobRun { + Queued(JobRunWithState), + Running(JobRunWithState), + Succeeded(JobRunWithState), + Failed(JobRunWithState), + DepMiss(JobRunWithState), + Canceled(JobRunWithState), +} + +// ==================== State Transitions ==================== + +impl JobRunWithState { + /// Transition from Queued to Running + pub fn start_running(self, timestamp: u64) -> JobRunWithState { + JobRunWithState { + info: self.info, + state: RunningState { + started_at: timestamp, + last_heartbeat_at: timestamp, // Initialize to start time + }, + } + } + + /// Transition from Queued to Canceled (canceled before starting) + pub fn cancel( + self, + timestamp: u64, + source: Option, + comment: String, + ) -> JobRunWithState { + JobRunWithState { + info: self.info, + state: CanceledState { + canceled_at: timestamp, + source, + comment, + }, + } + } +} + +impl JobRunWithState { + /// Update heartbeat timestamp (non-consuming) + pub fn heartbeat(mut self, timestamp: u64) -> Self { + self.state.last_heartbeat_at = timestamp; + self + } + + /// Transition from Running to Succeeded + pub fn succeed(self, timestamp: u64) -> JobRunWithState { + JobRunWithState { + info: self.info, + state: SucceededState { + completed_at: timestamp, + }, + } + } + + /// Transition from Running to Failed + pub fn fail(self, timestamp: u64, reason: String) -> JobRunWithState { + JobRunWithState { + info: self.info, + state: FailedState { + failed_at: timestamp, + failure_reason: reason, + }, + } + } + + /// Transition from Running to DepMiss + pub fn dep_miss( + self, + timestamp: u64, + missing_deps: Vec, + read_deps: Vec, + ) -> JobRunWithState { + JobRunWithState { + info: self.info, + state: DepMissState { + detected_at: timestamp, + missing_deps, + read_deps, + }, + } + } + + /// Transition from Running to Canceled + pub fn cancel( + self, + timestamp: u64, + source: Option, + comment: String, + ) -> JobRunWithState { + JobRunWithState { + info: self.info, + state: CanceledState { + canceled_at: timestamp, + source, + comment, + }, + } + } +} + +// ==================== Type-Safe Job Run IDs ==================== + +/// Type-safe job run ID wrappers that encode state expectations in function signatures. +/// These should be created ephemerally from typestate objects via .get_id() and used immediately—never stored long-term. 
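+//
+// Illustrative usage (hypothetical signature, not defined in this module): a
+// function that must only act on a job known to be Running can accept a
+// `RunningJobRunId` rather than a bare `&str`, e.g.
+//     fn record_running_heartbeat(id: RunningJobRunId) { /* ... */ }
+// which forces callers to obtain the id via
+// `JobRunWithState::<RunningState>::get_id()` on a typestate value they hold.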
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct QueuedJobRunId(pub String); + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RunningJobRunId(pub String); + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SucceededJobRunId(pub String); + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FailedJobRunId(pub String); + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DepMissJobRunId(pub String); + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CanceledJobRunId(pub String); + +// ==================== State-Specific Methods ==================== + +impl JobRunWithState { + pub fn get_id(&self) -> QueuedJobRunId { + QueuedJobRunId(self.info.id.clone()) + } +} + +impl JobRunWithState { + pub fn get_id(&self) -> RunningJobRunId { + RunningJobRunId(self.info.id.clone()) + } + + /// Currently building these partitions + /// Job run running state is the SOURCE of truth that these partitions are building + pub fn get_building_partitions(&self) -> Vec { + self.info + .building_partitions + .iter() + .map(|p| BuildingPartitionRef(p.clone())) + .collect() + } + + pub fn get_last_heartbeat(&self) -> u64 { + self.state.last_heartbeat_at + } +} + +impl JobRunWithState { + pub fn get_id(&self) -> SucceededJobRunId { + SucceededJobRunId(self.info.id.clone()) + } + + /// Job run success is the SOURCE of truth that these partitions are live + pub fn get_completed_partitions(&self) -> Vec { + self.info + .building_partitions + .iter() + .map(|p| LivePartitionRef(p.clone())) + .collect() + } +} + +impl JobRunWithState { + pub fn get_id(&self) -> FailedJobRunId { + FailedJobRunId(self.info.id.clone()) + } + + /// Job run failure is the SOURCE of truth that these partitions failed + pub fn get_failed_partitions(&self) -> Vec { + self.info + .building_partitions + .iter() + .map(|p| FailedPartitionRef(p.clone())) + .collect() + } + + pub fn get_failure_reason(&self) -> &str { + &self.state.failure_reason + } +} + +impl JobRunWithState { + pub fn get_id(&self) -> DepMissJobRunId { + DepMissJobRunId(self.info.id.clone()) + } + + /// Job run dep miss means building partitions should reset to Missing + pub fn get_building_partitions_to_reset(&self) -> Vec { + self.info + .building_partitions + .iter() + .map(|p| BuildingPartitionRef(p.clone())) + .collect() + } + + pub fn get_missing_deps(&self) -> &[MissingDeps] { + &self.state.missing_deps + } + + pub fn get_read_deps(&self) -> &[ReadDeps] { + &self.state.read_deps + } +} + +impl JobRunWithState { + pub fn get_id(&self) -> CanceledJobRunId { + CanceledJobRunId(self.info.id.clone()) + } + + /// Canceled job means building partitions should reset to Missing + pub fn get_building_partitions_to_reset(&self) -> Vec { + self.info + .building_partitions + .iter() + .map(|p| BuildingPartitionRef(p.clone())) + .collect() + } +} + +// ==================== Conversion to JobRunDetail for API ==================== + +impl JobRun { + pub fn to_detail(&self) -> JobRunDetail { + match self { + JobRun::Queued(queued) => JobRunDetail { + id: queued.info.id.clone(), + status: Some(JobRunStatusCode::JobRunQueued.into()), + last_heartbeat_at: None, + building_partitions: queued.info.building_partitions.clone(), + servicing_wants: queued.info.servicing_wants.clone(), + }, + JobRun::Running(running) => JobRunDetail { + id: running.info.id.clone(), + status: Some(JobRunStatusCode::JobRunRunning.into()), + last_heartbeat_at: Some(running.state.last_heartbeat_at), + building_partitions: running.info.building_partitions.clone(), + servicing_wants: 
running.info.servicing_wants.clone(), + }, + JobRun::Succeeded(succeeded) => JobRunDetail { + id: succeeded.info.id.clone(), + status: Some(JobRunStatusCode::JobRunSucceeded.into()), + last_heartbeat_at: None, + building_partitions: succeeded.info.building_partitions.clone(), + servicing_wants: succeeded.info.servicing_wants.clone(), + }, + JobRun::Failed(failed) => JobRunDetail { + id: failed.info.id.clone(), + status: Some(JobRunStatusCode::JobRunFailed.into()), + last_heartbeat_at: None, + building_partitions: failed.info.building_partitions.clone(), + servicing_wants: failed.info.servicing_wants.clone(), + }, + JobRun::DepMiss(dep_miss) => JobRunDetail { + id: dep_miss.info.id.clone(), + status: Some(JobRunStatusCode::JobRunDepMiss.into()), + last_heartbeat_at: None, + building_partitions: dep_miss.info.building_partitions.clone(), + servicing_wants: dep_miss.info.servicing_wants.clone(), + }, + JobRun::Canceled(canceled) => JobRunDetail { + id: canceled.info.id.clone(), + status: Some(JobRunStatusCode::JobRunCanceled.into()), + last_heartbeat_at: None, + building_partitions: canceled.info.building_partitions.clone(), + servicing_wants: canceled.info.servicing_wants.clone(), + }, + } + } +} diff --git a/databuild/lib.rs b/databuild/lib.rs index 311bba7..670d886 100644 --- a/databuild/lib.rs +++ b/databuild/lib.rs @@ -4,6 +4,7 @@ mod data_deps; mod event_transforms; mod job; mod job_run; +mod job_run_state; mod mock_job_run; mod orchestrator; mod partition_state; diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs index f57e465..460b747 100644 --- a/databuild/orchestrator.rs +++ b/databuild/orchestrator.rs @@ -20,7 +20,7 @@ JTBDs: struct Orchestrator { pub bel: BuildEventLog, pub config: OrchestratorConfig, - pub job_runs: Vec>, + pub job_runs: Vec>, } impl Default for Orchestrator { @@ -125,12 +125,25 @@ impl Orchestrator { } fn schedule_queued_jobs(&mut self) -> Result<(), DatabuildError> { - use crate::job_run::JobRun; + use crate::JobRunHeartbeatEventV1; + use crate::data_build_event::Event; + use crate::job_run::JobRunHandle; let mut new_jobs = Vec::new(); for job in self.job_runs.drain(..) { let transitioned = match job { - JobRun::NotStarted(not_started) => JobRun::Running(not_started.run(None)?), + JobRunHandle::NotStarted(not_started) => { + let job_run_id = not_started.job_run_id.clone(); + let running = not_started.run(None)?; + + // Emit heartbeat event to notify BuildState that job is now running + let heartbeat_event = Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 { + job_run_id: job_run_id.to_string(), + }); + self.bel.append_event(&heartbeat_event)?; + + JobRunHandle::Running(running) + } other => other, // Pass through all other states unchanged }; new_jobs.push(transitioned); @@ -142,36 +155,41 @@ impl Orchestrator { /// Visits individual job runs, appending resulting events, and moving runs between run status /// containers. Either jobs are still running, or they are moved to terminal states. 
fn poll_job_runs(&mut self) -> Result<(), DatabuildError> { - use crate::job_run::{JobRun, VisitResult}; + use crate::job_run::{JobRunHandle, VisitResult}; self.schedule_queued_jobs()?; + // TODO: Emit periodic JobRunHeartbeatEventV1 events for long-running jobs + // Currently we emit one heartbeat when starting (in schedule_queued_jobs), but we should + // also emit periodic heartbeats for long-running jobs to detect stalls + // Need to decide on heartbeat frequency (e.g., every N polls or based on time elapsed) + // Visit all running jobs using type-safe transitions let mut new_jobs = Vec::new(); for job in self.job_runs.drain(..) { let transitioned = match job { - JobRun::Running(running) => match running.visit()? { + JobRunHandle::Running(running) => match running.visit()? { VisitResult::StillRunning(still_running) => { println!("Still running job: {:?}", still_running.job_run_id); - JobRun::Running(still_running) + JobRunHandle::Running(still_running) } VisitResult::Completed(completed) => { println!("Completed job: {:?}", completed.job_run_id); let event = completed.state.to_event(&completed.job_run_id); self.bel.append_event(&event)?; - JobRun::Completed(completed) + JobRunHandle::Completed(completed) } VisitResult::Failed(failed) => { println!("Failed job: {:?}", failed.job_run_id); let event = failed.state.to_event(&failed.job_run_id); self.bel.append_event(&event)?; - JobRun::Failed(failed) + JobRunHandle::Failed(failed) } VisitResult::DepMiss(dep_miss) => { println!("Dep miss job: {:?}", dep_miss.job_run_id); let event = dep_miss.state.to_event(&dep_miss.job_run_id); self.bel.append_event(&event)?; - JobRun::DepMiss(dep_miss) + JobRunHandle::DepMiss(dep_miss) } }, other => other, // Pass through all non-running states unchanged @@ -225,7 +243,7 @@ impl Orchestrator { } fn queue_job(&mut self, wg: WantGroup) -> Result<(), DatabuildError> { - use crate::job_run::JobRun; + use crate::job_run::JobRunHandle; // Compute args from wants the same way JobConfiguration::spawn() does let wanted_refs: Vec = wg @@ -234,7 +252,7 @@ impl Orchestrator { .flat_map(|want| want.partitions.clone()) .collect(); let args: Vec = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect(); - let job_run = JobRun::spawn(wg.job.entry_point.clone(), args); + let job_run = JobRunHandle::spawn(wg.job.entry_point.clone(), args); // Create job run buffer event let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 { @@ -263,10 +281,10 @@ impl Orchestrator { // Helper methods for tests to count jobs by state #[cfg(test)] fn count_running_jobs(&self) -> usize { - use crate::job_run::JobRun; + use crate::job_run::JobRunHandle; self.job_runs .iter() - .filter(|j| matches!(j, JobRun::Running(_))) + .filter(|j| matches!(j, JobRunHandle::Running(_))) .count() } @@ -277,28 +295,28 @@ impl Orchestrator { #[cfg(test)] fn count_not_started_jobs(&self) -> usize { - use crate::job_run::JobRun; + use crate::job_run::JobRunHandle; self.job_runs .iter() - .filter(|j| matches!(j, JobRun::NotStarted(_))) + .filter(|j| matches!(j, JobRunHandle::NotStarted(_))) .count() } #[cfg(test)] fn count_dep_miss_jobs(&self) -> usize { - use crate::job_run::JobRun; + use crate::job_run::JobRunHandle; self.job_runs .iter() - .filter(|j| matches!(j, JobRun::DepMiss(_))) + .filter(|j| matches!(j, JobRunHandle::DepMiss(_))) .count() } #[cfg(test)] fn count_completed_jobs(&self) -> usize { - use crate::job_run::JobRun; + use crate::job_run::JobRunHandle; self.job_runs .iter() - .filter(|j| matches!(j, JobRun::Completed(_))) + 
.filter(|j| matches!(j, JobRunHandle::Completed(_))) .count() } @@ -444,13 +462,13 @@ mod tests { // Should schedule alpha job assert_eq!(orchestrator.count_not_started_jobs(), 1); // Verify the job has the right args by checking the first NotStarted job - use crate::job_run::JobRun; + use crate::job_run::JobRunHandle; let not_started_job = orchestrator .job_runs .iter() - .find(|j| matches!(j, JobRun::NotStarted(_))) + .find(|j| matches!(j, JobRunHandle::NotStarted(_))) .unwrap(); - if let JobRun::NotStarted(job) = not_started_job { + if let JobRunHandle::NotStarted(job) = not_started_job { assert_eq!( job.state.args, vec!["data/alpha"],