Refactor job run into type-state state machines + typed IDs

This commit is contained in:
Stuart Axelbrooke 2025-11-22 20:27:37 +08:00
parent eb44350865
commit cf163b294d
8 changed files with 617 additions and 135 deletions

View file

@ -1,8 +1,9 @@
use crate::data_build_event::Event;
use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
use crate::job_run_state::{JobRun, JobRunWithState, QueuedState as JobQueuedState};
use crate::partition_state::{
FailedPartitionRef, LivePartitionRef, MissingPartitionRef, MissingState, Partition,
PartitionWithState, TaintedPartitionRef,
BuildingPartitionRef, FailedPartitionRef, LivePartitionRef, MissingPartitionRef, MissingState,
Partition, PartitionWithState, TaintedPartitionRef,
};
use crate::util::current_timestamp;
use crate::want_state::{
@ -49,7 +50,7 @@ pub struct BuildState {
wants: BTreeMap<String, Want>, // Type-safe want storage
taints: BTreeMap<String, TaintDetail>,
partitions: BTreeMap<String, Partition>, // Type-safe partition storage
job_runs: BTreeMap<String, JobRunDetail>,
job_runs: BTreeMap<String, JobRun>, // Type-safe job run storage
}
impl Default for BuildState {
@ -89,11 +90,11 @@ impl BuildState {
/// Used when a job run starts building partitions
fn transition_partitions_to_building(
&mut self,
partition_refs: &[PartitionRef],
partition_refs: &[BuildingPartitionRef],
job_run_id: &str,
) {
for pref in partition_refs {
if let Some(partition) = self.partitions.remove(&pref.r#ref) {
for building_ref in partition_refs {
if let Some(partition) = self.partitions.remove(&building_ref.0.r#ref) {
// Partition exists - transition based on current state
let transitioned = match partition {
// Valid: Missing -> Building
@ -104,18 +105,19 @@ impl BuildState {
_ => {
panic!(
"BUG: Invalid state - partition {} cannot start building from state {:?}",
pref.r#ref, partition
building_ref.0.r#ref, partition
)
}
};
self.partitions.insert(pref.r#ref.clone(), transitioned);
self.partitions
.insert(building_ref.0.r#ref.clone(), transitioned);
} else {
// Partition doesn't exist yet - create in Missing then transition to Building
let missing = Partition::new_missing(pref.clone());
let missing = Partition::new_missing(building_ref.0.clone());
if let Partition::Missing(m) = missing {
let building = m.start_building(job_run_id.to_string());
self.partitions
.insert(pref.r#ref.clone(), Partition::Building(building));
.insert(building_ref.0.r#ref.clone(), Partition::Building(building));
}
}
}
@ -185,12 +187,15 @@ impl BuildState {
/// Reset partitions from Building back to Missing state
/// Used when a job run encounters missing dependencies and cannot proceed
fn reset_partitions_to_missing(&mut self, partition_refs: &[PartitionRef]) {
for pref in partition_refs {
let partition = self.partitions.remove(&pref.r#ref).expect(&format!(
"BUG: Partition {} must exist and be in Building state during dep_miss",
pref.r#ref
));
fn reset_partitions_to_missing(&mut self, partition_refs: &[BuildingPartitionRef]) {
for building_ref in partition_refs {
let partition = self
.partitions
.remove(&building_ref.0.r#ref)
.expect(&format!(
"BUG: Partition {} must exist and be in Building state during dep_miss",
building_ref.0.r#ref
));
// Only valid transition: Building -> Missing
let transitioned = match partition {
@ -199,11 +204,12 @@ impl BuildState {
_ => {
panic!(
"BUG: Invalid state - partition {} must be Building during dep_miss, found {:?}",
pref.r#ref, partition
building_ref.0.r#ref, partition
)
}
};
self.partitions.insert(pref.r#ref.clone(), transitioned);
self.partitions
.insert(building_ref.0.r#ref.clone(), transitioned);
}
}
@ -632,8 +638,8 @@ impl BuildState {
);
}
// Create job run to be inserted
let job_run: JobRunDetail = event.clone().into();
// Create job run in Queued state
let queued: JobRunWithState<JobQueuedState> = event.clone().into();
// Transition wants to Building
// Valid states when job buffer event arrives:
@ -642,7 +648,7 @@ impl BuildState {
// Invalid states (panic - indicates orchestrator bug):
// - UpstreamBuilding: Not schedulable, waiting for dependencies
// - Successful/Failed/UpstreamFailed/Canceled: Terminal states, not schedulable
for wap in &job_run.servicing_wants {
for wap in &queued.info.servicing_wants {
let want = self.wants.remove(&wap.want_id).expect(&format!(
"BUG: Want {} must exist when job buffer event received",
wap.want_id
@ -668,41 +674,64 @@ impl BuildState {
self.wants.insert(wap.want_id.clone(), transitioned);
}
// Get building partition refs from queued job - job is source of truth for building partitions
let building_refs: Vec<BuildingPartitionRef> = queued
.info
.building_partitions
.iter()
.map(|p| BuildingPartitionRef(p.clone()))
.collect();
// Transition partitions to Building state
self.transition_partitions_to_building(&job_run.building_partitions, &event.job_run_id);
self.transition_partitions_to_building(&building_refs, &event.job_run_id);
self.job_runs
.insert(event.job_run_id.clone(), job_run.clone());
println!("Inserted job run: {:?}", job_run);
.insert(event.job_run_id.clone(), JobRun::Queued(queued));
vec![]
}
fn update_job_run_status(&mut self, job_run_id: &str, status: JobRunStatusCode) {
let job_run = self.job_runs.get_mut(job_run_id).expect(&format!(
"BUG: Job run ID {} must exist to update status",
job_run_id
));
job_run.last_heartbeat_at = Some(current_timestamp());
job_run.status = Some(status.into());
}
fn handle_job_run_heartbeat(&mut self, event: &JobRunHeartbeatEventV1) -> Vec<Event> {
self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunRunning);
let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
"BUG: Job run {} must exist when heartbeat received",
event.job_run_id
));
let running = match job_run {
// First heartbeat: Queued -> Running
JobRun::Queued(queued) => queued.start_running(current_timestamp()),
// Subsequent heartbeat: update timestamp
JobRun::Running(running) => running.heartbeat(current_timestamp()),
_ => {
panic!(
"BUG: Heartbeat received for job run {} in invalid state {:?}",
event.job_run_id, job_run
);
}
};
self.job_runs
.insert(event.job_run_id.clone(), JobRun::Running(running));
vec![]
}
fn handle_job_run_success(&mut self, event: &JobRunSuccessEventV1) -> Vec<Event> {
println!("Job run success event: {:?}", event);
self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunSucceeded);
let job_run = self.get_job_run(&event.job_run_id).unwrap();
let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
"BUG: Job run {} must exist when success event received",
event.job_run_id
));
// Clone building_partitions before we use it multiple times
// TODO correct this explicit upcasting of partition ref type
let newly_live_partitions: Vec<LivePartitionRef> = job_run
.building_partitions
.iter()
.map(|pref| LivePartitionRef(pref.clone()))
.collect();
let succeeded = match job_run {
JobRun::Running(running) => running.succeed(current_timestamp()),
_ => {
panic!(
"BUG: Success event received for job run {} in invalid state {:?}. Job must be Running to succeed.",
event.job_run_id, job_run
);
}
};
// Job run success is SOURCE of truth that partitions are live
let newly_live_partitions = succeeded.get_completed_partitions();
// Update partitions being built by this job (strict type-safe transitions)
self.transition_partitions_to_live(
@ -725,19 +754,29 @@ impl BuildState {
current_timestamp(),
);
self.job_runs
.insert(event.job_run_id.clone(), JobRun::Succeeded(succeeded));
vec![]
}
fn handle_job_run_failure(&mut self, event: &JobRunFailureEventV1) -> Vec<Event> {
self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunFailed);
let job_run = self.get_job_run(&event.job_run_id).unwrap();
let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
"BUG: Job run {} must exist when failure event received",
event.job_run_id
));
// Clone building_partitions before we use it multiple times
let failed_partitions: Vec<FailedPartitionRef> = job_run
.building_partitions
.iter()
.map(|pref| FailedPartitionRef(pref.clone()))
.collect();
let failed = match job_run {
JobRun::Running(running) => running.fail(current_timestamp(), event.reason.clone()),
_ => {
panic!(
"BUG: Failure event received for job run {} in invalid state {:?}. Job must be Running to fail.",
event.job_run_id, job_run
);
}
};
// Job run failure is SOURCE of truth that partitions failed
let failed_partitions = failed.get_failed_partitions();
// Transition partitions using strict type-safe methods
self.transition_partitions_to_failed(
@ -753,20 +792,68 @@ impl BuildState {
// UpstreamBuilding → UpstreamFailed (for downstream wants waiting on newly failed wants)
self.cascade_failures_to_downstream_wants(&newly_failed_wants, current_timestamp());
self.job_runs
.insert(event.job_run_id.clone(), JobRun::Failed(failed));
vec![]
}
fn handle_job_run_cancel(&mut self, _event: &JobRunCancelEventV1) -> Vec<Event> {
todo!("should update already inserted job run, partition status, want status")
fn handle_job_run_cancel(&mut self, event: &JobRunCancelEventV1) -> Vec<Event> {
let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
"BUG: Job run {} must exist when cancel event received",
event.job_run_id
));
let canceled = match job_run {
JobRun::Queued(queued) => queued.cancel(
current_timestamp(),
event.source.clone(),
event.comment.clone().unwrap_or_default(),
),
JobRun::Running(running) => running.cancel(
current_timestamp(),
event.source.clone(),
event.comment.clone().unwrap_or_default(),
),
_ => {
panic!(
"BUG: Cancel event received for job run {} in invalid state {:?}",
event.job_run_id, job_run
);
}
};
// Canceled job means building partitions should reset to Missing
let building_refs_to_reset = canceled.get_building_partitions_to_reset();
self.reset_partitions_to_missing(&building_refs_to_reset);
self.job_runs
.insert(event.job_run_id.clone(), JobRun::Canceled(canceled));
vec![]
}
pub fn handle_job_run_dep_miss(&mut self, event: &JobRunMissingDepsEventV1) -> Vec<Event> {
let job_run_detail = self.get_job_run(&event.job_run_id).expect(&format!(
"BUG: Unable to find job run with id `{}`",
let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
"BUG: Job run {} must exist when dep miss event received",
event.job_run_id
));
// Infer data/SLA timestamps from upstream want
let want_timestamps: WantTimestamps = job_run_detail
let dep_miss = match job_run {
JobRun::Running(running) => running.dep_miss(
current_timestamp(),
event.missing_deps.clone(),
event.read_deps.clone(),
),
_ => {
panic!(
"BUG: DepMiss event received for job run {} in invalid state {:?}. Job must be Running to hit dep miss.",
event.job_run_id, job_run
);
}
};
// Infer data/SLA timestamps from servicing wants
let want_timestamps: WantTimestamps = dep_miss
.info
.servicing_wants
.iter()
.flat_map(|wap| self.get_want(&wap.want_id).map(|w| w.into()))
@ -774,11 +861,12 @@ impl BuildState {
.expect("BUG: No servicing wants found");
// Transition partitions back to Missing since this job can't build them yet
self.reset_partitions_to_missing(&job_run_detail.building_partitions);
let building_refs_to_reset = dep_miss.get_building_partitions_to_reset();
self.reset_partitions_to_missing(&building_refs_to_reset);
// Create wants from dep misses
let want_events = missing_deps_to_want_events(
event.missing_deps.clone(),
dep_miss.get_missing_deps().to_vec(),
&event.job_run_id,
want_timestamps,
);
@ -794,11 +882,14 @@ impl BuildState {
// Transition servicing wants to UpstreamBuilding when they have missing dependencies
self.transition_wants_to_upstream_building(
&job_run_detail.servicing_wants,
&event.missing_deps,
&dep_miss.info.servicing_wants,
dep_miss.get_missing_deps(),
&partition_to_want_map,
);
self.job_runs
.insert(event.job_run_id.clone(), JobRun::DepMiss(dep_miss));
want_events
}
@ -838,7 +929,7 @@ impl BuildState {
self.partitions.get(partition_id).map(|p| p.to_detail())
}
pub fn get_job_run(&self, job_run_id: &str) -> Option<JobRunDetail> {
self.job_runs.get(job_run_id).cloned()
self.job_runs.get(job_run_id).map(|jr| jr.to_detail())
}
pub fn list_wants(&self, request: &ListWantsRequest) -> ListWantsResponse {
@ -895,9 +986,19 @@ impl BuildState {
pub fn list_job_runs(&self, request: &ListJobRunsRequest) -> ListJobRunsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
let start = page * page_size;
let data: Vec<JobRunDetail> = self
.job_runs
.values()
.skip(start as usize)
.take(page_size as usize)
.map(|jr| jr.to_detail())
.collect();
ListJobRunsResponse {
data: list_state_items(&self.job_runs, page, page_size),
match_count: self.wants.len() as u64,
data,
match_count: self.job_runs.len() as u64,
page,
page_size,
}

View file

@ -242,6 +242,7 @@ enum JobRunStatusCode {
JobRunFailed = 2;
JobRunCanceled = 3;
JobRunSucceeded = 4;
JobRunDepMiss = 5;
}
message JobRunDetail {
string id = 1;

View file

@ -1,5 +1,6 @@
use crate::PartitionStatusCode::{PartitionFailed, PartitionLive};
use crate::data_build_event::Event;
use crate::job_run_state::{JobInfo, JobRunWithState, QueuedState};
use crate::util::current_timestamp;
use crate::{
CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse,
@ -83,6 +84,21 @@ impl From<JobRunBufferEventV1> for JobRunDetail {
}
}
impl From<JobRunBufferEventV1> for JobRunWithState<QueuedState> {
fn from(event: JobRunBufferEventV1) -> Self {
JobRunWithState {
info: JobInfo {
id: event.job_run_id,
building_partitions: event.building_partitions,
servicing_wants: event.want_attributed_partitions,
},
state: QueuedState {
queued_at: current_timestamp(),
},
}
}
}
pub fn want_status_matches_any(
pds: &Vec<Option<PartitionDetail>>,
status: PartitionStatusCode,

View file

@ -1,4 +1,4 @@
use crate::job_run::{JobRun, SubProcessBackend};
use crate::job_run::{JobRunHandle, SubProcessBackend};
use crate::util::DatabuildError;
use crate::{JobConfig, PartitionRef, WantDetail};
use regex::Regex;
@ -15,13 +15,13 @@ impl JobConfiguration {
pub fn spawn(
&self,
wants: Vec<WantDetail>,
) -> Result<JobRun<SubProcessBackend>, std::io::Error> {
) -> Result<JobRunHandle<SubProcessBackend>, std::io::Error> {
let wanted_refs: Vec<PartitionRef> = wants
.iter()
.flat_map(|want| want.partitions.clone())
.collect();
let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
Ok(JobRun::spawn(self.entry_point.clone(), args))
Ok(JobRunHandle::spawn(self.entry_point.clone(), args))
}
pub fn matches(&self, refs: &PartitionRef) -> bool {

View file

@ -55,34 +55,34 @@ pub enum PollResult<C, F, D> {
}
// ===== TYPE-SAFE STATE MACHINE PATTERN =====
// Uses parameterized JobRunWithState wrapped in JobRun enum for storage
// Uses parameterized JobRunHandleWithState wrapped in JobRun enum for storage
/// New JobRun with embedded state enum
/// Type-safe job run struct, parameterized by backend and state
/// This struct can only perform operations valid for its current state type
pub struct JobRunWithState<B: JobRunBackend, S> {
/// JobRunHandle with embedded state enum
/// Type-safe job run handle struct, parameterized by backend and state
/// This struct manages the actual running process/execution and can only perform operations valid for its current state type
pub struct JobRunHandleWithState<B: JobRunBackend, S> {
pub job_run_id: Uuid,
pub state: S,
pub _backend: PhantomData<B>,
}
/// Wrapper enum for storing job runs in a single collection
/// Wrapper enum for storing job run handles in a single collection
/// This allows us to store jobs in different states together while maintaining type safety
pub enum JobRun<B: JobRunBackend> {
NotStarted(JobRunWithState<B, B::NotStartedState>),
Running(JobRunWithState<B, B::RunningState>),
Completed(JobRunWithState<B, B::CompletedState>),
Failed(JobRunWithState<B, B::FailedState>),
Canceled(JobRunWithState<B, B::CanceledState>),
DepMiss(JobRunWithState<B, B::DepMissState>),
pub enum JobRunHandle<B: JobRunBackend> {
NotStarted(JobRunHandleWithState<B, B::NotStartedState>),
Running(JobRunHandleWithState<B, B::RunningState>),
Completed(JobRunHandleWithState<B, B::CompletedState>),
Failed(JobRunHandleWithState<B, B::FailedState>),
Canceled(JobRunHandleWithState<B, B::CanceledState>),
DepMiss(JobRunHandleWithState<B, B::DepMissState>),
}
/// Result of visiting a running job - returns the typed states
pub enum VisitResult<B: JobRunBackend> {
StillRunning(JobRunWithState<B, B::RunningState>),
Completed(JobRunWithState<B, B::CompletedState>),
Failed(JobRunWithState<B, B::FailedState>),
DepMiss(JobRunWithState<B, B::DepMissState>),
StillRunning(JobRunHandleWithState<B, B::RunningState>),
Completed(JobRunHandleWithState<B, B::CompletedState>),
Failed(JobRunHandleWithState<B, B::FailedState>),
DepMiss(JobRunHandleWithState<B, B::DepMissState>),
}
pub enum JobRunConfig {
@ -297,11 +297,11 @@ pub struct JobRunPollResult {
// ===== Type-Safe State Transition Implementation =====
// Factory and helper methods on the JobRun enum
impl<B: JobRunBackend> JobRun<B> {
// Factory and helper methods on the JobRunHandle enum
impl<B: JobRunBackend> JobRunHandle<B> {
/// Create a new job run in the NotStarted state
pub fn spawn(entry_point: String, args: Vec<String>) -> Self {
JobRun::NotStarted(JobRunWithState {
JobRunHandle::NotStarted(JobRunHandleWithState {
job_run_id: Uuid::new_v4(),
state: B::create(entry_point, args),
_backend: PhantomData,
@ -311,12 +311,12 @@ impl<B: JobRunBackend> JobRun<B> {
/// Get the job run ID regardless of state
pub fn job_run_id(&self) -> &Uuid {
match self {
JobRun::NotStarted(j) => &j.job_run_id,
JobRun::Running(j) => &j.job_run_id,
JobRun::Completed(j) => &j.job_run_id,
JobRun::Failed(j) => &j.job_run_id,
JobRun::Canceled(j) => &j.job_run_id,
JobRun::DepMiss(j) => &j.job_run_id,
JobRunHandle::NotStarted(j) => &j.job_run_id,
JobRunHandle::Running(j) => &j.job_run_id,
JobRunHandle::Completed(j) => &j.job_run_id,
JobRunHandle::Failed(j) => &j.job_run_id,
JobRunHandle::Canceled(j) => &j.job_run_id,
JobRunHandle::DepMiss(j) => &j.job_run_id,
}
}
@ -324,20 +324,23 @@ impl<B: JobRunBackend> JobRun<B> {
pub fn is_terminal(&self) -> bool {
matches!(
self,
JobRun::Completed(_) | JobRun::Failed(_) | JobRun::Canceled(_) | JobRun::DepMiss(_)
JobRunHandle::Completed(_)
| JobRunHandle::Failed(_)
| JobRunHandle::Canceled(_)
| JobRunHandle::DepMiss(_)
)
}
}
// Type-safe transition: NotStarted -> Running
// This method can ONLY be called on NotStarted jobs - compile error otherwise!
impl<B: JobRunBackend> JobRunWithState<B, B::NotStartedState> {
impl<B: JobRunBackend> JobRunHandleWithState<B, B::NotStartedState> {
pub fn run(
self,
env: Option<HashMap<String, String>>,
) -> Result<JobRunWithState<B, B::RunningState>, DatabuildError> {
) -> Result<JobRunHandleWithState<B, B::RunningState>, DatabuildError> {
let running = B::start(self.state, env)?;
Ok(JobRunWithState {
Ok(JobRunHandleWithState {
job_run_id: self.job_run_id,
state: running,
_backend: PhantomData,
@ -347,21 +350,21 @@ impl<B: JobRunBackend> JobRunWithState<B, B::NotStartedState> {
// Type-safe transition: Running -> (Running | Completed | Failed | DepMiss)
// This method can ONLY be called on Running jobs - compile error otherwise!
impl<B: JobRunBackend> JobRunWithState<B, B::RunningState> {
impl<B: JobRunBackend> JobRunHandleWithState<B, B::RunningState> {
pub fn visit(mut self) -> Result<VisitResult<B>, DatabuildError> {
match B::poll(&mut self.state)? {
PollResult::StillRunning => Ok(VisitResult::StillRunning(self)),
PollResult::Completed(completed) => Ok(VisitResult::Completed(JobRunWithState {
PollResult::Completed(completed) => Ok(VisitResult::Completed(JobRunHandleWithState {
job_run_id: self.job_run_id,
state: completed,
_backend: PhantomData,
})),
PollResult::Failed(failed) => Ok(VisitResult::Failed(JobRunWithState {
PollResult::Failed(failed) => Ok(VisitResult::Failed(JobRunHandleWithState {
job_run_id: self.job_run_id,
state: failed,
_backend: PhantomData,
})),
PollResult::DepMiss(dep_miss) => Ok(VisitResult::DepMiss(JobRunWithState {
PollResult::DepMiss(dep_miss) => Ok(VisitResult::DepMiss(JobRunHandleWithState {
job_run_id: self.job_run_id,
state: dep_miss,
_backend: PhantomData,
@ -372,9 +375,9 @@ impl<B: JobRunBackend> JobRunWithState<B, B::RunningState> {
pub fn cancel(
self,
source: EventSource,
) -> Result<JobRunWithState<B, B::CanceledState>, DatabuildError> {
) -> Result<JobRunHandleWithState<B, B::CanceledState>, DatabuildError> {
let canceled = B::cancel_job(self.state, source)?;
Ok(JobRunWithState {
Ok(JobRunHandleWithState {
job_run_id: self.job_run_id,
state: canceled,
_backend: PhantomData,
@ -417,7 +420,7 @@ impl ToEvent for SubProcessDepMiss {
mod tests {
use crate::data_build_event::Event;
use crate::data_deps::DATABUILD_MISSING_DEPS_JSON;
use crate::job_run::{JobRun, JobRunBackend, SubProcessBackend, VisitResult};
use crate::job_run::{JobRunBackend, JobRunHandle, SubProcessBackend, VisitResult};
use crate::mock_job_run::MockJobRun;
use crate::{JobRunMissingDeps, MissingDeps};
@ -425,11 +428,11 @@ mod tests {
#[test]
fn test_job_run_success_returns_job_run_success_event() {
// Spawn a job run that will succeed (exit code 0)
let job_run = JobRun::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);
let job_run = JobRunHandle::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);
// Start the job - this consumes the NotStarted and returns Running
let running_job = match job_run {
JobRun::NotStarted(not_started) => not_started.run(None).unwrap(),
JobRunHandle::NotStarted(not_started) => not_started.run(None).unwrap(),
_ => panic!("Expected NotStarted job"),
};
@ -463,12 +466,12 @@ mod tests {
#[test]
fn test_job_run_failure_returns_job_run_failure_event() {
// Spawn a job run
let job_run = JobRun::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);
let job_run = JobRunHandle::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);
// Start the job with an exit code that indicates failure (non-zero)
let env = MockJobRun::new().exit_code(1).to_env();
let running_job = match job_run {
JobRun::NotStarted(not_started) => not_started.run(Some(env)).unwrap(),
JobRunHandle::NotStarted(not_started) => not_started.run(Some(env)).unwrap(),
_ => panic!("Expected NotStarted job"),
};
@ -511,7 +514,7 @@ mod tests {
let temp_file = format!("/tmp/databuild_test_cancel_{}", Uuid::new_v4());
// Spawn a job run that will sleep for 1 second and write a file
let job_run = JobRun::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);
let job_run = JobRunHandle::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);
let env = MockJobRun::new()
.sleep_ms(1000)
@ -519,7 +522,7 @@ mod tests {
.exit_code(0)
.to_env();
let running_job = match job_run {
JobRun::NotStarted(not_started) => not_started.run(Some(env)).unwrap(),
JobRunHandle::NotStarted(not_started) => not_started.run(Some(env)).unwrap(),
_ => panic!("Expected NotStarted job"),
};
@ -558,7 +561,7 @@ mod tests {
#[test]
fn test_job_run_fail_on_missing_deps_should_emit_missing_deps_event() {
// Spawn a job run that will sleep for 1 second and write a file
let job_run = JobRun::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);
let job_run = JobRunHandle::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);
let expected_dep_miss = JobRunMissingDeps {
version: "1".into(),
@ -575,7 +578,7 @@ mod tests {
.exit_code(1)
.to_env();
let running_job = match job_run {
JobRun::NotStarted(not_started) => not_started.run(Some(env)).unwrap(),
JobRunHandle::NotStarted(not_started) => not_started.run(Some(env)).unwrap(),
_ => panic!("Expected NotStarted job"),
};

342
databuild/job_run_state.rs Normal file
View file

@ -0,0 +1,342 @@
use crate::partition_state::{BuildingPartitionRef, FailedPartitionRef, LivePartitionRef};
use crate::util::current_timestamp;
use crate::{
EventSource, JobRunDetail, JobRunStatusCode, MissingDeps, PartitionRef, ReadDeps,
WantAttributedPartitions,
};
/// State: Job has been queued but not yet started
#[derive(Debug, Clone)]
pub struct QueuedState {
pub queued_at: u64,
}
/// State: Job is currently running
#[derive(Debug, Clone)]
pub struct RunningState {
pub started_at: u64,
pub last_heartbeat_at: u64, // NOT optional, defaults to started_at
}
/// State: Job completed successfully
#[derive(Debug, Clone)]
pub struct SucceededState {
pub completed_at: u64,
}
/// State: Job failed during execution
#[derive(Debug, Clone)]
pub struct FailedState {
pub failed_at: u64,
pub failure_reason: String,
}
/// State: Job detected missing dependencies
#[derive(Debug, Clone)]
pub struct DepMissState {
pub detected_at: u64,
pub missing_deps: Vec<MissingDeps>,
pub read_deps: Vec<ReadDeps>,
}
/// State: Job was explicitly canceled
#[derive(Debug, Clone)]
pub struct CanceledState {
pub canceled_at: u64,
pub source: Option<EventSource>,
pub comment: String,
}
/// Shared information across all job run states
#[derive(Debug, Clone)]
pub struct JobInfo {
pub id: String,
pub building_partitions: Vec<PartitionRef>,
pub servicing_wants: Vec<WantAttributedPartitions>,
}
/// Generic job run struct parameterized by state
#[derive(Debug, Clone)]
pub struct JobRunWithState<S> {
pub info: JobInfo,
pub state: S,
}
/// Wrapper enum for storing job runs in collections
#[derive(Debug, Clone)]
pub enum JobRun {
Queued(JobRunWithState<QueuedState>),
Running(JobRunWithState<RunningState>),
Succeeded(JobRunWithState<SucceededState>),
Failed(JobRunWithState<FailedState>),
DepMiss(JobRunWithState<DepMissState>),
Canceled(JobRunWithState<CanceledState>),
}
// ==================== State Transitions ====================
impl JobRunWithState<QueuedState> {
/// Transition from Queued to Running
pub fn start_running(self, timestamp: u64) -> JobRunWithState<RunningState> {
JobRunWithState {
info: self.info,
state: RunningState {
started_at: timestamp,
last_heartbeat_at: timestamp, // Initialize to start time
},
}
}
/// Transition from Queued to Canceled (canceled before starting)
pub fn cancel(
self,
timestamp: u64,
source: Option<EventSource>,
comment: String,
) -> JobRunWithState<CanceledState> {
JobRunWithState {
info: self.info,
state: CanceledState {
canceled_at: timestamp,
source,
comment,
},
}
}
}
impl JobRunWithState<RunningState> {
/// Update heartbeat timestamp (non-consuming)
pub fn heartbeat(mut self, timestamp: u64) -> Self {
self.state.last_heartbeat_at = timestamp;
self
}
/// Transition from Running to Succeeded
pub fn succeed(self, timestamp: u64) -> JobRunWithState<SucceededState> {
JobRunWithState {
info: self.info,
state: SucceededState {
completed_at: timestamp,
},
}
}
/// Transition from Running to Failed
pub fn fail(self, timestamp: u64, reason: String) -> JobRunWithState<FailedState> {
JobRunWithState {
info: self.info,
state: FailedState {
failed_at: timestamp,
failure_reason: reason,
},
}
}
/// Transition from Running to DepMiss
pub fn dep_miss(
self,
timestamp: u64,
missing_deps: Vec<MissingDeps>,
read_deps: Vec<ReadDeps>,
) -> JobRunWithState<DepMissState> {
JobRunWithState {
info: self.info,
state: DepMissState {
detected_at: timestamp,
missing_deps,
read_deps,
},
}
}
/// Transition from Running to Canceled
pub fn cancel(
self,
timestamp: u64,
source: Option<EventSource>,
comment: String,
) -> JobRunWithState<CanceledState> {
JobRunWithState {
info: self.info,
state: CanceledState {
canceled_at: timestamp,
source,
comment,
},
}
}
}
// ==================== Type-Safe Job Run IDs ====================
/// Type-safe job run ID wrappers that encode state expectations in function signatures.
/// These should be created ephemerally from typestate objects via .get_id() and used immediately—never stored long-term.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueuedJobRunId(pub String);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RunningJobRunId(pub String);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SucceededJobRunId(pub String);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FailedJobRunId(pub String);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DepMissJobRunId(pub String);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CanceledJobRunId(pub String);
// ==================== State-Specific Methods ====================
impl JobRunWithState<QueuedState> {
pub fn get_id(&self) -> QueuedJobRunId {
QueuedJobRunId(self.info.id.clone())
}
}
impl JobRunWithState<RunningState> {
pub fn get_id(&self) -> RunningJobRunId {
RunningJobRunId(self.info.id.clone())
}
/// Currently building these partitions
/// Job run running state is the SOURCE of truth that these partitions are building
pub fn get_building_partitions(&self) -> Vec<BuildingPartitionRef> {
self.info
.building_partitions
.iter()
.map(|p| BuildingPartitionRef(p.clone()))
.collect()
}
pub fn get_last_heartbeat(&self) -> u64 {
self.state.last_heartbeat_at
}
}
impl JobRunWithState<SucceededState> {
pub fn get_id(&self) -> SucceededJobRunId {
SucceededJobRunId(self.info.id.clone())
}
/// Job run success is the SOURCE of truth that these partitions are live
pub fn get_completed_partitions(&self) -> Vec<LivePartitionRef> {
self.info
.building_partitions
.iter()
.map(|p| LivePartitionRef(p.clone()))
.collect()
}
}
impl JobRunWithState<FailedState> {
pub fn get_id(&self) -> FailedJobRunId {
FailedJobRunId(self.info.id.clone())
}
/// Job run failure is the SOURCE of truth that these partitions failed
pub fn get_failed_partitions(&self) -> Vec<FailedPartitionRef> {
self.info
.building_partitions
.iter()
.map(|p| FailedPartitionRef(p.clone()))
.collect()
}
pub fn get_failure_reason(&self) -> &str {
&self.state.failure_reason
}
}
impl JobRunWithState<DepMissState> {
pub fn get_id(&self) -> DepMissJobRunId {
DepMissJobRunId(self.info.id.clone())
}
/// Job run dep miss means building partitions should reset to Missing
pub fn get_building_partitions_to_reset(&self) -> Vec<BuildingPartitionRef> {
self.info
.building_partitions
.iter()
.map(|p| BuildingPartitionRef(p.clone()))
.collect()
}
pub fn get_missing_deps(&self) -> &[MissingDeps] {
&self.state.missing_deps
}
pub fn get_read_deps(&self) -> &[ReadDeps] {
&self.state.read_deps
}
}
impl JobRunWithState<CanceledState> {
pub fn get_id(&self) -> CanceledJobRunId {
CanceledJobRunId(self.info.id.clone())
}
/// Canceled job means building partitions should reset to Missing
pub fn get_building_partitions_to_reset(&self) -> Vec<BuildingPartitionRef> {
self.info
.building_partitions
.iter()
.map(|p| BuildingPartitionRef(p.clone()))
.collect()
}
}
// ==================== Conversion to JobRunDetail for API ====================
impl JobRun {
pub fn to_detail(&self) -> JobRunDetail {
match self {
JobRun::Queued(queued) => JobRunDetail {
id: queued.info.id.clone(),
status: Some(JobRunStatusCode::JobRunQueued.into()),
last_heartbeat_at: None,
building_partitions: queued.info.building_partitions.clone(),
servicing_wants: queued.info.servicing_wants.clone(),
},
JobRun::Running(running) => JobRunDetail {
id: running.info.id.clone(),
status: Some(JobRunStatusCode::JobRunRunning.into()),
last_heartbeat_at: Some(running.state.last_heartbeat_at),
building_partitions: running.info.building_partitions.clone(),
servicing_wants: running.info.servicing_wants.clone(),
},
JobRun::Succeeded(succeeded) => JobRunDetail {
id: succeeded.info.id.clone(),
status: Some(JobRunStatusCode::JobRunSucceeded.into()),
last_heartbeat_at: None,
building_partitions: succeeded.info.building_partitions.clone(),
servicing_wants: succeeded.info.servicing_wants.clone(),
},
JobRun::Failed(failed) => JobRunDetail {
id: failed.info.id.clone(),
status: Some(JobRunStatusCode::JobRunFailed.into()),
last_heartbeat_at: None,
building_partitions: failed.info.building_partitions.clone(),
servicing_wants: failed.info.servicing_wants.clone(),
},
JobRun::DepMiss(dep_miss) => JobRunDetail {
id: dep_miss.info.id.clone(),
status: Some(JobRunStatusCode::JobRunDepMiss.into()),
last_heartbeat_at: None,
building_partitions: dep_miss.info.building_partitions.clone(),
servicing_wants: dep_miss.info.servicing_wants.clone(),
},
JobRun::Canceled(canceled) => JobRunDetail {
id: canceled.info.id.clone(),
status: Some(JobRunStatusCode::JobRunCanceled.into()),
last_heartbeat_at: None,
building_partitions: canceled.info.building_partitions.clone(),
servicing_wants: canceled.info.servicing_wants.clone(),
},
}
}
}

View file

@ -4,6 +4,7 @@ mod data_deps;
mod event_transforms;
mod job;
mod job_run;
mod job_run_state;
mod mock_job_run;
mod orchestrator;
mod partition_state;

View file

@ -20,7 +20,7 @@ JTBDs:
struct Orchestrator<S: BELStorage + Debug> {
pub bel: BuildEventLog<S>,
pub config: OrchestratorConfig,
pub job_runs: Vec<crate::job_run::JobRun<SubProcessBackend>>,
pub job_runs: Vec<crate::job_run::JobRunHandle<SubProcessBackend>>,
}
impl Default for Orchestrator<MemoryBELStorage> {
@ -125,12 +125,25 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
}
fn schedule_queued_jobs(&mut self) -> Result<(), DatabuildError> {
use crate::job_run::JobRun;
use crate::JobRunHeartbeatEventV1;
use crate::data_build_event::Event;
use crate::job_run::JobRunHandle;
let mut new_jobs = Vec::new();
for job in self.job_runs.drain(..) {
let transitioned = match job {
JobRun::NotStarted(not_started) => JobRun::Running(not_started.run(None)?),
JobRunHandle::NotStarted(not_started) => {
let job_run_id = not_started.job_run_id.clone();
let running = not_started.run(None)?;
// Emit heartbeat event to notify BuildState that job is now running
let heartbeat_event = Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 {
job_run_id: job_run_id.to_string(),
});
self.bel.append_event(&heartbeat_event)?;
JobRunHandle::Running(running)
}
other => other, // Pass through all other states unchanged
};
new_jobs.push(transitioned);
@ -142,36 +155,41 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
/// Visits individual job runs, appending resulting events, and moving runs between run status
/// containers. Either jobs are still running, or they are moved to terminal states.
fn poll_job_runs(&mut self) -> Result<(), DatabuildError> {
use crate::job_run::{JobRun, VisitResult};
use crate::job_run::{JobRunHandle, VisitResult};
self.schedule_queued_jobs()?;
// TODO: Emit periodic JobRunHeartbeatEventV1 events for long-running jobs
// Currently we emit one heartbeat when starting (in schedule_queued_jobs), but we should
// also emit periodic heartbeats for long-running jobs to detect stalls
// Need to decide on heartbeat frequency (e.g., every N polls or based on time elapsed)
// Visit all running jobs using type-safe transitions
let mut new_jobs = Vec::new();
for job in self.job_runs.drain(..) {
let transitioned = match job {
JobRun::Running(running) => match running.visit()? {
JobRunHandle::Running(running) => match running.visit()? {
VisitResult::StillRunning(still_running) => {
println!("Still running job: {:?}", still_running.job_run_id);
JobRun::Running(still_running)
JobRunHandle::Running(still_running)
}
VisitResult::Completed(completed) => {
println!("Completed job: {:?}", completed.job_run_id);
let event = completed.state.to_event(&completed.job_run_id);
self.bel.append_event(&event)?;
JobRun::Completed(completed)
JobRunHandle::Completed(completed)
}
VisitResult::Failed(failed) => {
println!("Failed job: {:?}", failed.job_run_id);
let event = failed.state.to_event(&failed.job_run_id);
self.bel.append_event(&event)?;
JobRun::Failed(failed)
JobRunHandle::Failed(failed)
}
VisitResult::DepMiss(dep_miss) => {
println!("Dep miss job: {:?}", dep_miss.job_run_id);
let event = dep_miss.state.to_event(&dep_miss.job_run_id);
self.bel.append_event(&event)?;
JobRun::DepMiss(dep_miss)
JobRunHandle::DepMiss(dep_miss)
}
},
other => other, // Pass through all non-running states unchanged
@ -225,7 +243,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
}
fn queue_job(&mut self, wg: WantGroup) -> Result<(), DatabuildError> {
use crate::job_run::JobRun;
use crate::job_run::JobRunHandle;
// Compute args from wants the same way JobConfiguration::spawn() does
let wanted_refs: Vec<crate::PartitionRef> = wg
@ -234,7 +252,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
.flat_map(|want| want.partitions.clone())
.collect();
let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
let job_run = JobRun::spawn(wg.job.entry_point.clone(), args);
let job_run = JobRunHandle::spawn(wg.job.entry_point.clone(), args);
// Create job run buffer event
let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 {
@ -263,10 +281,10 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
// Helper methods for tests to count jobs by state
#[cfg(test)]
fn count_running_jobs(&self) -> usize {
use crate::job_run::JobRun;
use crate::job_run::JobRunHandle;
self.job_runs
.iter()
.filter(|j| matches!(j, JobRun::Running(_)))
.filter(|j| matches!(j, JobRunHandle::Running(_)))
.count()
}
@ -277,28 +295,28 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
#[cfg(test)]
fn count_not_started_jobs(&self) -> usize {
use crate::job_run::JobRun;
use crate::job_run::JobRunHandle;
self.job_runs
.iter()
.filter(|j| matches!(j, JobRun::NotStarted(_)))
.filter(|j| matches!(j, JobRunHandle::NotStarted(_)))
.count()
}
#[cfg(test)]
fn count_dep_miss_jobs(&self) -> usize {
use crate::job_run::JobRun;
use crate::job_run::JobRunHandle;
self.job_runs
.iter()
.filter(|j| matches!(j, JobRun::DepMiss(_)))
.filter(|j| matches!(j, JobRunHandle::DepMiss(_)))
.count()
}
#[cfg(test)]
fn count_completed_jobs(&self) -> usize {
use crate::job_run::JobRun;
use crate::job_run::JobRunHandle;
self.job_runs
.iter()
.filter(|j| matches!(j, JobRun::Completed(_)))
.filter(|j| matches!(j, JobRunHandle::Completed(_)))
.count()
}
@ -444,13 +462,13 @@ mod tests {
// Should schedule alpha job
assert_eq!(orchestrator.count_not_started_jobs(), 1);
// Verify the job has the right args by checking the first NotStarted job
use crate::job_run::JobRun;
use crate::job_run::JobRunHandle;
let not_started_job = orchestrator
.job_runs
.iter()
.find(|j| matches!(j, JobRun::NotStarted(_)))
.find(|j| matches!(j, JobRunHandle::NotStarted(_)))
.unwrap();
if let JobRun::NotStarted(job) = not_started_job {
if let JobRunHandle::NotStarted(job) = not_started_job {
assert_eq!(
job.state.args,
vec!["data/alpha"],