Implement typed IDs (and reformat)

Stuart Axelbrooke 2025-11-22 19:31:08 +08:00
parent 8e8ff33ef8
commit eb44350865
11 changed files with 381 additions and 221 deletions

View file

@ -1,7 +1,13 @@
use crate::build_state::BuildState;
use crate::data_build_event::Event;
use crate::util::{current_timestamp, DatabuildError};
use crate::{CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, CreateWantRequest, CreateWantResponse, DataBuildEvent, GetTaintRequest, GetTaintResponse, GetWantRequest, GetWantResponse, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, TaintCreateEventV1, WantCancelEventV1, WantCreateEventV1};
use crate::util::{DatabuildError, current_timestamp};
use crate::{
CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse,
CreateWantRequest, CreateWantResponse, DataBuildEvent, GetTaintRequest, GetTaintResponse,
GetWantRequest, GetWantResponse, ListJobRunsRequest, ListJobRunsResponse,
ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse,
ListWantsRequest, ListWantsResponse, TaintCreateEventV1, WantCancelEventV1, WantCreateEventV1,
};
use prost::Message;
use rusqlite::Connection;
use std::fmt::Debug;
@ -173,7 +179,6 @@ impl<S: BELStorage + Debug> BuildEventLog<S> {
Ok(idx)
}
// API methods
pub fn api_handle_list_wants(&self, req: ListWantsRequest) -> ListWantsResponse {
self.state.list_wants(&req)
@ -191,7 +196,10 @@ impl<S: BELStorage + Debug> BuildEventLog<S> {
self.state.list_job_runs(&req)
}
pub fn api_handle_want_create(&mut self, req: CreateWantRequest) -> Result<CreateWantResponse, DatabuildError> {
pub fn api_handle_want_create(
&mut self,
req: CreateWantRequest,
) -> Result<CreateWantResponse, DatabuildError> {
let ev: WantCreateEventV1 = req.into();
self.append_event(&ev.clone().into())?;
Ok(self.state.get_want(&ev.want_id).into())
@ -201,13 +209,19 @@ impl<S: BELStorage + Debug> BuildEventLog<S> {
self.state.get_want(&req.want_id).into()
}
pub fn api_handle_want_cancel(&mut self, req: CancelWantRequest) -> Result<CancelWantResponse, DatabuildError> {
pub fn api_handle_want_cancel(
&mut self,
req: CancelWantRequest,
) -> Result<CancelWantResponse, DatabuildError> {
let ev: WantCancelEventV1 = req.into();
self.append_event(&ev.clone().into())?;
Ok(self.state.get_want(&ev.want_id).into())
}
pub fn api_handle_taint_create(&mut self, req: CreateTaintRequest) -> Result<CreateTaintResponse, DatabuildError> {
pub fn api_handle_taint_create(
&mut self,
req: CreateTaintRequest,
) -> Result<CreateTaintResponse, DatabuildError> {
// TODO Need to do this hierarchically? A taint will impact downstream partitions also
todo!();
let ev: TaintCreateEventV1 = req.into();
@ -264,7 +278,9 @@ mod tests {
// Append an event
let mut e = WantCreateEventV1::default();
e.want_id = want_id.clone();
e.partitions = vec!(PartitionRef { r#ref: "sqlite_partition_1234".to_string() });
e.partitions = vec![PartitionRef {
r#ref: "sqlite_partition_1234".to_string(),
}];
let event_id = log
.append_event(&Event::WantCreateV1(e))
.expect("append_event failed");
@ -298,7 +314,8 @@ mod tests {
"want_id not found in state"
);
assert_eq!(
log.state.get_want(&want_id)
log.state
.get_want(&want_id)
.map(|want| want.want_id.clone())
.expect("state.wants want_id not found"),
want_id,
@ -307,13 +324,16 @@ mod tests {
let mut e2 = WantCreateEventV1::default();
e2.want_id = Uuid::new_v4().into();
log.append_event(&Event::WantCreateV1(e2)).expect("append_event failed");
log.append_event(&Event::WantCreateV1(e2))
.expect("append_event failed");
let mut e3 = WantCreateEventV1::default();
e3.want_id = Uuid::new_v4().into();
log.append_event(&Event::WantCreateV1(e3)).expect("append_event failed");
log.append_event(&Event::WantCreateV1(e3))
.expect("append_event failed");
let mut e4 = WantCreateEventV1::default();
e4.want_id = Uuid::new_v4().into();
log.append_event(&Event::WantCreateV1(e4)).expect("append_event failed");
log.append_event(&Event::WantCreateV1(e4))
.expect("append_event failed");
let events = log
.storage

View file

@ -1,8 +1,13 @@
use crate::data_build_event::Event;
use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
use crate::partition_state::Partition;
use crate::partition_state::{
FailedPartitionRef, LivePartitionRef, MissingPartitionRef, MissingState, Partition,
PartitionWithState, TaintedPartitionRef,
};
use crate::util::current_timestamp;
use crate::want_state::{IdleState as WantIdleState, Want, WantWithState};
use crate::want_state::{
FailedWantId, IdleState as WantIdleState, SuccessfulWantId, Want, WantWithState,
};
use crate::{
JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1,
JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1,
@ -120,14 +125,14 @@ impl BuildState {
/// Used when a job run successfully completes
fn transition_partitions_to_live(
&mut self,
partition_refs: &[PartitionRef],
partition_refs: &[LivePartitionRef],
job_run_id: &str,
timestamp: u64,
) {
for pref in partition_refs {
let partition = self.partitions.remove(&pref.r#ref).expect(&format!(
let partition = self.partitions.remove(&pref.0.r#ref).expect(&format!(
"BUG: Partition {} must exist and be in Building state before completion",
pref.r#ref
pref.0.r#ref
));
// ONLY valid transition: Building -> Live
@ -139,11 +144,11 @@ impl BuildState {
_ => {
panic!(
"BUG: Invalid state - partition {} must be Building to transition to Live, found {:?}",
pref.r#ref, partition
pref.0.r#ref, partition
)
}
};
self.partitions.insert(pref.r#ref.clone(), transitioned);
self.partitions.insert(pref.0.r#ref.clone(), transitioned);
}
}
@ -151,14 +156,14 @@ impl BuildState {
/// Used when a job run fails
fn transition_partitions_to_failed(
&mut self,
partition_refs: &[PartitionRef],
partition_refs: &[FailedPartitionRef],
job_run_id: &str,
timestamp: u64,
) {
for pref in partition_refs {
let partition = self.partitions.remove(&pref.r#ref).expect(&format!(
let partition = self.partitions.remove(&pref.0.r#ref).expect(&format!(
"BUG: Partition {} must exist and be in Building state before failure",
pref.r#ref
pref.0.r#ref
));
// ONLY valid transition: Building -> Failed
@ -170,20 +175,17 @@ impl BuildState {
_ => {
panic!(
"BUG: Invalid state - partition {} must be Building to transition to Failed, found {:?}",
pref.r#ref, partition
pref.0.r#ref, partition
)
}
};
self.partitions.insert(pref.r#ref.clone(), transitioned);
self.partitions.insert(pref.0.r#ref.clone(), transitioned);
}
}
/// Reset partitions from Building back to Missing state
/// Used when a job run encounters missing dependencies and cannot proceed
fn reset_partitions_to_missing(
&mut self,
partition_refs: &[PartitionRef],
) {
fn reset_partitions_to_missing(&mut self, partition_refs: &[PartitionRef]) {
for pref in partition_refs {
let partition = self.partitions.remove(&pref.r#ref).expect(&format!(
"BUG: Partition {} must exist and be in Building state during dep_miss",
@ -209,16 +211,16 @@ impl BuildState {
/// Transitions Building → Successful, returns list of newly successful want IDs
fn complete_successful_wants(
&mut self,
newly_live_partitions: &[PartitionRef],
newly_live_partitions: &[LivePartitionRef],
job_run_id: &str,
timestamp: u64,
) -> Vec<String> {
let mut newly_successful_wants: Vec<String> = Vec::new();
) -> Vec<SuccessfulWantId> {
let mut newly_successful_wants: Vec<SuccessfulWantId> = Vec::new();
for pref in newly_live_partitions {
let want_ids = self
.partitions
.get(&pref.r#ref)
.get(&pref.0.r#ref)
.map(|p| p.want_ids().clone())
.unwrap_or_default();
@ -239,10 +241,10 @@ impl BuildState {
});
if all_partitions_live {
newly_successful_wants.push(want_id.clone());
Want::Successful(
building.complete(job_run_id.to_string(), timestamp),
)
let successful_want =
building.complete(job_run_id.to_string(), timestamp);
newly_successful_wants.push(successful_want.get_id());
Want::Successful(successful_want)
} else {
Want::Building(building) // Still building other partitions
}
@ -250,7 +252,7 @@ impl BuildState {
_ => {
panic!(
"BUG: Want {} in invalid state {:?} when partition {} became Live. Should be Building.",
want_id, want, pref.r#ref
want_id, want, pref.0.r#ref
);
}
};
@ -267,14 +269,14 @@ impl BuildState {
/// Returns list of newly failed want IDs for downstream cascade
fn fail_directly_affected_wants(
&mut self,
failed_partitions: &[PartitionRef],
) -> Vec<String> {
let mut newly_failed_wants: Vec<String> = Vec::new();
failed_partitions: &[FailedPartitionRef],
) -> Vec<FailedWantId> {
let mut newly_failed_wants: Vec<FailedWantId> = Vec::new();
for pref in failed_partitions {
let want_ids = self
.partitions
.get(&pref.r#ref)
.get(&pref.0.r#ref)
.map(|p| p.want_ids().clone())
.unwrap_or_default();
@ -286,10 +288,10 @@ impl BuildState {
let transitioned = match want {
Want::Building(building) => {
newly_failed_wants.push(want_id.clone());
Want::Failed(
building.fail(vec![pref.clone()], "Partition build failed".to_string()),
)
let failed = building
.fail(vec![pref.0.clone()], "Partition build failed".to_string());
newly_failed_wants.push(failed.get_id());
Want::Failed(failed)
}
// Failed → Failed: add new failed partition to existing failed state
Want::Failed(failed) => {
@ -298,7 +300,7 @@ impl BuildState {
_ => {
panic!(
"BUG: Want {} in invalid state {:?} when partition {} failed. Should be Building or Failed.",
want_id, want, pref.r#ref
want_id, want, pref.0.r#ref
);
}
};
@ -314,7 +316,7 @@ impl BuildState {
/// Transitions UpstreamBuilding → Idle (when ready) or Building (when partitions already building)
fn unblock_downstream_wants(
&mut self,
newly_successful_wants: &[String],
newly_successful_wants: &[SuccessfulWantId],
job_run_id: &str,
timestamp: u64,
) {
@ -327,11 +329,10 @@ impl BuildState {
match want {
Want::UpstreamBuilding(downstream_want) => {
// Is this downstream want waiting for any of the newly successful wants?
let is_affected = downstream_want
.state
.upstream_want_ids
.iter()
.any(|up_id| newly_successful_wants.contains(up_id));
let is_affected =
downstream_want.state.upstream_want_ids.iter().any(|up_id| {
newly_successful_wants.iter().any(|swid| &swid.0 == up_id)
});
if is_affected { Some(id.clone()) } else { None }
}
_ => None,
@ -374,10 +375,8 @@ impl BuildState {
if any_partition_building {
// Some partitions still being built, continue in Building state
Want::Building(
downstream_want.continue_building(
job_run_id.to_string(),
timestamp,
),
downstream_want
.continue_building(job_run_id.to_string(), timestamp),
)
} else {
// No partitions being built, become schedulable again
@ -401,7 +400,7 @@ impl BuildState {
/// Transitions UpstreamBuilding → UpstreamFailed
fn cascade_failures_to_downstream_wants(
&mut self,
newly_failed_wants: &[String],
newly_failed_wants: &[FailedWantId],
timestamp: u64,
) {
// Find downstream wants that are waiting for any of the newly failed wants
@ -413,11 +412,10 @@ impl BuildState {
match want {
Want::UpstreamBuilding(downstream_want) => {
// Is this downstream want waiting for any of the newly failed wants?
let is_affected = downstream_want
.state
.upstream_want_ids
.iter()
.any(|up_id| newly_failed_wants.contains(up_id));
let is_affected =
downstream_want.state.upstream_want_ids.iter().any(|up_id| {
newly_failed_wants.iter().any(|fwid| &fwid.0 == up_id)
});
if is_affected { Some(id.clone()) } else { None }
}
_ => None,
@ -433,8 +431,13 @@ impl BuildState {
let transitioned = match want {
Want::UpstreamBuilding(downstream_want) => Want::UpstreamFailed(
downstream_want
.upstream_failed(newly_failed_wants.to_vec(), timestamp),
downstream_want.upstream_failed(
newly_failed_wants
.iter()
.map(|fwid| fwid.0.clone())
.collect(),
timestamp,
),
),
_ => {
panic!("BUG: Want {} should be UpstreamBuilding here", want_id);
@ -570,10 +573,7 @@ impl BuildState {
}
}
fn handle_want_create(
&mut self,
event: &WantCreateEventV1,
) -> Vec<Event> {
fn handle_want_create(&mut self, event: &WantCreateEventV1) -> Vec<Event> {
// Use From impl to create want in Idle state
let want_idle: WantWithState<WantIdleState> = event.clone().into();
self.wants
@ -587,10 +587,7 @@ impl BuildState {
vec![]
}
fn handle_want_cancel(
&mut self,
event: &WantCancelEventV1,
) -> Vec<Event> {
fn handle_want_cancel(&mut self, event: &WantCancelEventV1) -> Vec<Event> {
// TODO actually cancel in-progress job runs that no longer have a sponsoring want
// Type-safe transition (API layer should prevent canceling terminal wants)
@ -626,10 +623,7 @@ impl BuildState {
vec![]
}
fn handle_job_run_buffer(
&mut self,
event: &JobRunBufferEventV1,
) -> Vec<Event> {
fn handle_job_run_buffer(&mut self, event: &JobRunBufferEventV1) -> Vec<Event> {
// No job run should exist - if it does, that's a BUG in the orchestrator
if self.job_runs.get(&event.job_run_id).is_some() {
panic!(
@ -683,11 +677,7 @@ impl BuildState {
vec![]
}
fn update_job_run_status(
&mut self,
job_run_id: &str,
status: JobRunStatusCode,
) {
fn update_job_run_status(&mut self, job_run_id: &str, status: JobRunStatusCode) {
let job_run = self.job_runs.get_mut(job_run_id).expect(&format!(
"BUG: Job run ID {} must exist to update status",
job_run_id
@ -696,52 +686,69 @@ impl BuildState {
job_run.status = Some(status.into());
}
fn handle_job_run_heartbeat(
&mut self,
event: &JobRunHeartbeatEventV1,
) -> Vec<Event> {
fn handle_job_run_heartbeat(&mut self, event: &JobRunHeartbeatEventV1) -> Vec<Event> {
self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunRunning);
vec![]
}
fn handle_job_run_success(
&mut self,
event: &JobRunSuccessEventV1,
) -> Vec<Event> {
fn handle_job_run_success(&mut self, event: &JobRunSuccessEventV1) -> Vec<Event> {
println!("Job run success event: {:?}", event);
self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunSucceeded);
let job_run = self.get_job_run(&event.job_run_id).unwrap();
// Clone building_partitions before we use it multiple times
let newly_live_partitions: Vec<PartitionRef> = job_run.building_partitions.clone();
// TODO avoid this unchecked wrapping of plain partition refs into LivePartitionRef
let newly_live_partitions: Vec<LivePartitionRef> = job_run
.building_partitions
.iter()
.map(|pref| LivePartitionRef(pref.clone()))
.collect();
// Update partitions being built by this job (strict type-safe transitions)
self.transition_partitions_to_live(&newly_live_partitions, &event.job_run_id, current_timestamp());
self.transition_partitions_to_live(
&newly_live_partitions,
&event.job_run_id,
current_timestamp(),
);
// Building → Successful (when all partitions Live)
let newly_successful_wants = self.complete_successful_wants(&newly_live_partitions, &event.job_run_id, current_timestamp());
let newly_successful_wants: Vec<SuccessfulWantId> = self.complete_successful_wants(
&newly_live_partitions,
&event.job_run_id,
current_timestamp(),
);
// UpstreamBuilding → Idle/Building (for downstream wants waiting on newly successful wants)
self.unblock_downstream_wants(&newly_successful_wants, &event.job_run_id, current_timestamp());
self.unblock_downstream_wants(
&newly_successful_wants,
&event.job_run_id,
current_timestamp(),
);
vec![]
}
fn handle_job_run_failure(
&mut self,
event: &JobRunFailureEventV1,
) -> Vec<Event> {
fn handle_job_run_failure(&mut self, event: &JobRunFailureEventV1) -> Vec<Event> {
self.update_job_run_status(&event.job_run_id, JobRunStatusCode::JobRunFailed);
let job_run = self.get_job_run(&event.job_run_id).unwrap();
// Clone building_partitions before we use it multiple times
let failed_partitions: Vec<PartitionRef> = job_run.building_partitions.clone();
let failed_partitions: Vec<FailedPartitionRef> = job_run
.building_partitions
.iter()
.map(|pref| FailedPartitionRef(pref.clone()))
.collect();
// Transition partitions using strict type-safe methods
self.transition_partitions_to_failed(&failed_partitions, &event.job_run_id, current_timestamp());
self.transition_partitions_to_failed(
&failed_partitions,
&event.job_run_id,
current_timestamp(),
);
// Building → Failed (for wants directly building failed partitions)
let newly_failed_wants = self.fail_directly_affected_wants(&failed_partitions);
let newly_failed_wants: Vec<FailedWantId> =
self.fail_directly_affected_wants(&failed_partitions);
// UpstreamBuilding → UpstreamFailed (for downstream wants waiting on newly failed wants)
self.cascade_failures_to_downstream_wants(&newly_failed_wants, current_timestamp());
@ -749,17 +756,11 @@ impl BuildState {
vec![]
}
fn handle_job_run_cancel(
&mut self,
_event: &JobRunCancelEventV1,
) -> Vec<Event> {
fn handle_job_run_cancel(&mut self, _event: &JobRunCancelEventV1) -> Vec<Event> {
todo!("should update already inserted job run, partition status, want status")
}
pub fn handle_job_run_dep_miss(
&mut self,
event: &JobRunMissingDepsEventV1,
) -> Vec<Event> {
pub fn handle_job_run_dep_miss(&mut self, event: &JobRunMissingDepsEventV1) -> Vec<Event> {
let job_run_detail = self.get_job_run(&event.job_run_id).expect(&format!(
"BUG: Unable to find job run with id `{}`",
event.job_run_id
@ -801,17 +802,11 @@ impl BuildState {
want_events
}
fn handle_taint_create(
&mut self,
_event: &TaintCreateEventV1,
) -> Vec<Event> {
fn handle_taint_create(&mut self, _event: &TaintCreateEventV1) -> Vec<Event> {
todo!("...?")
}
fn handle_taint_delete(
&mut self,
_event: &TaintCancelEventV1,
) -> Vec<Event> {
fn handle_taint_delete(&mut self, _event: &TaintCancelEventV1) -> Vec<Event> {
todo!("...?")
}
@ -913,22 +908,25 @@ impl BuildState {
*/
pub fn want_schedulability(&self, want: &WantDetail) -> WantSchedulability {
// Use type-safe partition checks from partitions
let mut live: Vec<PartitionRef> = Vec::new();
let mut tainted: Vec<PartitionRef> = Vec::new();
let mut missing: Vec<PartitionRef> = Vec::new();
let mut live: Vec<LivePartitionRef> = Vec::new();
let mut tainted: Vec<TaintedPartitionRef> = Vec::new();
let mut missing: Vec<MissingPartitionRef> = Vec::new();
for upstream_ref in &want.upstreams {
match self.partitions.get(&upstream_ref.r#ref) {
Some(partition) => {
if partition.is_live() {
live.push(upstream_ref.clone());
} else if matches!(partition, Partition::Tainted(_)) {
tainted.push(upstream_ref.clone());
match partition {
Partition::Live(p) => live.push(p.get_ref()),
Partition::Tainted(p) => tainted.push(p.get_ref()),
Partition::Missing(p) => missing.push(p.get_ref()),
_ => (), // Other states (Building, Failed) don't add to any list
}
// Other states (Missing, Building, Failed) don't add to any list
}
None => {
missing.push(upstream_ref.clone());
// TODO this definitely feels dirty, but we can't take a mutable ref of self to
// insert the missing partition here, and it feels a little over the top to
// create a more elaborate way to mint a missing ref.
missing.push(MissingPartitionRef(upstream_ref.clone()));
}
}
}
@ -958,9 +956,9 @@ impl BuildState {
/// The status of partitions required by a want to build (sensed from dep miss job run)
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantUpstreamStatus {
pub live: Vec<PartitionRef>,
pub tainted: Vec<PartitionRef>,
pub missing: Vec<PartitionRef>,
pub live: Vec<LivePartitionRef>,
pub tainted: Vec<TaintedPartitionRef>,
pub missing: Vec<MissingPartitionRef>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]

View file

@ -1,8 +1,15 @@
use uuid::Uuid;
use crate::PartitionStatusCode::{PartitionFailed, PartitionLive};
use crate::data_build_event::Event;
use crate::util::current_timestamp;
use crate::{event_source, CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, CreateWantRequest, CreateWantResponse, EventSource, GetWantResponse, JobRunBufferEventV1, JobRunDetail, JobRunStatus, JobRunStatusCode, JobTriggeredEvent, ManuallyTriggeredEvent, PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantAttributedPartitions, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode};
use crate::PartitionStatusCode::{PartitionFailed, PartitionLive};
use crate::{
CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse,
CreateWantRequest, CreateWantResponse, EventSource, GetWantResponse, JobRunBufferEventV1,
JobRunDetail, JobRunStatus, JobRunStatusCode, JobTriggeredEvent, ManuallyTriggeredEvent,
PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, TaintCancelEventV1,
TaintCreateEventV1, TaintDetail, WantAttributedPartitions, WantCancelEventV1,
WantCreateEventV1, WantDetail, WantStatus, WantStatusCode, event_source,
};
use uuid::Uuid;
impl From<&WantCreateEventV1> for WantDetail {
fn from(e: &WantCreateEventV1) -> Self {
@ -76,25 +83,31 @@ impl From<JobRunBufferEventV1> for JobRunDetail {
}
}
pub fn want_status_matches_any(pds: &Vec<Option<PartitionDetail>>, status: PartitionStatusCode) -> bool {
pds.iter()
.any(|pd| pd.clone()
pub fn want_status_matches_any(
pds: &Vec<Option<PartitionDetail>>,
status: PartitionStatusCode,
) -> bool {
pds.iter().any(|pd| {
pd.clone()
.map(|pd| pd.status == Some(status.into()))
.unwrap_or(false))
.unwrap_or(false)
})
}
pub fn want_status_matches_all(pds: &Vec<Option<PartitionDetail>>, status: PartitionStatusCode) -> bool {
pds.iter()
.all(|pd| pd.clone()
pub fn want_status_matches_all(
pds: &Vec<Option<PartitionDetail>>,
status: PartitionStatusCode,
) -> bool {
pds.iter().all(|pd| {
pd.clone()
.map(|pd| pd.status == Some(status.into()))
.unwrap_or(false))
.unwrap_or(false)
})
}
/// Merges a list of partition details into a single status code.
/// Takes the lowest state as the want status.
impl Into<WantStatusCode> for Vec<Option<PartitionDetail>> {
fn into(self) -> WantStatusCode {
if want_status_matches_any(&self, PartitionFailed) {
WantStatusCode::WantFailed
@ -175,17 +188,13 @@ impl From<CreateWantRequest> for WantCreateEventV1 {
impl Into<CreateWantResponse> for Option<WantDetail> {
fn into(self) -> CreateWantResponse {
CreateWantResponse {
data: self,
}
CreateWantResponse { data: self }
}
}
impl Into<GetWantResponse> for Option<WantDetail> {
fn into(self) -> GetWantResponse {
GetWantResponse {
data: self,
}
GetWantResponse { data: self }
}
}
@ -201,9 +210,7 @@ impl From<CancelWantRequest> for WantCancelEventV1 {
impl Into<CancelWantResponse> for Option<WantDetail> {
fn into(self) -> CancelWantResponse {
CancelWantResponse {
data: self,
}
CancelWantResponse { data: self }
}
}
@ -219,4 +226,4 @@ impl Into<CreateTaintResponse> for Option<TaintDetail> {
// TODO
}
}
}
}

View file

@ -1,7 +1,7 @@
use crate::job_run::{JobRun, SubProcessBackend};
use crate::util::DatabuildError;
use crate::{JobConfig, PartitionRef, WantDetail};
use regex::Regex;
use crate::util::DatabuildError;
#[derive(Debug, Clone)]
pub struct JobConfiguration {
@ -12,17 +12,21 @@ pub struct JobConfiguration {
impl JobConfiguration {
/** Launch job to build the partitions specified by the provided wants. */
pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<JobRun<SubProcessBackend>, std::io::Error> {
let wanted_refs: Vec<PartitionRef> =
wants.iter().flat_map(|want| want.partitions.clone()).collect();
pub fn spawn(
&self,
wants: Vec<WantDetail>,
) -> Result<JobRun<SubProcessBackend>, std::io::Error> {
let wanted_refs: Vec<PartitionRef> = wants
.iter()
.flat_map(|want| want.partitions.clone())
.collect();
let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
Ok(JobRun::spawn(self.entry_point.clone(), args))
}
pub fn matches(&self, refs: &PartitionRef) -> bool {
self.patterns.iter().any(|pattern| {
let regex =
Regex::new(&pattern).expect(&format!("Invalid regex pattern: {}", pattern));
let regex = Regex::new(&pattern).expect(&format!("Invalid regex pattern: {}", pattern));
regex.is_match(&refs.r#ref)
})
}

View file

@ -417,7 +417,7 @@ impl ToEvent for SubProcessDepMiss {
mod tests {
use crate::data_build_event::Event;
use crate::data_deps::DATABUILD_MISSING_DEPS_JSON;
use crate::job_run::{JobRun, JobRunBackend, VisitResult, SubProcessBackend};
use crate::job_run::{JobRun, JobRunBackend, SubProcessBackend, VisitResult};
use crate::mock_job_run::MockJobRun;
use crate::{JobRunMissingDeps, MissingDeps};

View file

@ -1,13 +1,13 @@
mod build_event_log;
mod orchestrator;
mod job_run;
mod build_state;
mod data_deps;
mod event_transforms;
mod job;
mod job_run;
mod mock_job_run;
mod orchestrator;
mod partition_state;
mod util;
mod build_state;
mod event_transforms;
mod data_deps;
mod mock_job_run;
mod want_state;
// Include generated protobuf code

View file

@ -1,6 +1,6 @@
use std::collections::HashMap;
use crate::data_deps::DataDepLogLine;
use crate::{JobRunMissingDeps, MissingDeps};
use std::collections::HashMap;
pub struct MockJobRun {
sleep_ms: u64,
@ -54,13 +54,13 @@ impl MockJobRun {
}
pub fn dep_miss(self, missing_deps: Vec<MissingDeps>) -> Self {
self.exit_code(1)
.stdout_msg(
&DataDepLogLine::DepMiss(JobRunMissingDeps {
version: "1".to_string(),
missing_deps,
}).into()
)
self.exit_code(1).stdout_msg(
&DataDepLogLine::DepMiss(JobRunMissingDeps {
version: "1".to_string(),
missing_deps,
})
.into(),
)
}
pub fn to_env(&self) -> HashMap<String, String> {

View file

@ -97,7 +97,6 @@ struct WantGroup {
wants: Vec<WantDetail>,
}
#[derive(Debug, Clone)]
struct GroupedWants {
want_groups: Vec<WantGroup>,
@ -151,32 +150,30 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
let mut new_jobs = Vec::new();
for job in self.job_runs.drain(..) {
let transitioned = match job {
JobRun::Running(running) => {
match running.visit()? {
VisitResult::StillRunning(still_running) => {
println!("Still running job: {:?}", still_running.job_run_id);
JobRun::Running(still_running)
}
VisitResult::Completed(completed) => {
println!("Completed job: {:?}", completed.job_run_id);
let event = completed.state.to_event(&completed.job_run_id);
self.bel.append_event(&event)?;
JobRun::Completed(completed)
}
VisitResult::Failed(failed) => {
println!("Failed job: {:?}", failed.job_run_id);
let event = failed.state.to_event(&failed.job_run_id);
self.bel.append_event(&event)?;
JobRun::Failed(failed)
}
VisitResult::DepMiss(dep_miss) => {
println!("Dep miss job: {:?}", dep_miss.job_run_id);
let event = dep_miss.state.to_event(&dep_miss.job_run_id);
self.bel.append_event(&event)?;
JobRun::DepMiss(dep_miss)
}
JobRun::Running(running) => match running.visit()? {
VisitResult::StillRunning(still_running) => {
println!("Still running job: {:?}", still_running.job_run_id);
JobRun::Running(still_running)
}
}
VisitResult::Completed(completed) => {
println!("Completed job: {:?}", completed.job_run_id);
let event = completed.state.to_event(&completed.job_run_id);
self.bel.append_event(&event)?;
JobRun::Completed(completed)
}
VisitResult::Failed(failed) => {
println!("Failed job: {:?}", failed.job_run_id);
let event = failed.state.to_event(&failed.job_run_id);
self.bel.append_event(&event)?;
JobRun::Failed(failed)
}
VisitResult::DepMiss(dep_miss) => {
println!("Dep miss job: {:?}", dep_miss.job_run_id);
let event = dep_miss.state.to_event(&dep_miss.job_run_id);
self.bel.append_event(&event)?;
JobRun::DepMiss(dep_miss)
}
},
other => other, // Pass through all non-running states unchanged
};
new_jobs.push(transitioned);
@ -231,8 +228,11 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
use crate::job_run::JobRun;
// Compute args from wants the same way JobConfiguration::spawn() does
let wanted_refs: Vec<crate::PartitionRef> =
wg.wants.iter().flat_map(|want| want.partitions.clone()).collect();
let wanted_refs: Vec<crate::PartitionRef> = wg
.wants
.iter()
.flat_map(|want| want.partitions.clone())
.collect();
let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
let job_run = JobRun::spawn(wg.job.entry_point.clone(), args);
@ -264,7 +264,10 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
#[cfg(test)]
fn count_running_jobs(&self) -> usize {
use crate::job_run::JobRun;
self.job_runs.iter().filter(|j| matches!(j, JobRun::Running(_))).count()
self.job_runs
.iter()
.filter(|j| matches!(j, JobRun::Running(_)))
.count()
}
#[cfg(test)]
@ -275,19 +278,28 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
#[cfg(test)]
fn count_not_started_jobs(&self) -> usize {
use crate::job_run::JobRun;
self.job_runs.iter().filter(|j| matches!(j, JobRun::NotStarted(_))).count()
self.job_runs
.iter()
.filter(|j| matches!(j, JobRun::NotStarted(_)))
.count()
}
#[cfg(test)]
fn count_dep_miss_jobs(&self) -> usize {
use crate::job_run::JobRun;
self.job_runs.iter().filter(|j| matches!(j, JobRun::DepMiss(_))).count()
self.job_runs
.iter()
.filter(|j| matches!(j, JobRun::DepMiss(_)))
.count()
}
#[cfg(test)]
fn count_completed_jobs(&self) -> usize {
use crate::job_run::JobRun;
self.job_runs.iter().filter(|j| matches!(j, JobRun::Completed(_))).count()
self.job_runs
.iter()
.filter(|j| matches!(j, JobRun::Completed(_)))
.count()
}
/** Entrypoint for running jobs */
@ -433,9 +445,17 @@ mod tests {
assert_eq!(orchestrator.count_not_started_jobs(), 1);
// Verify the job has the right args by checking the first NotStarted job
use crate::job_run::JobRun;
let not_started_job = orchestrator.job_runs.iter().find(|j| matches!(j, JobRun::NotStarted(_))).unwrap();
let not_started_job = orchestrator
.job_runs
.iter()
.find(|j| matches!(j, JobRun::NotStarted(_)))
.unwrap();
if let JobRun::NotStarted(job) = not_started_job {
assert_eq!(job.state.args, vec!["data/alpha"], "should have scheduled alpha job");
assert_eq!(
job.state.args,
vec!["data/alpha"],
"should have scheduled alpha job"
);
}
assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
}
@ -599,9 +619,7 @@ mod tests {
thread::sleep(Duration::from_millis(1));
// Should still be running after 1ms
orchestrator
.step()
.expect("should still be running");
orchestrator.step().expect("should still be running");
assert_eq!(orchestrator.count_running_jobs(), 1);
assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
println!("STATE: {:?}", orchestrator.bel.state);
@ -773,7 +791,8 @@ echo 'Beta succeeded'
.collect();
assert!(
beta_wants.iter().any(|w| w.status.as_ref().map(|s| s.code) == Some(WantStatusCode::WantUpstreamBuilding as i32)),
beta_wants.iter().any(|w| w.status.as_ref().map(|s| s.code)
== Some(WantStatusCode::WantUpstreamBuilding as i32)),
"At least one beta want should be in UpstreamBuilding state, found: {:?}",
beta_wants.iter().map(|w| &w.status).collect::<Vec<_>>()
);
@ -819,7 +838,11 @@ echo 'Beta succeeded'
// Step 7: Beta is rescheduled and started (want -> running_jobs)
orchestrator.step().expect("step 7");
assert_eq!(orchestrator.count_running_jobs(), 1, "beta should be running");
assert_eq!(
orchestrator.count_running_jobs(),
1,
"beta should be running"
);
// Step 8: Beta completes successfully
wait_for_jobs_to_complete(&mut orchestrator, 10).expect("beta job should complete");

View file

@ -1,4 +1,5 @@
use crate::{PartitionRef, PartitionDetail, PartitionStatus, PartitionStatusCode};
use crate::{PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode};
use serde::{Deserialize, Serialize};
/// State: Partition has been referenced but not yet built
#[derive(Debug, Clone)]
@ -49,6 +50,50 @@ pub enum Partition {
Tainted(PartitionWithState<TaintedState>),
}
/// Type-safe partition reference wrappers that encode state expectations in function signatures. It
/// is critical that these be treated with respect, not just summoned because it's convenient.
/// These should be created ephemerally from typestate objects via .get_ref() and used
/// immediately — never stored long-term, as partition state can change.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct MissingPartitionRef(pub PartitionRef);
impl PartitionWithState<MissingState> {
pub fn get_ref(&self) -> MissingPartitionRef {
MissingPartitionRef(self.partition_ref.clone())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BuildingPartitionRef(pub PartitionRef);
impl PartitionWithState<BuildingState> {
pub fn get_ref(&self) -> BuildingPartitionRef {
BuildingPartitionRef(self.partition_ref.clone())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct LivePartitionRef(pub PartitionRef);
impl PartitionWithState<LiveState> {
pub fn get_ref(&self) -> LivePartitionRef {
LivePartitionRef(self.partition_ref.clone())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct FailedPartitionRef(pub PartitionRef);
impl PartitionWithState<FailedState> {
pub fn get_ref(&self) -> FailedPartitionRef {
FailedPartitionRef(self.partition_ref.clone())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TaintedPartitionRef(pub PartitionRef);
impl PartitionWithState<TaintedState> {
pub fn get_ref(&self) -> TaintedPartitionRef {
TaintedPartitionRef(self.partition_ref.clone())
}
}
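// Editor's note: an illustrative sketch, not part of this commit. It shows how the
// wrappers above are meant to be minted and consumed: only a partition that is
// actually in the Live typestate can produce a LivePartitionRef, so a function that
// accepts &[LivePartitionRef] (e.g. transition_partitions_to_live in build_state.rs)
// encodes that expectation in its signature. The helper name below is hypothetical.
//
//     fn collect_live_refs(partitions: &[Partition]) -> Vec<LivePartitionRef> {
//         partitions
//             .iter()
//             .filter_map(|p| match p {
//                 // Only the Live typestate can mint a LivePartitionRef.
//                 Partition::Live(live) => Some(live.get_ref()),
//                 _ => None,
//             })
//             .collect()
//     }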
// Type-safe transition methods for MissingState
impl PartitionWithState<MissingState> {
/// Transition from Missing to Building when a job starts building this partition

View file

@ -1,5 +1,5 @@
use std::time::{SystemTime, UNIX_EPOCH};
use std::backtrace::Backtrace;
use std::time::{SystemTime, UNIX_EPOCH};
pub fn current_timestamp() -> u64 {
let now = SystemTime::now();
@ -27,7 +27,7 @@ impl DatabuildError {
Self {
msg: msg.into(),
source: None,
backtrace: maybe_backtrace()
backtrace: maybe_backtrace(),
}
}
}
@ -37,7 +37,7 @@ impl From<std::io::Error> for DatabuildError {
Self {
msg: err.to_string(),
source: Some(Box::new(err)),
backtrace: maybe_backtrace()
backtrace: maybe_backtrace(),
}
}
}
@ -47,7 +47,7 @@ impl From<rusqlite::Error> for DatabuildError {
Self {
msg: err.to_string(),
source: Some(Box::new(err)),
backtrace: maybe_backtrace()
backtrace: maybe_backtrace(),
}
}
}
@ -57,7 +57,7 @@ impl From<prost::EncodeError> for DatabuildError {
Self {
msg: err.to_string(),
source: Some(Box::new(err)),
backtrace: maybe_backtrace()
backtrace: maybe_backtrace(),
}
}
}
@ -67,7 +67,7 @@ impl From<serde_json::Error> for DatabuildError {
Self {
msg: err.to_string(),
source: Some(Box::new(err)),
backtrace: maybe_backtrace()
backtrace: maybe_backtrace(),
}
}
}

View file

@ -1,5 +1,8 @@
use crate::partition_state::FailedPartitionRef;
use crate::util::current_timestamp;
use crate::{EventSource, PartitionRef, WantCreateEventV1, WantDetail, WantStatusCode};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// State: Want has been created and is ready to be scheduled
#[derive(Debug, Clone)]
@ -101,6 +104,66 @@ pub enum Want {
Canceled(WantWithState<CanceledState>),
}
/// Type-safe want ID wrappers that encode state expectations in function signatures. It
/// is critical that these be treated with respect, not just summoned because it's convenient.
/// These should be created ephemerally from typestate objects via .get_id() and used
/// immediately, never stored long-term, as want state can change.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct IdleWantId(pub String);
impl WantWithState<IdleState> {
pub fn get_id(&self) -> IdleWantId {
IdleWantId(self.want.want_id.clone())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BuildingWantId(pub String);
impl WantWithState<BuildingState> {
pub fn get_id(&self) -> BuildingWantId {
BuildingWantId(self.want.want_id.clone())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct UpstreamBuildingWantId(pub String);
impl WantWithState<UpstreamBuildingState> {
pub fn get_id(&self) -> UpstreamBuildingWantId {
UpstreamBuildingWantId(self.want.want_id.clone())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SuccessfulWantId(pub String);
impl WantWithState<SuccessfulState> {
pub fn get_id(&self) -> SuccessfulWantId {
SuccessfulWantId(self.want.want_id.clone())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct FailedWantId(pub String);
impl WantWithState<FailedState> {
pub fn get_id(&self) -> FailedWantId {
FailedWantId(self.want.want_id.clone())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct UpstreamFailedWantId(pub String);
impl WantWithState<UpstreamFailedState> {
pub fn get_id(&self) -> UpstreamFailedWantId {
UpstreamFailedWantId(self.want.want_id.clone())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct CanceledWantId(pub String);
impl WantWithState<CanceledState> {
pub fn get_id(&self) -> CanceledWantId {
CanceledWantId(self.want.want_id.clone())
}
}
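// Editor's note: an illustrative sketch, not part of this commit. The want ID wrappers
// mirror the partition ref wrappers: complete_successful_wants in build_state.rs now
// returns Vec<SuccessfulWantId> because each ID is taken via get_id() from a want that
// has just completed, and unblock_downstream_wants accepts &[SuccessfulWantId], so it
// can only be handed IDs of wants observed in the Successful state. The helper name
// below is hypothetical.
//
//     fn successful_ids(wants: &[Want]) -> Vec<SuccessfulWantId> {
//         wants
//             .iter()
//             .filter_map(|w| match w {
//                 // Only the Successful typestate can mint a SuccessfulWantId.
//                 Want::Successful(s) => Some(s.get_id()),
//                 _ => None,
//             })
//             .collect()
//     }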
// From impl for creating want from event
impl From<WantCreateEventV1> for WantWithState<IdleState> {
fn from(event: WantCreateEventV1) -> Self {
@ -207,20 +270,20 @@ impl WantWithState<BuildingState> {
// Type-safe transition methods for FailedState
impl WantWithState<FailedState> {
/// Add more failed partitions to an already-failed want (self-transition)
pub fn add_failed_partitions(mut self, partition_refs: Vec<PartitionRef>) -> Self {
pub fn add_failed_partitions(mut self, partition_refs: Vec<FailedPartitionRef>) -> Self {
for partition_ref in partition_refs {
if self
.state
.failed_partition_refs
.iter()
.any(|p| p.r#ref == partition_ref.r#ref)
.any(|p| p.r#ref == partition_ref.0.r#ref)
{
panic!(
"BUG: Attempted to add failed partition {} that already exists in want {}",
partition_ref.r#ref, self.want.want_id
partition_ref.0.r#ref, self.want.want_id
);
}
self.state.failed_partition_refs.push(partition_ref);
self.state.failed_partition_refs.push(partition_ref.0);
}
WantWithState {