Implement typestate state machine for wants
This commit is contained in:
parent
a9b68bfa6a
commit
0c766a381b
2 changed files with 290 additions and 0 deletions
|
|
@ -8,6 +8,7 @@ mod build_state;
|
|||
mod event_transforms;
|
||||
mod data_deps;
|
||||
mod mock_job_run;
|
||||
mod want_state;
|
||||
|
||||
// Include generated protobuf code
|
||||
include!("databuild.rs");
|
||||
|
|
|
|||
289
databuild/want_state.rs
Normal file
289
databuild/want_state.rs
Normal file
|
|
@ -0,0 +1,289 @@
|
|||
use crate::util::current_timestamp;
|
||||
use crate::{EventSource, PartitionRef, WantDetail, WantStatus, WantStatusCode};
|
||||
|
||||
/// State: Want has been created and is ready to be scheduled
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct IdleState {}
|
||||
|
||||
/// State: Want is currently being built by one or more jobs
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BuildingState {
|
||||
pub started_at: u64,
|
||||
}
|
||||
|
||||
/// State: Want is waiting for upstream dependencies to be built
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct UpstreamBuildingState {
|
||||
pub upstream_want_ids: Vec<String>,
|
||||
}
|
||||
|
||||
/// State: Want has been successfully completed (all partitions live)
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SuccessfulState {
|
||||
pub completed_at: u64,
|
||||
}
|
||||
|
||||
/// State: Want failed because a partition build failed
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FailedState {
|
||||
pub failed_at: u64,
|
||||
pub failed_partition_refs: Vec<PartitionRef>,
|
||||
pub failure_reason: String,
|
||||
}
|
||||
|
||||
/// State: Want failed because an upstream dependency failed
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct UpstreamFailedState {
|
||||
pub failed_at: u64,
|
||||
pub failed_wants: Vec<String>, // wants that failed
|
||||
}
|
||||
|
||||
/// State: Want has been explicitly canceled
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CanceledState {
|
||||
pub canceled_at: u64,
|
||||
pub canceled_by: Option<EventSource>,
|
||||
pub comment: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct WantInfo {
|
||||
pub want_id: String,
|
||||
pub partitions: Vec<PartitionRef>,
|
||||
pub upstreams: Vec<PartitionRef>,
|
||||
pub data_timestamp: u64,
|
||||
pub ttl_seconds: u64,
|
||||
pub sla_seconds: u64,
|
||||
pub source: Option<EventSource>,
|
||||
pub comment: Option<String>,
|
||||
pub last_updated_at: u64,
|
||||
}
|
||||
|
||||
impl WantInfo {
|
||||
pub fn updated_timestamp(self) -> Self {
|
||||
Self { last_updated_at: current_timestamp(), ..self }
|
||||
}
|
||||
}
|
||||
|
||||
/// Generic want struct parameterized by state
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct WantWithState<S> {
|
||||
pub want: WantInfo,
|
||||
pub state: S,
|
||||
}
|
||||
|
||||
/// Wrapper enum for storing wants in collections
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum Want {
|
||||
Idle(WantWithState<IdleState>),
|
||||
Building(WantWithState<BuildingState>),
|
||||
UpstreamBuilding(WantWithState<UpstreamBuildingState>),
|
||||
Successful(WantWithState<SuccessfulState>),
|
||||
Failed(WantWithState<FailedState>),
|
||||
UpstreamFailed(WantWithState<UpstreamFailedState>),
|
||||
Canceled(WantWithState<CanceledState>),
|
||||
}
|
||||
|
||||
// Type-safe transition methods for IdleState
|
||||
impl WantWithState<IdleState> {
|
||||
/// Transition from Idle to Building when a job starts building this want's partitions
|
||||
pub fn start_building(
|
||||
self,
|
||||
started_at: u64,
|
||||
) -> WantWithState<BuildingState> {
|
||||
WantWithState {
|
||||
want: self.want.updated_timestamp(),
|
||||
state: BuildingState { started_at },
|
||||
}
|
||||
}
|
||||
|
||||
/// Transition from Idle to Canceled when want is explicitly canceled
|
||||
pub fn cancel(
|
||||
self,
|
||||
canceled_by: Option<EventSource>,
|
||||
comment: Option<String>,
|
||||
) -> WantWithState<CanceledState> {
|
||||
WantWithState {
|
||||
want: self.want.updated_timestamp(),
|
||||
state: CanceledState {
|
||||
canceled_at: current_timestamp(),
|
||||
canceled_by,
|
||||
comment,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Type-safe transition methods for BuildingState
|
||||
impl WantWithState<BuildingState> {
|
||||
/// Transition from Building to Successful when all partitions are built
|
||||
pub fn complete(self, job_run_id: String, timestamp: u64) -> WantWithState<SuccessfulState> {
|
||||
WantWithState {
|
||||
want: self.want.updated_timestamp(),
|
||||
state: SuccessfulState { completed_at: timestamp },
|
||||
}
|
||||
}
|
||||
|
||||
/// Transition from Building to Failed when a partition build fails
|
||||
pub fn fail(self, failed_partition_refs: Vec<PartitionRef>, reason: String) -> WantWithState<FailedState> {
|
||||
WantWithState {
|
||||
want: self.want.updated_timestamp(),
|
||||
state: FailedState {
|
||||
failed_at: current_timestamp(),
|
||||
failed_partition_refs,
|
||||
failure_reason: reason,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Transition from Building to UpstreamBuilding when missing dependencies are detected
|
||||
pub fn detect_missing_deps(
|
||||
self,
|
||||
upstream_want_ids: Vec<String>,
|
||||
) -> WantWithState<UpstreamBuildingState> {
|
||||
WantWithState {
|
||||
want: self.want.updated_timestamp(),
|
||||
state: UpstreamBuildingState { upstream_want_ids },
|
||||
}
|
||||
}
|
||||
|
||||
/// Transition from Building to Canceled when want is explicitly canceled
|
||||
pub fn cancel(
|
||||
self,
|
||||
source: Option<EventSource>,
|
||||
timestamp: u64,
|
||||
comment: Option<String>,
|
||||
) -> WantWithState<CanceledState> {
|
||||
WantWithState {
|
||||
want: self.want.updated_timestamp(),
|
||||
state: CanceledState {
|
||||
canceled_at: timestamp,
|
||||
canceled_by: source,
|
||||
comment,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Type-safe transition methods for UpstreamBuildingState
|
||||
impl WantWithState<UpstreamBuildingState> {
|
||||
/// Transition from UpstreamBuilding back to Idle when upstreams are satisfied
|
||||
pub fn upstreams_satisfied(self) -> WantWithState<IdleState> {
|
||||
WantWithState {
|
||||
want: self.want.updated_timestamp(),
|
||||
state: IdleState {},
|
||||
}
|
||||
}
|
||||
|
||||
/// Transition from UpstreamBuilding to UpstreamFailed when an upstream dependency fails
|
||||
pub fn upstream_failed(
|
||||
self,
|
||||
failed_upstreams: Vec<String>,
|
||||
timestamp: u64,
|
||||
) -> WantWithState<UpstreamFailedState> {
|
||||
WantWithState {
|
||||
want: self.want.updated_timestamp(),
|
||||
state: UpstreamFailedState { failed_at: timestamp, failed_wants: failed_upstreams },
|
||||
}
|
||||
}
|
||||
|
||||
/// Transition from UpstreamBuilding to Canceled when want is explicitly canceled
|
||||
pub fn cancel(
|
||||
self,
|
||||
source: Option<EventSource>,
|
||||
timestamp: u64,
|
||||
comment: Option<String>,
|
||||
) -> WantWithState<CanceledState> {
|
||||
WantWithState {
|
||||
want: self.want.updated_timestamp(),
|
||||
state: CanceledState {
|
||||
canceled_at: timestamp,
|
||||
canceled_by: source,
|
||||
comment,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Helper methods on the Want enum
|
||||
impl Want {
|
||||
/// Create a new want in the Idle state
|
||||
pub fn new(
|
||||
want_id: String,
|
||||
partitions: Vec<PartitionRef>,
|
||||
data_timestamp: u64,
|
||||
ttl_seconds: u64,
|
||||
sla_seconds: u64,
|
||||
source: Option<EventSource>,
|
||||
comment: Option<String>,
|
||||
) -> Self {
|
||||
Want::Idle(WantWithState {
|
||||
want: WantInfo {
|
||||
want_id,
|
||||
partitions,
|
||||
upstreams: vec![],
|
||||
data_timestamp,
|
||||
ttl_seconds,
|
||||
sla_seconds,
|
||||
source,
|
||||
comment,
|
||||
last_updated_at: current_timestamp(),
|
||||
},
|
||||
state: IdleState {},
|
||||
})
|
||||
}
|
||||
|
||||
/// Check if want is schedulable (Idle or UpstreamBuilding with satisfied upstreams)
|
||||
pub fn is_schedulable(&self) -> bool {
|
||||
match self {
|
||||
Want::Idle(_) => true,
|
||||
// Note: upstream building shouldn't return true ever, it should go back to idle to
|
||||
// be explicit about upstreams completing, which then makes it schedulable
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if want is in a terminal state (Successful, Failed, UpstreamFailed, or Canceled)
|
||||
pub fn is_terminal(&self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
Want::Successful(_) | Want::Failed(_) | Want::UpstreamFailed(_) | Want::Canceled(_)
|
||||
)
|
||||
}
|
||||
|
||||
pub fn want(&self) -> &WantInfo {
|
||||
match self {
|
||||
Want::Idle(w) => &w.want,
|
||||
Want::Building(w) => &w.want,
|
||||
Want::UpstreamBuilding(w) => &w.want,
|
||||
Want::Successful(w) => &w.want,
|
||||
Want::Failed(w) => &w.want,
|
||||
Want::UpstreamFailed(w) => &w.want,
|
||||
Want::Canceled(w) => &w.want,
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert to WantDetail for API responses and queries
|
||||
pub fn to_detail(&self) -> WantDetail {
|
||||
WantDetail {
|
||||
want_id: self.want().want_id.clone(),
|
||||
partitions: self.want().partitions.clone(),
|
||||
upstreams: self.want().upstreams.clone(),
|
||||
data_timestamp: self.want().data_timestamp,
|
||||
ttl_seconds: self.want().ttl_seconds,
|
||||
sla_seconds: self.want().sla_seconds,
|
||||
source: self.want().source.clone(),
|
||||
comment: self.want().comment.clone(),
|
||||
last_updated_timestamp: self.want().last_updated_at,
|
||||
status: match self {
|
||||
Want::Idle(_) => Some(WantStatusCode::WantIdle.into()),
|
||||
Want::Building(_) => Some(WantStatusCode::WantBuilding.into()),
|
||||
Want::UpstreamBuilding(_) => Some(WantStatusCode::WantUpstreamBuilding.into()),
|
||||
Want::Successful(_) => Some(WantStatusCode::WantSuccessful.into()),
|
||||
Want::Failed(_) => Some(WantStatusCode::WantFailed.into()),
|
||||
Want::UpstreamFailed(_) => Some(WantStatusCode::WantUpstreamFailed.into()),
|
||||
Want::Canceled(_) => Some(WantStatusCode::WantCanceled.into()),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue