Compare commits
2 commits: eb44350865 ... 2084fadbb6

| Author | SHA1 | Date |
|---|---|---|
|  | 2084fadbb6 |  |
|  | cf163b294d |  |

9 changed files with 848 additions and 135 deletions
@@ -1,8 +1,9 @@
 use crate::data_build_event::Event;
 use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
+use crate::job_run_state::{JobRun, JobRunWithState, QueuedState as JobQueuedState};
 use crate::partition_state::{
-    FailedPartitionRef, LivePartitionRef, MissingPartitionRef, MissingState, Partition,
-    PartitionWithState, TaintedPartitionRef,
+    BuildingPartitionRef, FailedPartitionRef, LivePartitionRef, MissingPartitionRef, MissingState,
+    Partition, PartitionWithState, TaintedPartitionRef,
 };
 use crate::util::current_timestamp;
 use crate::want_state::{
@@ -49,7 +50,7 @@ pub struct BuildState {
     wants: BTreeMap<String, Want>, // Type-safe want storage
     taints: BTreeMap<String, TaintDetail>,
     partitions: BTreeMap<String, Partition>, // Type-safe partition storage
-    job_runs: BTreeMap<String, JobRunDetail>,
+    job_runs: BTreeMap<String, JobRun>, // Type-safe job run storage
 }

 impl Default for BuildState {
@@ -89,11 +90,11 @@ impl BuildState {
     /// Used when a job run starts building partitions
     fn transition_partitions_to_building(
         &mut self,
-        partition_refs: &[PartitionRef],
+        partition_refs: &[BuildingPartitionRef],
         job_run_id: &str,
     ) {
-        for pref in partition_refs {
-            if let Some(partition) = self.partitions.remove(&pref.r#ref) {
+        for building_ref in partition_refs {
+            if let Some(partition) = self.partitions.remove(&building_ref.0.r#ref) {
                 // Partition exists - transition based on current state
                 let transitioned = match partition {
                     // Valid: Missing -> Building
@@ -104,18 +105,19 @@ impl BuildState {
                     _ => {
                         panic!(
                             "BUG: Invalid state - partition {} cannot start building from state {:?}",
-                            pref.r#ref, partition
+                            building_ref.0.r#ref, partition
                         )
                     }
                 };
-                self.partitions.insert(pref.r#ref.clone(), transitioned);
+                self.partitions
+                    .insert(building_ref.0.r#ref.clone(), transitioned);
             } else {
                 // Partition doesn't exist yet - create in Missing then transition to Building
-                let missing = Partition::new_missing(pref.clone());
+                let missing = Partition::new_missing(building_ref.0.clone());
                 if let Partition::Missing(m) = missing {
                     let building = m.start_building(job_run_id.to_string());
                     self.partitions
-                        .insert(pref.r#ref.clone(), Partition::Building(building));
+                        .insert(building_ref.0.r#ref.clone(), Partition::Building(building));
                 }
             }
         }
     }
@@ -185,12 +187,15 @@ impl BuildState {

     /// Reset partitions from Building back to Missing state
     /// Used when a job run encounters missing dependencies and cannot proceed
-    fn reset_partitions_to_missing(&mut self, partition_refs: &[PartitionRef]) {
-        for pref in partition_refs {
-            let partition = self.partitions.remove(&pref.r#ref).expect(&format!(
-                "BUG: Partition {} must exist and be in Building state during dep_miss",
-                pref.r#ref
-            ));
+    fn reset_partitions_to_missing(&mut self, partition_refs: &[BuildingPartitionRef]) {
+        for building_ref in partition_refs {
+            let partition = self
+                .partitions
+                .remove(&building_ref.0.r#ref)
+                .expect(&format!(
+                    "BUG: Partition {} must exist and be in Building state during dep_miss",
+                    building_ref.0.r#ref
+                ));

             // Only valid transition: Building -> Missing
             let transitioned = match partition {
@@ -199,11 +204,12 @@ impl BuildState {
                 _ => {
                     panic!(
                         "BUG: Invalid state - partition {} must be Building during dep_miss, found {:?}",
-                        pref.r#ref, partition
+                        building_ref.0.r#ref, partition
                     )
                 }
             };
-            self.partitions.insert(pref.r#ref.clone(), transitioned);
+            self.partitions
+                .insert(building_ref.0.r#ref.clone(), transitioned);
         }
     }

@@ -632,8 +638,8 @@ impl BuildState {
             );
         }

-        // Create job run to be inserted
-        let job_run: JobRunDetail = event.clone().into();
+        // Create job run in Queued state
+        let queued: JobRunWithState<JobQueuedState> = event.clone().into();

         // Transition wants to Building
         // Valid states when job buffer event arrives:
@@ -642,7 +648,7 @@ impl BuildState {
         // Invalid states (panic - indicates orchestrator bug):
         // - UpstreamBuilding: Not schedulable, waiting for dependencies
         // - Successful/Failed/UpstreamFailed/Canceled: Terminal states, not schedulable
-        for wap in &job_run.servicing_wants {
+        for wap in &queued.info.servicing_wants {
            let want = self.wants.remove(&wap.want_id).expect(&format!(
                "BUG: Want {} must exist when job buffer event received",
                wap.want_id
@@ -668,41 +674,64 @@ impl BuildState {
             self.wants.insert(wap.want_id.clone(), transitioned);
         }

+        // Get building partition refs from queued job - job is source of truth for building partitions
+        let building_refs: Vec<BuildingPartitionRef> = queued
+            .info
+            .building_partitions
+            .iter()
+            .map(|p| BuildingPartitionRef(p.clone()))
+            .collect();
+
         // Transition partitions to Building state
-        self.transition_partitions_to_building(&job_run.building_partitions, &event.job_run_id);
+        self.transition_partitions_to_building(&building_refs, &event.job_run_id);

         self.job_runs
-            .insert(event.job_run_id.clone(), job_run.clone());
-        println!("Inserted job run: {:?}", job_run);
+            .insert(event.job_run_id.clone(), JobRun::Queued(queued));
         vec![]
     }

-    fn update_job_run_status(&mut self, job_run_id: &str, status: JobRunStatusCode) {
-        let job_run = self.job_runs.get_mut(job_run_id).expect(&format!(
-            "BUG: Job run ID {} must exist to update status",
-            job_run_id
-        ));
-        job_run.last_heartbeat_at = Some(current_timestamp());
-        job_run.status = Some(status.into());
-    }
-
     fn handle_job_run_heartbeat(&mut self, event: &JobRunHeartbeatEventV1) -> Vec<Event> {
-        self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunRunning);
+        let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
+            "BUG: Job run {} must exist when heartbeat received",
+            event.job_run_id
+        ));
+
+        let running = match job_run {
+            // First heartbeat: Queued -> Running
+            JobRun::Queued(queued) => queued.start_running(current_timestamp()),
+            // Subsequent heartbeat: update timestamp
+            JobRun::Running(running) => running.heartbeat(current_timestamp()),
+            _ => {
+                panic!(
+                    "BUG: Heartbeat received for job run {} in invalid state {:?}",
+                    event.job_run_id, job_run
+                );
+            }
+        };
+
+        self.job_runs
+            .insert(event.job_run_id.clone(), JobRun::Running(running));
         vec![]
     }

     fn handle_job_run_success(&mut self, event: &JobRunSuccessEventV1) -> Vec<Event> {
-        println!("Job run success event: {:?}", event);
-        self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunSucceeded);
-        let job_run = self.get_job_run(&event.job_run_id).unwrap();
-
-        // Clone building_partitions before we use it multiple times
-        // TODO correct this explicit upcasting of partition ref type
-        let newly_live_partitions: Vec<LivePartitionRef> = job_run
-            .building_partitions
-            .iter()
-            .map(|pref| LivePartitionRef(pref.clone()))
-            .collect();
+        let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
+            "BUG: Job run {} must exist when success event received",
+            event.job_run_id
+        ));
+
+        let succeeded = match job_run {
+            JobRun::Running(running) => running.succeed(current_timestamp()),
+            _ => {
+                panic!(
+                    "BUG: Success event received for job run {} in invalid state {:?}. Job must be Running to succeed.",
+                    event.job_run_id, job_run
+                );
+            }
+        };
+
+        // Job run success is SOURCE of truth that partitions are live
+        let newly_live_partitions = succeeded.get_completed_partitions();

         // Update partitions being built by this job (strict type-safe transitions)
         self.transition_partitions_to_live(
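Every handler above follows the same shape: remove the entry from the map, match on its current state to enforce the legal transitions, then insert the transitioned value back under the same key. A standalone sketch of that remove/match/insert pattern, using hypothetical `Task` states rather than the types from this diff:

```rust
// Illustrative sketch only - `Task`, `Queued`, `Running`, and `handle_start`
// are hypothetical names, not items from this PR.
use std::collections::BTreeMap;

#[derive(Debug)]
enum Task {
    Queued { id: String },
    Running { id: String },
}

fn handle_start(tasks: &mut BTreeMap<String, Task>, id: &str) {
    // Remove the entry so the old state is consumed by value...
    let task = tasks.remove(id).expect("BUG: task must exist");
    // ...match to enforce which transitions are legal...
    let next = match task {
        Task::Queued { id } => Task::Running { id },
        other => panic!("BUG: cannot start task in state {:?}", other),
    };
    // ...and re-insert the new state under the same key.
    tasks.insert(id.to_string(), next);
}

fn main() {
    let mut tasks = BTreeMap::new();
    tasks.insert("t1".to_string(), Task::Queued { id: "t1".to_string() });
    handle_start(&mut tasks, "t1");
    println!("{:?}", tasks.get("t1"));
}
```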
@@ -725,19 +754,29 @@ impl BuildState {
             current_timestamp(),
         );

+        self.job_runs
+            .insert(event.job_run_id.clone(), JobRun::Succeeded(succeeded));
         vec![]
     }

     fn handle_job_run_failure(&mut self, event: &JobRunFailureEventV1) -> Vec<Event> {
-        self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunFailed);
-        let job_run = self.get_job_run(&event.job_run_id).unwrap();
-
-        // Clone building_partitions before we use it multiple times
-        let failed_partitions: Vec<FailedPartitionRef> = job_run
-            .building_partitions
-            .iter()
-            .map(|pref| FailedPartitionRef(pref.clone()))
-            .collect();
+        let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
+            "BUG: Job run {} must exist when failure event received",
+            event.job_run_id
+        ));
+
+        let failed = match job_run {
+            JobRun::Running(running) => running.fail(current_timestamp(), event.reason.clone()),
+            _ => {
+                panic!(
+                    "BUG: Failure event received for job run {} in invalid state {:?}. Job must be Running to fail.",
+                    event.job_run_id, job_run
+                );
+            }
+        };
+
+        // Job run failure is SOURCE of truth that partitions failed
+        let failed_partitions = failed.get_failed_partitions();

         // Transition partitions using strict type-safe methods
         self.transition_partitions_to_failed(
@@ -753,20 +792,68 @@ impl BuildState {
         // UpstreamBuilding → UpstreamFailed (for downstream wants waiting on newly failed wants)
         self.cascade_failures_to_downstream_wants(&newly_failed_wants, current_timestamp());

+        self.job_runs
+            .insert(event.job_run_id.clone(), JobRun::Failed(failed));
         vec![]
     }

-    fn handle_job_run_cancel(&mut self, _event: &JobRunCancelEventV1) -> Vec<Event> {
-        todo!("should update already inserted job run, partition status, want status")
+    fn handle_job_run_cancel(&mut self, event: &JobRunCancelEventV1) -> Vec<Event> {
+        let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
+            "BUG: Job run {} must exist when cancel event received",
+            event.job_run_id
+        ));
+
+        let canceled = match job_run {
+            JobRun::Queued(queued) => queued.cancel(
+                current_timestamp(),
+                event.source.clone(),
+                event.comment.clone().unwrap_or_default(),
+            ),
+            JobRun::Running(running) => running.cancel(
+                current_timestamp(),
+                event.source.clone(),
+                event.comment.clone().unwrap_or_default(),
+            ),
+            _ => {
+                panic!(
+                    "BUG: Cancel event received for job run {} in invalid state {:?}",
+                    event.job_run_id, job_run
+                );
+            }
+        };
+
+        // Canceled job means building partitions should reset to Missing
+        let building_refs_to_reset = canceled.get_building_partitions_to_reset();
+        self.reset_partitions_to_missing(&building_refs_to_reset);
+
+        self.job_runs
+            .insert(event.job_run_id.clone(), JobRun::Canceled(canceled));
+
+        vec![]
     }

     pub fn handle_job_run_dep_miss(&mut self, event: &JobRunMissingDepsEventV1) -> Vec<Event> {
-        let job_run_detail = self.get_job_run(&event.job_run_id).expect(&format!(
-            "BUG: Unable to find job run with id `{}`",
+        let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
+            "BUG: Job run {} must exist when dep miss event received",
             event.job_run_id
         ));
-        // Infer data/SLA timestamps from upstream want
-        let want_timestamps: WantTimestamps = job_run_detail
+
+        let dep_miss = match job_run {
+            JobRun::Running(running) => running.dep_miss(
+                current_timestamp(),
+                event.missing_deps.clone(),
+                event.read_deps.clone(),
+            ),
+            _ => {
+                panic!(
+                    "BUG: DepMiss event received for job run {} in invalid state {:?}. Job must be Running to hit dep miss.",
+                    event.job_run_id, job_run
+                );
+            }
+        };
+
+        // Infer data/SLA timestamps from servicing wants
+        let want_timestamps: WantTimestamps = dep_miss
+            .info
             .servicing_wants
             .iter()
             .flat_map(|wap| self.get_want(&wap.want_id).map(|w| w.into()))
@@ -774,11 +861,12 @@ impl BuildState {
             .expect("BUG: No servicing wants found");

         // Transition partitions back to Missing since this job can't build them yet
-        self.reset_partitions_to_missing(&job_run_detail.building_partitions);
+        let building_refs_to_reset = dep_miss.get_building_partitions_to_reset();
+        self.reset_partitions_to_missing(&building_refs_to_reset);

         // Create wants from dep misses
         let want_events = missing_deps_to_want_events(
-            event.missing_deps.clone(),
+            dep_miss.get_missing_deps().to_vec(),
             &event.job_run_id,
             want_timestamps,
         );
@@ -794,11 +882,14 @@ impl BuildState {

         // Transition servicing wants to UpstreamBuilding when they have missing dependencies
         self.transition_wants_to_upstream_building(
-            &job_run_detail.servicing_wants,
-            &event.missing_deps,
+            &dep_miss.info.servicing_wants,
+            dep_miss.get_missing_deps(),
             &partition_to_want_map,
         );

+        self.job_runs
+            .insert(event.job_run_id.clone(), JobRun::DepMiss(dep_miss));
+
         want_events
     }

@@ -838,7 +929,7 @@ impl BuildState {
         self.partitions.get(partition_id).map(|p| p.to_detail())
     }
     pub fn get_job_run(&self, job_run_id: &str) -> Option<JobRunDetail> {
-        self.job_runs.get(job_run_id).cloned()
+        self.job_runs.get(job_run_id).map(|jr| jr.to_detail())
     }

     pub fn list_wants(&self, request: &ListWantsRequest) -> ListWantsResponse {
@@ -895,9 +986,19 @@ impl BuildState {
     pub fn list_job_runs(&self, request: &ListJobRunsRequest) -> ListJobRunsResponse {
         let page = request.page.unwrap_or(0);
         let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);

+        let start = page * page_size;
+        let data: Vec<JobRunDetail> = self
+            .job_runs
+            .values()
+            .skip(start as usize)
+            .take(page_size as usize)
+            .map(|jr| jr.to_detail())
+            .collect();
+
         ListJobRunsResponse {
-            data: list_state_items(&self.job_runs, page, page_size),
-            match_count: self.wants.len() as u64,
+            data,
+            match_count: self.job_runs.len() as u64,
             page,
             page_size,
         }
@@ -242,6 +242,7 @@ enum JobRunStatusCode {
   JobRunFailed = 2;
   JobRunCanceled = 3;
   JobRunSucceeded = 4;
+  JobRunDepMiss = 5;
 }
 message JobRunDetail {
   string id = 1;
@@ -1,5 +1,6 @@
 use crate::PartitionStatusCode::{PartitionFailed, PartitionLive};
 use crate::data_build_event::Event;
+use crate::job_run_state::{JobInfo, JobRunWithState, QueuedState};
 use crate::util::current_timestamp;
 use crate::{
     CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse,
@@ -83,6 +84,21 @@ impl From<JobRunBufferEventV1> for JobRunDetail {
     }
 }

+impl From<JobRunBufferEventV1> for JobRunWithState<QueuedState> {
+    fn from(event: JobRunBufferEventV1) -> Self {
+        JobRunWithState {
+            info: JobInfo {
+                id: event.job_run_id,
+                building_partitions: event.building_partitions,
+                servicing_wants: event.want_attributed_partitions,
+            },
+            state: QueuedState {
+                queued_at: current_timestamp(),
+            },
+        }
+    }
+}
+
 pub fn want_status_matches_any(
     pds: &Vec<Option<PartitionDetail>>,
     status: PartitionStatusCode,
@@ -1,4 +1,4 @@
-use crate::job_run::{JobRun, SubProcessBackend};
+use crate::job_run::{JobRunHandle, SubProcessBackend};
 use crate::util::DatabuildError;
 use crate::{JobConfig, PartitionRef, WantDetail};
 use regex::Regex;
@@ -15,13 +15,13 @@ impl JobConfiguration {
     pub fn spawn(
         &self,
         wants: Vec<WantDetail>,
-    ) -> Result<JobRun<SubProcessBackend>, std::io::Error> {
+    ) -> Result<JobRunHandle<SubProcessBackend>, std::io::Error> {
         let wanted_refs: Vec<PartitionRef> = wants
             .iter()
             .flat_map(|want| want.partitions.clone())
             .collect();
         let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
-        Ok(JobRun::spawn(self.entry_point.clone(), args))
+        Ok(JobRunHandle::spawn(self.entry_point.clone(), args))
     }

     pub fn matches(&self, refs: &PartitionRef) -> bool {
@@ -55,34 +55,34 @@ pub enum PollResult<C, F, D> {
 }

 // ===== TYPE-SAFE STATE MACHINE PATTERN =====
-// Uses parameterized JobRunWithState wrapped in JobRun enum for storage
+// Uses parameterized JobRunHandleWithState wrapped in JobRun enum for storage

-/// New JobRun with embedded state enum
-/// Type-safe job run struct, parameterized by backend and state
-/// This struct can only perform operations valid for its current state type
-pub struct JobRunWithState<B: JobRunBackend, S> {
+/// JobRunHandle with embedded state enum
+/// Type-safe job run handle struct, parameterized by backend and state
+/// This struct manages the actual running process/execution and can only perform operations valid for its current state type
+pub struct JobRunHandleWithState<B: JobRunBackend, S> {
     pub job_run_id: Uuid,
     pub state: S,
     pub _backend: PhantomData<B>,
 }

-/// Wrapper enum for storing job runs in a single collection
+/// Wrapper enum for storing job run handles in a single collection
 /// This allows us to store jobs in different states together while maintaining type safety
-pub enum JobRun<B: JobRunBackend> {
-    NotStarted(JobRunWithState<B, B::NotStartedState>),
-    Running(JobRunWithState<B, B::RunningState>),
-    Completed(JobRunWithState<B, B::CompletedState>),
-    Failed(JobRunWithState<B, B::FailedState>),
-    Canceled(JobRunWithState<B, B::CanceledState>),
-    DepMiss(JobRunWithState<B, B::DepMissState>),
+pub enum JobRunHandle<B: JobRunBackend> {
+    NotStarted(JobRunHandleWithState<B, B::NotStartedState>),
+    Running(JobRunHandleWithState<B, B::RunningState>),
+    Completed(JobRunHandleWithState<B, B::CompletedState>),
+    Failed(JobRunHandleWithState<B, B::FailedState>),
+    Canceled(JobRunHandleWithState<B, B::CanceledState>),
+    DepMiss(JobRunHandleWithState<B, B::DepMissState>),
 }

 /// Result of visiting a running job - returns the typed states
 pub enum VisitResult<B: JobRunBackend> {
-    StillRunning(JobRunWithState<B, B::RunningState>),
-    Completed(JobRunWithState<B, B::CompletedState>),
-    Failed(JobRunWithState<B, B::FailedState>),
-    DepMiss(JobRunWithState<B, B::DepMissState>),
+    StillRunning(JobRunHandleWithState<B, B::RunningState>),
+    Completed(JobRunHandleWithState<B, B::CompletedState>),
+    Failed(JobRunHandleWithState<B, B::FailedState>),
+    DepMiss(JobRunHandleWithState<B, B::DepMissState>),
 }

 pub enum JobRunConfig {
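The renamed types keep the typestate discipline: the state lives in a type parameter, and each transition consumes one state value and returns the next, so calling a transition from the wrong state fails at compile time rather than at runtime. A minimal self-contained sketch of that idea (the `Handle`, `NotStarted`, and `Running` names here are illustrative, not from this codebase):

```rust
use std::marker::PhantomData;

// Hypothetical marker states - illustrative only.
struct NotStarted;
struct Running;

struct Handle<S> {
    id: u32,
    _state: PhantomData<S>,
}

impl Handle<NotStarted> {
    // Consuming `self` means a NotStarted handle cannot be reused after starting.
    fn start(self) -> Handle<Running> {
        Handle { id: self.id, _state: PhantomData }
    }
}

impl Handle<Running> {
    fn poll(&self) -> bool {
        true // pretend the process is still running
    }
}

fn main() {
    let h = Handle::<NotStarted> { id: 1, _state: PhantomData };
    let running = h.start();
    // Calling h.start() again would not compile: `h` was moved,
    // and Handle<Running> has no `start` method.
    println!("running={} id={}", running.poll(), running.id);
}
```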
@@ -297,11 +297,11 @@ pub struct JobRunPollResult {

 // ===== Type-Safe State Transition Implementation =====

-// Factory and helper methods on the JobRun enum
-impl<B: JobRunBackend> JobRun<B> {
+// Factory and helper methods on the JobRunHandle enum
+impl<B: JobRunBackend> JobRunHandle<B> {
     /// Create a new job run in the NotStarted state
     pub fn spawn(entry_point: String, args: Vec<String>) -> Self {
-        JobRun::NotStarted(JobRunWithState {
+        JobRunHandle::NotStarted(JobRunHandleWithState {
             job_run_id: Uuid::new_v4(),
             state: B::create(entry_point, args),
             _backend: PhantomData,
@@ -311,12 +311,12 @@ impl<B: JobRunBackend> JobRun<B> {
     /// Get the job run ID regardless of state
     pub fn job_run_id(&self) -> &Uuid {
         match self {
-            JobRun::NotStarted(j) => &j.job_run_id,
-            JobRun::Running(j) => &j.job_run_id,
-            JobRun::Completed(j) => &j.job_run_id,
-            JobRun::Failed(j) => &j.job_run_id,
-            JobRun::Canceled(j) => &j.job_run_id,
-            JobRun::DepMiss(j) => &j.job_run_id,
+            JobRunHandle::NotStarted(j) => &j.job_run_id,
+            JobRunHandle::Running(j) => &j.job_run_id,
+            JobRunHandle::Completed(j) => &j.job_run_id,
+            JobRunHandle::Failed(j) => &j.job_run_id,
+            JobRunHandle::Canceled(j) => &j.job_run_id,
+            JobRunHandle::DepMiss(j) => &j.job_run_id,
         }
     }

@@ -324,20 +324,23 @@ impl<B: JobRunBackend> JobRun<B> {
     pub fn is_terminal(&self) -> bool {
         matches!(
             self,
-            JobRun::Completed(_) | JobRun::Failed(_) | JobRun::Canceled(_) | JobRun::DepMiss(_)
+            JobRunHandle::Completed(_)
+                | JobRunHandle::Failed(_)
+                | JobRunHandle::Canceled(_)
+                | JobRunHandle::DepMiss(_)
         )
     }
 }

 // Type-safe transition: NotStarted -> Running
 // This method can ONLY be called on NotStarted jobs - compile error otherwise!
-impl<B: JobRunBackend> JobRunWithState<B, B::NotStartedState> {
+impl<B: JobRunBackend> JobRunHandleWithState<B, B::NotStartedState> {
     pub fn run(
         self,
         env: Option<HashMap<String, String>>,
-    ) -> Result<JobRunWithState<B, B::RunningState>, DatabuildError> {
+    ) -> Result<JobRunHandleWithState<B, B::RunningState>, DatabuildError> {
         let running = B::start(self.state, env)?;
-        Ok(JobRunWithState {
+        Ok(JobRunHandleWithState {
             job_run_id: self.job_run_id,
             state: running,
             _backend: PhantomData,
@@ -347,21 +350,21 @@ impl<B: JobRunBackend> JobRunWithState<B, B::NotStartedState> {

 // Type-safe transition: Running -> (Running | Completed | Failed | DepMiss)
 // This method can ONLY be called on Running jobs - compile error otherwise!
-impl<B: JobRunBackend> JobRunWithState<B, B::RunningState> {
+impl<B: JobRunBackend> JobRunHandleWithState<B, B::RunningState> {
     pub fn visit(mut self) -> Result<VisitResult<B>, DatabuildError> {
         match B::poll(&mut self.state)? {
             PollResult::StillRunning => Ok(VisitResult::StillRunning(self)),
-            PollResult::Completed(completed) => Ok(VisitResult::Completed(JobRunWithState {
+            PollResult::Completed(completed) => Ok(VisitResult::Completed(JobRunHandleWithState {
                 job_run_id: self.job_run_id,
                 state: completed,
                 _backend: PhantomData,
             })),
-            PollResult::Failed(failed) => Ok(VisitResult::Failed(JobRunWithState {
+            PollResult::Failed(failed) => Ok(VisitResult::Failed(JobRunHandleWithState {
                 job_run_id: self.job_run_id,
                 state: failed,
                 _backend: PhantomData,
             })),
-            PollResult::DepMiss(dep_miss) => Ok(VisitResult::DepMiss(JobRunWithState {
+            PollResult::DepMiss(dep_miss) => Ok(VisitResult::DepMiss(JobRunHandleWithState {
                 job_run_id: self.job_run_id,
                 state: dep_miss,
                 _backend: PhantomData,
@@ -372,9 +375,9 @@ impl<B: JobRunBackend> JobRunWithState<B, B::RunningState> {
     pub fn cancel(
         self,
         source: EventSource,
-    ) -> Result<JobRunWithState<B, B::CanceledState>, DatabuildError> {
+    ) -> Result<JobRunHandleWithState<B, B::CanceledState>, DatabuildError> {
         let canceled = B::cancel_job(self.state, source)?;
-        Ok(JobRunWithState {
+        Ok(JobRunHandleWithState {
             job_run_id: self.job_run_id,
             state: canceled,
             _backend: PhantomData,
@@ -417,7 +420,7 @@ impl ToEvent for SubProcessDepMiss {
 mod tests {
     use crate::data_build_event::Event;
     use crate::data_deps::DATABUILD_MISSING_DEPS_JSON;
-    use crate::job_run::{JobRun, JobRunBackend, SubProcessBackend, VisitResult};
+    use crate::job_run::{JobRunBackend, JobRunHandle, SubProcessBackend, VisitResult};
     use crate::mock_job_run::MockJobRun;
     use crate::{JobRunMissingDeps, MissingDeps};

@@ -425,11 +428,11 @@ mod tests {
     #[test]
     fn test_job_run_success_returns_job_run_success_event() {
         // Spawn a job run that will succeed (exit code 0)
-        let job_run = JobRun::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);
+        let job_run = JobRunHandle::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);

         // Start the job - this consumes the NotStarted and returns Running
         let running_job = match job_run {
-            JobRun::NotStarted(not_started) => not_started.run(None).unwrap(),
+            JobRunHandle::NotStarted(not_started) => not_started.run(None).unwrap(),
             _ => panic!("Expected NotStarted job"),
         };

@@ -463,12 +466,12 @@ mod tests {
     #[test]
     fn test_job_run_failure_returns_job_run_failure_event() {
         // Spawn a job run
-        let job_run = JobRun::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);
+        let job_run = JobRunHandle::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);

         // Start the job with an exit code that indicates failure (non-zero)
         let env = MockJobRun::new().exit_code(1).to_env();
         let running_job = match job_run {
-            JobRun::NotStarted(not_started) => not_started.run(Some(env)).unwrap(),
+            JobRunHandle::NotStarted(not_started) => not_started.run(Some(env)).unwrap(),
             _ => panic!("Expected NotStarted job"),
         };

@@ -511,7 +514,7 @@ mod tests {
         let temp_file = format!("/tmp/databuild_test_cancel_{}", Uuid::new_v4());

         // Spawn a job run that will sleep for 1 second and write a file
-        let job_run = JobRun::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);
+        let job_run = JobRunHandle::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);

         let env = MockJobRun::new()
             .sleep_ms(1000)
@@ -519,7 +522,7 @@ mod tests {
             .exit_code(0)
             .to_env();
         let running_job = match job_run {
-            JobRun::NotStarted(not_started) => not_started.run(Some(env)).unwrap(),
+            JobRunHandle::NotStarted(not_started) => not_started.run(Some(env)).unwrap(),
             _ => panic!("Expected NotStarted job"),
         };

@@ -558,7 +561,7 @@ mod tests {
     #[test]
     fn test_job_run_fail_on_missing_deps_should_emit_missing_deps_event() {
         // Spawn a job run that will sleep for 1 second and write a file
-        let job_run = JobRun::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);
+        let job_run = JobRunHandle::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);

         let expected_dep_miss = JobRunMissingDeps {
             version: "1".into(),
@@ -575,7 +578,7 @@ mod tests {
             .exit_code(1)
             .to_env();
         let running_job = match job_run {
-            JobRun::NotStarted(not_started) => not_started.run(Some(env)).unwrap(),
+            JobRunHandle::NotStarted(not_started) => not_started.run(Some(env)).unwrap(),
             _ => panic!("Expected NotStarted job"),
         };
databuild/job_run_state.rs (new file, 342 lines)
@@ -0,0 +1,342 @@
use crate::partition_state::{BuildingPartitionRef, FailedPartitionRef, LivePartitionRef};
use crate::util::current_timestamp;
use crate::{
    EventSource, JobRunDetail, JobRunStatusCode, MissingDeps, PartitionRef, ReadDeps,
    WantAttributedPartitions,
};

/// State: Job has been queued but not yet started
#[derive(Debug, Clone)]
pub struct QueuedState {
    pub queued_at: u64,
}

/// State: Job is currently running
#[derive(Debug, Clone)]
pub struct RunningState {
    pub started_at: u64,
    pub last_heartbeat_at: u64, // NOT optional, defaults to started_at
}

/// State: Job completed successfully
#[derive(Debug, Clone)]
pub struct SucceededState {
    pub completed_at: u64,
}

/// State: Job failed during execution
#[derive(Debug, Clone)]
pub struct FailedState {
    pub failed_at: u64,
    pub failure_reason: String,
}

/// State: Job detected missing dependencies
#[derive(Debug, Clone)]
pub struct DepMissState {
    pub detected_at: u64,
    pub missing_deps: Vec<MissingDeps>,
    pub read_deps: Vec<ReadDeps>,
}

/// State: Job was explicitly canceled
#[derive(Debug, Clone)]
pub struct CanceledState {
    pub canceled_at: u64,
    pub source: Option<EventSource>,
    pub comment: String,
}

/// Shared information across all job run states
#[derive(Debug, Clone)]
pub struct JobInfo {
    pub id: String,
    pub building_partitions: Vec<PartitionRef>,
    pub servicing_wants: Vec<WantAttributedPartitions>,
}

/// Generic job run struct parameterized by state
#[derive(Debug, Clone)]
pub struct JobRunWithState<S> {
    pub info: JobInfo,
    pub state: S,
}

/// Wrapper enum for storing job runs in collections
#[derive(Debug, Clone)]
pub enum JobRun {
    Queued(JobRunWithState<QueuedState>),
    Running(JobRunWithState<RunningState>),
    Succeeded(JobRunWithState<SucceededState>),
    Failed(JobRunWithState<FailedState>),
    DepMiss(JobRunWithState<DepMissState>),
    Canceled(JobRunWithState<CanceledState>),
}

// ==================== State Transitions ====================

impl JobRunWithState<QueuedState> {
    /// Transition from Queued to Running
    pub fn start_running(self, timestamp: u64) -> JobRunWithState<RunningState> {
        JobRunWithState {
            info: self.info,
            state: RunningState {
                started_at: timestamp,
                last_heartbeat_at: timestamp, // Initialize to start time
            },
        }
    }

    /// Transition from Queued to Canceled (canceled before starting)
    pub fn cancel(
        self,
        timestamp: u64,
        source: Option<EventSource>,
        comment: String,
    ) -> JobRunWithState<CanceledState> {
        JobRunWithState {
            info: self.info,
            state: CanceledState {
                canceled_at: timestamp,
                source,
                comment,
            },
        }
    }
}

impl JobRunWithState<RunningState> {
    /// Update heartbeat timestamp (non-consuming)
    pub fn heartbeat(mut self, timestamp: u64) -> Self {
        self.state.last_heartbeat_at = timestamp;
        self
    }

    /// Transition from Running to Succeeded
    pub fn succeed(self, timestamp: u64) -> JobRunWithState<SucceededState> {
        JobRunWithState {
            info: self.info,
            state: SucceededState {
                completed_at: timestamp,
            },
        }
    }

    /// Transition from Running to Failed
    pub fn fail(self, timestamp: u64, reason: String) -> JobRunWithState<FailedState> {
        JobRunWithState {
            info: self.info,
            state: FailedState {
                failed_at: timestamp,
                failure_reason: reason,
            },
        }
    }

    /// Transition from Running to DepMiss
    pub fn dep_miss(
        self,
        timestamp: u64,
        missing_deps: Vec<MissingDeps>,
        read_deps: Vec<ReadDeps>,
    ) -> JobRunWithState<DepMissState> {
        JobRunWithState {
            info: self.info,
            state: DepMissState {
                detected_at: timestamp,
                missing_deps,
                read_deps,
            },
        }
    }

    /// Transition from Running to Canceled
    pub fn cancel(
        self,
        timestamp: u64,
        source: Option<EventSource>,
        comment: String,
    ) -> JobRunWithState<CanceledState> {
        JobRunWithState {
            info: self.info,
            state: CanceledState {
                canceled_at: timestamp,
                source,
                comment,
            },
        }
    }
}

// ==================== Type-Safe Job Run IDs ====================

/// Type-safe job run ID wrappers that encode state expectations in function signatures.
/// These should be created ephemerally from typestate objects via .get_id() and used immediately—never stored long-term.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueuedJobRunId(pub String);

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RunningJobRunId(pub String);

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SucceededJobRunId(pub String);

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FailedJobRunId(pub String);

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DepMissJobRunId(pub String);

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CanceledJobRunId(pub String);

// ==================== State-Specific Methods ====================

impl JobRunWithState<QueuedState> {
    pub fn get_id(&self) -> QueuedJobRunId {
        QueuedJobRunId(self.info.id.clone())
    }
}

impl JobRunWithState<RunningState> {
    pub fn get_id(&self) -> RunningJobRunId {
        RunningJobRunId(self.info.id.clone())
    }

    /// Currently building these partitions
    /// Job run running state is the SOURCE of truth that these partitions are building
    pub fn get_building_partitions(&self) -> Vec<BuildingPartitionRef> {
        self.info
            .building_partitions
            .iter()
            .map(|p| BuildingPartitionRef(p.clone()))
            .collect()
    }

    pub fn get_last_heartbeat(&self) -> u64 {
        self.state.last_heartbeat_at
    }
}

impl JobRunWithState<SucceededState> {
    pub fn get_id(&self) -> SucceededJobRunId {
        SucceededJobRunId(self.info.id.clone())
    }

    /// Job run success is the SOURCE of truth that these partitions are live
    pub fn get_completed_partitions(&self) -> Vec<LivePartitionRef> {
        self.info
            .building_partitions
            .iter()
            .map(|p| LivePartitionRef(p.clone()))
            .collect()
    }
}

impl JobRunWithState<FailedState> {
    pub fn get_id(&self) -> FailedJobRunId {
        FailedJobRunId(self.info.id.clone())
    }

    /// Job run failure is the SOURCE of truth that these partitions failed
    pub fn get_failed_partitions(&self) -> Vec<FailedPartitionRef> {
        self.info
            .building_partitions
            .iter()
            .map(|p| FailedPartitionRef(p.clone()))
            .collect()
    }

    pub fn get_failure_reason(&self) -> &str {
        &self.state.failure_reason
    }
}

impl JobRunWithState<DepMissState> {
    pub fn get_id(&self) -> DepMissJobRunId {
        DepMissJobRunId(self.info.id.clone())
    }

    /// Job run dep miss means building partitions should reset to Missing
    pub fn get_building_partitions_to_reset(&self) -> Vec<BuildingPartitionRef> {
        self.info
            .building_partitions
            .iter()
            .map(|p| BuildingPartitionRef(p.clone()))
            .collect()
    }

    pub fn get_missing_deps(&self) -> &[MissingDeps] {
        &self.state.missing_deps
    }

    pub fn get_read_deps(&self) -> &[ReadDeps] {
        &self.state.read_deps
    }
}

impl JobRunWithState<CanceledState> {
    pub fn get_id(&self) -> CanceledJobRunId {
        CanceledJobRunId(self.info.id.clone())
    }

    /// Canceled job means building partitions should reset to Missing
    pub fn get_building_partitions_to_reset(&self) -> Vec<BuildingPartitionRef> {
        self.info
            .building_partitions
            .iter()
            .map(|p| BuildingPartitionRef(p.clone()))
            .collect()
    }
}

// ==================== Conversion to JobRunDetail for API ====================

impl JobRun {
    pub fn to_detail(&self) -> JobRunDetail {
        match self {
            JobRun::Queued(queued) => JobRunDetail {
                id: queued.info.id.clone(),
                status: Some(JobRunStatusCode::JobRunQueued.into()),
                last_heartbeat_at: None,
                building_partitions: queued.info.building_partitions.clone(),
                servicing_wants: queued.info.servicing_wants.clone(),
            },
            JobRun::Running(running) => JobRunDetail {
                id: running.info.id.clone(),
                status: Some(JobRunStatusCode::JobRunRunning.into()),
                last_heartbeat_at: Some(running.state.last_heartbeat_at),
                building_partitions: running.info.building_partitions.clone(),
                servicing_wants: running.info.servicing_wants.clone(),
            },
            JobRun::Succeeded(succeeded) => JobRunDetail {
                id: succeeded.info.id.clone(),
                status: Some(JobRunStatusCode::JobRunSucceeded.into()),
                last_heartbeat_at: None,
                building_partitions: succeeded.info.building_partitions.clone(),
                servicing_wants: succeeded.info.servicing_wants.clone(),
            },
            JobRun::Failed(failed) => JobRunDetail {
                id: failed.info.id.clone(),
                status: Some(JobRunStatusCode::JobRunFailed.into()),
                last_heartbeat_at: None,
                building_partitions: failed.info.building_partitions.clone(),
                servicing_wants: failed.info.servicing_wants.clone(),
            },
            JobRun::DepMiss(dep_miss) => JobRunDetail {
                id: dep_miss.info.id.clone(),
                status: Some(JobRunStatusCode::JobRunDepMiss.into()),
                last_heartbeat_at: None,
                building_partitions: dep_miss.info.building_partitions.clone(),
                servicing_wants: dep_miss.info.servicing_wants.clone(),
            },
            JobRun::Canceled(canceled) => JobRunDetail {
                id: canceled.info.id.clone(),
                status: Some(JobRunStatusCode::JobRunCanceled.into()),
                last_heartbeat_at: None,
                building_partitions: canceled.info.building_partitions.clone(),
                servicing_wants: canceled.info.servicing_wants.clone(),
            },
        }
    }
}
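One way to see how the pieces of the new module compose is a small in-module test that walks a job from Queued through Running to Succeeded and then converts it for the API. This is a hypothetical test, not part of the diff; it only uses items defined in the file above, so it should compile in-crate as written:

```rust
// Hypothetical unit test for databuild/job_run_state.rs (not in this PR).
#[cfg(test)]
mod state_transition_tests {
    use super::*;

    #[test]
    fn queued_job_runs_to_success() {
        let queued = JobRunWithState {
            info: JobInfo {
                id: "job-1".to_string(),
                building_partitions: vec![],
                servicing_wants: vec![],
            },
            state: QueuedState { queued_at: 1 },
        };

        // Each transition consumes the previous state and returns the next one.
        let running = queued.start_running(2);
        assert_eq!(running.get_last_heartbeat(), 2);

        let succeeded = running.heartbeat(3).succeed(4);
        assert!(succeeded.get_completed_partitions().is_empty());

        // Wrapping in the JobRun enum gives the uniform API-facing view.
        let detail = JobRun::Succeeded(succeeded).to_detail();
        assert_eq!(detail.id, "job-1");
    }
}
```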
@@ -4,6 +4,7 @@ mod data_deps;
 mod event_transforms;
 mod job;
 mod job_run;
+mod job_run_state;
 mod mock_job_run;
 mod orchestrator;
 mod partition_state;
@@ -20,7 +20,7 @@ JTBDs:
 struct Orchestrator<S: BELStorage + Debug> {
     pub bel: BuildEventLog<S>,
     pub config: OrchestratorConfig,
-    pub job_runs: Vec<crate::job_run::JobRun<SubProcessBackend>>,
+    pub job_runs: Vec<crate::job_run::JobRunHandle<SubProcessBackend>>,
 }

 impl Default for Orchestrator<MemoryBELStorage> {
@@ -125,12 +125,25 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
     }

     fn schedule_queued_jobs(&mut self) -> Result<(), DatabuildError> {
-        use crate::job_run::JobRun;
+        use crate::JobRunHeartbeatEventV1;
+        use crate::data_build_event::Event;
+        use crate::job_run::JobRunHandle;

         let mut new_jobs = Vec::new();
         for job in self.job_runs.drain(..) {
             let transitioned = match job {
-                JobRun::NotStarted(not_started) => JobRun::Running(not_started.run(None)?),
+                JobRunHandle::NotStarted(not_started) => {
+                    let job_run_id = not_started.job_run_id.clone();
+                    let running = not_started.run(None)?;
+
+                    // Emit heartbeat event to notify BuildState that job is now running
+                    let heartbeat_event = Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 {
+                        job_run_id: job_run_id.to_string(),
+                    });
+                    self.bel.append_event(&heartbeat_event)?;
+
+                    JobRunHandle::Running(running)
+                }
                 other => other, // Pass through all other states unchanged
             };
             new_jobs.push(transitioned);
@@ -142,36 +155,41 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
     /// Visits individual job runs, appending resulting events, and moving runs between run status
     /// containers. Either jobs are still running, or they are moved to terminal states.
     fn poll_job_runs(&mut self) -> Result<(), DatabuildError> {
-        use crate::job_run::{JobRun, VisitResult};
+        use crate::job_run::{JobRunHandle, VisitResult};

         self.schedule_queued_jobs()?;

+        // TODO: Emit periodic JobRunHeartbeatEventV1 events for long-running jobs
+        // Currently we emit one heartbeat when starting (in schedule_queued_jobs), but we should
+        // also emit periodic heartbeats for long-running jobs to detect stalls
+        // Need to decide on heartbeat frequency (e.g., every N polls or based on time elapsed)
+
         // Visit all running jobs using type-safe transitions
         let mut new_jobs = Vec::new();
         for job in self.job_runs.drain(..) {
             let transitioned = match job {
-                JobRun::Running(running) => match running.visit()? {
+                JobRunHandle::Running(running) => match running.visit()? {
                     VisitResult::StillRunning(still_running) => {
                         println!("Still running job: {:?}", still_running.job_run_id);
-                        JobRun::Running(still_running)
+                        JobRunHandle::Running(still_running)
                     }
                     VisitResult::Completed(completed) => {
                         println!("Completed job: {:?}", completed.job_run_id);
                         let event = completed.state.to_event(&completed.job_run_id);
                         self.bel.append_event(&event)?;
-                        JobRun::Completed(completed)
+                        JobRunHandle::Completed(completed)
                     }
                     VisitResult::Failed(failed) => {
                         println!("Failed job: {:?}", failed.job_run_id);
                         let event = failed.state.to_event(&failed.job_run_id);
                         self.bel.append_event(&event)?;
-                        JobRun::Failed(failed)
+                        JobRunHandle::Failed(failed)
                     }
                     VisitResult::DepMiss(dep_miss) => {
                         println!("Dep miss job: {:?}", dep_miss.job_run_id);
                         let event = dep_miss.state.to_event(&dep_miss.job_run_id);
                         self.bel.append_event(&event)?;
-                        JobRun::DepMiss(dep_miss)
+                        JobRunHandle::DepMiss(dep_miss)
                     }
                 },
                 other => other, // Pass through all non-running states unchanged
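On the heartbeat TODO above: a common way to resolve it is simple time-based gating inside the poll loop, emitting a heartbeat only when enough time has elapsed since the last one. A standalone sketch of that idea, with a hypothetical helper and an arbitrary 30-second interval (neither is from this codebase):

```rust
use std::time::{Duration, Instant};

// Hypothetical helper, not part of this diff: decide whether a periodic
// heartbeat should be emitted for a still-running job.
fn should_emit_heartbeat(last_heartbeat: Instant, interval: Duration) -> bool {
    last_heartbeat.elapsed() >= interval
}

fn main() {
    let last = Instant::now();
    // Immediately after a heartbeat, nothing is due yet.
    assert!(!should_emit_heartbeat(last, Duration::from_secs(30)));
    println!("heartbeat not yet due");
}
```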
@@ -225,7 +243,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
     }

     fn queue_job(&mut self, wg: WantGroup) -> Result<(), DatabuildError> {
-        use crate::job_run::JobRun;
+        use crate::job_run::JobRunHandle;

         // Compute args from wants the same way JobConfiguration::spawn() does
         let wanted_refs: Vec<crate::PartitionRef> = wg
@@ -234,7 +252,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
             .flat_map(|want| want.partitions.clone())
             .collect();
         let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
-        let job_run = JobRun::spawn(wg.job.entry_point.clone(), args);
+        let job_run = JobRunHandle::spawn(wg.job.entry_point.clone(), args);

         // Create job run buffer event
         let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 {
@@ -263,10 +281,10 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
     // Helper methods for tests to count jobs by state
     #[cfg(test)]
     fn count_running_jobs(&self) -> usize {
-        use crate::job_run::JobRun;
+        use crate::job_run::JobRunHandle;
         self.job_runs
             .iter()
-            .filter(|j| matches!(j, JobRun::Running(_)))
+            .filter(|j| matches!(j, JobRunHandle::Running(_)))
             .count()
     }

@@ -277,28 +295,28 @@ impl<S: BELStorage + Debug> Orchestrator<S> {

     #[cfg(test)]
     fn count_not_started_jobs(&self) -> usize {
-        use crate::job_run::JobRun;
+        use crate::job_run::JobRunHandle;
         self.job_runs
             .iter()
-            .filter(|j| matches!(j, JobRun::NotStarted(_)))
+            .filter(|j| matches!(j, JobRunHandle::NotStarted(_)))
             .count()
     }

     #[cfg(test)]
     fn count_dep_miss_jobs(&self) -> usize {
-        use crate::job_run::JobRun;
+        use crate::job_run::JobRunHandle;
         self.job_runs
             .iter()
-            .filter(|j| matches!(j, JobRun::DepMiss(_)))
+            .filter(|j| matches!(j, JobRunHandle::DepMiss(_)))
             .count()
     }

     #[cfg(test)]
     fn count_completed_jobs(&self) -> usize {
-        use crate::job_run::JobRun;
+        use crate::job_run::JobRunHandle;
         self.job_runs
             .iter()
-            .filter(|j| matches!(j, JobRun::Completed(_)))
+            .filter(|j| matches!(j, JobRunHandle::Completed(_)))
             .count()
     }

@@ -444,13 +462,13 @@ mod tests {
         // Should schedule alpha job
         assert_eq!(orchestrator.count_not_started_jobs(), 1);
         // Verify the job has the right args by checking the first NotStarted job
-        use crate::job_run::JobRun;
+        use crate::job_run::JobRunHandle;
         let not_started_job = orchestrator
             .job_runs
             .iter()
-            .find(|j| matches!(j, JobRun::NotStarted(_)))
+            .find(|j| matches!(j, JobRunHandle::NotStarted(_)))
             .unwrap();
-        if let JobRun::NotStarted(job) = not_started_job {
+        if let JobRunHandle::NotStarted(job) = not_started_job {
             assert_eq!(
                 job.state.args,
                 vec!["data/alpha"],
231 docs/ideas/logging.md Normal file

@@ -0,0 +1,231 @@

# Job Run Logging Strategy

Claude-generated plan for logging.

## Philosophy

Job run logs are critical for debugging build failures and understanding system behavior. The logging system must:

1. **Be resource efficient** - Not consume unbounded memory in the service process
2. **Persist across restarts** - Logs survive service restarts/crashes
3. **Stream in real-time** - Enable live tailing for running jobs
4. **Support future log shipping** - Abstract design allows later integration with log aggregation systems
5. **Maintain data locality** - Keep logs on build machines where jobs execute

## File-Based Approach

### Directory Structure

```
/var/log/databuild/
  job_runs/
    {job_run_id}/
      stdout.log      # Job standard output
      stderr.log      # Job standard error
      metadata.json   # Job metadata (timestamps, exit code, building_partitions, etc.)
```

### Write Strategy

- **Streaming writes**: Job output written to disk as it's produced (not buffered in memory)
- **Append-only**: Log files are append-only for simplicity and crash safety
- **Metadata on completion**: Write metadata.json when job reaches terminal state
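The metadata schema isn't pinned down here; as a rough sketch (the struct and field names below are illustrative assumptions, not a settled schema, and serde is assumed as the serialization crate), `metadata.json` could be the serialized form of something like:

```rust
use serde::{Deserialize, Serialize};

/// Hypothetical shape for metadata.json; the fields mirror the examples above
/// (timestamps, exit code, building_partitions) but none of this is final.
#[derive(Debug, Serialize, Deserialize)]
pub struct JobMetadata {
    pub job_run_id: String,
    pub started_at_ms: u64,
    pub finished_at_ms: u64,
    pub exit_code: Option<i32>,           // None if the process was killed by a signal
    pub building_partitions: Vec<String>, // partition refs this run was building
    pub duration_ms: u64,                 // convenience field, e.g. for the SSE "complete" event
}
```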

### Rotation & Cleanup Policy

Two-pronged approach to prevent unbounded disk usage:

1. **Time-based TTL**: Delete logs older than N days (default: 7 days)
2. **Size-based cap**: If total log directory exceeds M GB (default: 10 GB), delete oldest logs first

Configuration via environment variables:
- `DATABUILD_LOG_TTL_DAYS` (default: 7)
- `DATABUILD_LOG_MAX_SIZE_GB` (default: 10)
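A minimal sketch of that policy, assuming one directory per job run under the log root and using only std (the function name and traversal details are illustrative, not part of the plan):

```rust
use std::fs;
use std::path::Path;
use std::time::{Duration, SystemTime};

/// Illustrative only: delete job run log dirs older than `ttl`, then keep
/// deleting the oldest dirs until the total size is under `max_bytes`.
fn cleanup_logs(base: &Path, ttl: Duration, max_bytes: u64) -> std::io::Result<()> {
    // Collect (modified_time, size, path) for each per-run directory.
    let mut dirs = Vec::new();
    for entry in fs::read_dir(base)? {
        let entry = entry?;
        let meta = entry.metadata()?;
        if !meta.is_dir() {
            continue;
        }
        let modified = meta.modified()?;
        let size: u64 = fs::read_dir(entry.path())?
            .filter_map(|e| e.ok())
            .filter_map(|e| e.metadata().ok())
            .map(|m| m.len())
            .sum();
        dirs.push((modified, size, entry.path()));
    }

    let cutoff = SystemTime::now() - ttl;
    dirs.sort_by_key(|(modified, _, _)| *modified); // oldest first
    let mut total: u64 = dirs.iter().map(|(_, size, _)| size).sum();
    for (modified, size, path) in dirs {
        let over_ttl = modified < cutoff;     // prong 1: time-based TTL
        let over_cap = total > max_bytes;     // prong 2: size-based cap
        if over_ttl || over_cap {
            fs::remove_dir_all(&path)?;
            total = total.saturating_sub(size);
        }
    }
    Ok(())
}
```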

## API Streaming

### HTTP Endpoints

```
GET /api/job_runs/{job_run_id}/logs/stdout
GET /api/job_runs/{job_run_id}/logs/stderr
```

### Streaming Protocol

Use **Server-Sent Events (SSE)** for real-time log streaming:

- Efficient for text streams (line-oriented)
- Native browser support (no WebSocket complexity)
- Automatic reconnection
- Works through HTTP/1.1 (no HTTP/2 requirement)

### Example Response

```
event: log
data: Building partition data/alpha...

event: log
data: [INFO] Reading dependencies from upstream

event: complete
data: {"exit_code": 0, "duration_ms": 1234}
```
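Producing those frames is just line-oriented string formatting; a framework-agnostic sketch (the helper names are made up for illustration):

```rust
/// Format one log line as the SSE "log" event shown above.
fn sse_log_frame(line: &str) -> String {
    // Log lines are single lines here, so a single data: field suffices.
    format!("event: log\ndata: {}\n\n", line)
}

/// Format the terminal "complete" event with its small JSON payload.
fn sse_complete_frame(exit_code: i32, duration_ms: u64) -> String {
    format!(
        "event: complete\ndata: {{\"exit_code\": {}, \"duration_ms\": {}}}\n\n",
        exit_code, duration_ms
    )
}
```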

### Query Parameters

- `?follow=true` - Keep connection open, stream new lines as they're written (like `tail -f`)
- `?since=<line_number>` - Start from specific line (for reconnection)
- `?lines=<N>` - Return last N lines and close (for quick inspection)
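These map one-to-one onto the `StreamOptions` struct defined in the next section; a hypothetical translation (this helper is not part of the plan and assumes the query string has already been parsed into a map, with `StreamOptions` in scope):

```rust
use std::collections::HashMap;

// Assumes the StreamOptions struct from the LogStore section below.
fn stream_options_from_query(query: &HashMap<String, String>) -> StreamOptions {
    StreamOptions {
        follow: query.get("follow").map(|v| v == "true").unwrap_or(false),
        since_line: query
            .get("since")
            .and_then(|v| v.parse().ok())
            .unwrap_or(0),
        max_lines: query.get("lines").and_then(|v| v.parse().ok()),
    }
}
```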

## Abstraction Layer

Define `LogStore` trait to enable future log shipping without changing core logic:

```rust
/// Abstraction for job run log storage and retrieval
pub trait LogStore: Send + Sync {
    /// Append a line to stdout for the given job run
    fn append_stdout(&mut self, job_run_id: &str, line: &str) -> Result<(), LogError>;

    /// Append a line to stderr for the given job run
    fn append_stderr(&mut self, job_run_id: &str, line: &str) -> Result<(), LogError>;

    /// Stream stdout lines from a job run
    fn stream_stdout(&self, job_run_id: &str, opts: StreamOptions)
        -> impl Stream<Item = Result<String, LogError>>;

    /// Stream stderr lines from a job run
    fn stream_stderr(&self, job_run_id: &str, opts: StreamOptions)
        -> impl Stream<Item = Result<String, LogError>>;

    /// Write job metadata on completion
    fn write_metadata(&mut self, job_run_id: &str, metadata: JobMetadata)
        -> Result<(), LogError>;
}

pub struct StreamOptions {
    pub follow: bool,             // Keep streaming new lines
    pub since_line: usize,        // Start from line N
    pub max_lines: Option<usize>, // Limit to N lines
}
```

### Initial Implementation: `FileLogStore`

```rust
pub struct FileLogStore {
    base_path: PathBuf, // e.g., /var/log/databuild/job_runs
}
```

Writes directly to `{base_path}/{job_run_id}/stdout.log`.
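The plan doesn't spell out the file handling; a minimal sketch of the append path under those assumptions (append-mode writes, per-run directory created on first use, and `std::io::Error` standing in for `LogError` so the snippet stands alone):

```rust
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::PathBuf;

pub struct FileLogStore {
    base_path: PathBuf, // e.g., /var/log/databuild/job_runs
}

impl FileLogStore {
    /// Illustrative only: append one line to a log file for this job run,
    /// creating the per-run directory on first write.
    fn append_line(&self, job_run_id: &str, file: &str, line: &str) -> std::io::Result<()> {
        let dir = self.base_path.join(job_run_id);
        fs::create_dir_all(&dir)?;
        let mut f = OpenOptions::new()
            .create(true)
            .append(true)
            .open(dir.join(file))?;
        writeln!(f, "{}", line)
    }

    pub fn append_stdout(&self, job_run_id: &str, line: &str) -> std::io::Result<()> {
        self.append_line(job_run_id, "stdout.log", line)
    }

    pub fn append_stderr(&self, job_run_id: &str, line: &str) -> std::io::Result<()> {
        self.append_line(job_run_id, "stderr.log", line)
    }
}
```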

### Future Implementations

- **`ShippingLogStore`**: Wraps `FileLogStore`, ships logs to S3/GCS/CloudWatch in background
- **`CompositeLogStore`**: Writes to multiple stores (local + remote)
- **`BufferedLogStore`**: Batches writes for efficiency

## Integration with Job Runner

The `SubProcessBackend` (in `job_run.rs`) currently buffers stdout in memory. This needs updating:

### Current (in-memory buffering):
```rust
pub struct SubProcessRunning {
    pub process: Child,
    pub stdout_buffer: Vec<String>, // ❌ Unbounded memory
}
```

### Proposed (streaming to disk):
```rust
pub struct SubProcessRunning {
    pub process: Child,
    pub log_store: Arc<Mutex<dyn LogStore>>,
    pub job_run_id: String,
}
```

When polling the job (see the sketch after this list):

1. Read available stdout/stderr from process
2. Write each line to `log_store.append_stdout(job_run_id, line)`
3. Parse for special lines (e.g., `DATABUILD_MISSING_DEPS_JSON:...`)
4. Don't keep full log in memory
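A rough sketch of that polling pass (illustrative, not the actual `job_run.rs` code; `append` stands in for a call like `log_store.append_stdout(&job_run_id, line)`, and the non-blocking plumbing is elided):

```rust
use std::io::{BufRead, BufReader};
use std::process::ChildStdout;

const DEP_MISS_PREFIX: &str = "DATABUILD_MISSING_DEPS_JSON:";

/// Illustrative polling pass: drain available stdout, hand each line to the
/// log store via `append`, and collect any dep-miss marker payloads.
fn drain_stdout<F: FnMut(&str)>(
    stdout: &mut BufReader<ChildStdout>,
    mut append: F,
) -> std::io::Result<Vec<String>> {
    let mut dep_miss_payloads = Vec::new();
    let mut line = String::new();
    // Note: read_line blocks; real code would read on a dedicated thread or
    // use non-blocking IO so the orchestrator's visit loop stays responsive.
    while stdout.read_line(&mut line)? > 0 {
        let trimmed = line.trim_end();
        append(trimmed); // stream to disk instead of buffering in memory
        if let Some(rest) = trimmed.strip_prefix(DEP_MISS_PREFIX) {
            dep_miss_payloads.push(rest.to_string()); // keep only marker lines
        }
        line.clear();
    }
    Ok(dep_miss_payloads)
}
```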

## CLI Integration

The CLI should support log streaming:

```bash
# Stream logs for a running or completed job
databuild logs <job_run_id>

# Follow mode (tail -f)
databuild logs <job_run_id> --follow

# Show last N lines
databuild logs <job_run_id> --tail 100

# Show stderr instead of stdout
databuild logs <job_run_id> --stderr
```

Under the hood, this hits the `/api/job_runs/{id}/logs/stdout?follow=true` endpoint.

## Web App Integration

The web app can use the native EventSource API:

```javascript
const eventSource = new EventSource(`/api/job_runs/${jobId}/logs/stdout?follow=true`);

eventSource.addEventListener('log', (event) => {
  appendToTerminal(event.data);
});

eventSource.addEventListener('complete', (event) => {
  const metadata = JSON.parse(event.data);
  showExitCode(metadata.exit_code);
  eventSource.close();
});
```

## Future: Log Shipping

When adding log shipping (e.g., to S3, CloudWatch, Datadog):

1. Create a `ShippingLogStore` implementation
2. Run background task that:
   - Watches for completed jobs
   - Batches log lines
   - Ships to configured destination
   - Deletes local files after successful upload (if configured)
3. Configure via:
   ```bash
   export DATABUILD_LOG_SHIP_DEST=s3://my-bucket/databuild-logs
   export DATABUILD_LOG_KEEP_LOCAL=false  # Delete after ship
   ```

The `LogStore` trait means the core system doesn't change - just swap implementations.
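To make the swap concrete, here is a sketch of what a `CompositeLogStore` might look like (a cut-down trait with only the append method is used so the snippet stands alone; the real trait above also carries the streaming and metadata methods):

```rust
/// Simplified slice of the LogStore idea, enough to show composition.
pub trait AppendLog {
    fn append_stdout(&mut self, job_run_id: &str, line: &str) -> std::io::Result<()>;
}

/// Writes every line to all wrapped stores (e.g. local file + remote shipper).
pub struct CompositeLogStore {
    stores: Vec<Box<dyn AppendLog + Send>>,
}

impl AppendLog for CompositeLogStore {
    fn append_stdout(&mut self, job_run_id: &str, line: &str) -> std::io::Result<()> {
        for store in &mut self.stores {
            store.append_stdout(job_run_id, line)?;
        }
        Ok(())
    }
}
```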

## Open Questions

1. **Log format**: Plain text vs structured (JSON lines)?
   - Plain text is more human-readable
   - Structured is easier to search/analyze
   - Suggestion: Plain text in files, parse to structured for API if needed

2. **Compression**: Compress old logs to save space?
   - Could gzip files older than 24 hours
   - Trade-off: disk space vs CPU on access

3. **Indexing**: Build an index for fast log search?
   - Simple grep is probably fine initially
   - Could add full-text search later if needed

4. **Multi-machine**: How do logs work in distributed builds?
   - Each build machine has its own log directory
   - Central service aggregates via log shipping
   - Need to design this when we tackle distributed execution