diff --git a/databuild/lib.rs b/databuild/lib.rs index 5d669dc..ac9339c 100644 --- a/databuild/lib.rs +++ b/databuild/lib.rs @@ -8,6 +8,7 @@ mod build_state; mod event_transforms; mod data_deps; mod mock_job_run; +mod want_state; // Include generated protobuf code include!("databuild.rs"); diff --git a/databuild/want_state.rs b/databuild/want_state.rs new file mode 100644 index 0000000..32d01cb --- /dev/null +++ b/databuild/want_state.rs @@ -0,0 +1,289 @@ +use crate::util::current_timestamp; +use crate::{EventSource, PartitionRef, WantDetail, WantStatus, WantStatusCode}; + +/// State: Want has been created and is ready to be scheduled +#[derive(Debug, Clone)] +pub struct IdleState {} + +/// State: Want is currently being built by one or more jobs +#[derive(Debug, Clone)] +pub struct BuildingState { + pub started_at: u64, +} + +/// State: Want is waiting for upstream dependencies to be built +#[derive(Debug, Clone)] +pub struct UpstreamBuildingState { + pub upstream_want_ids: Vec, +} + +/// State: Want has been successfully completed (all partitions live) +#[derive(Debug, Clone)] +pub struct SuccessfulState { + pub completed_at: u64, +} + +/// State: Want failed because a partition build failed +#[derive(Debug, Clone)] +pub struct FailedState { + pub failed_at: u64, + pub failed_partition_refs: Vec, + pub failure_reason: String, +} + +/// State: Want failed because an upstream dependency failed +#[derive(Debug, Clone)] +pub struct UpstreamFailedState { + pub failed_at: u64, + pub failed_wants: Vec, // wants that failed +} + +/// State: Want has been explicitly canceled +#[derive(Debug, Clone)] +pub struct CanceledState { + pub canceled_at: u64, + pub canceled_by: Option, + pub comment: Option, +} + +#[derive(Debug, Clone)] +pub struct WantInfo { + pub want_id: String, + pub partitions: Vec, + pub upstreams: Vec, + pub data_timestamp: u64, + pub ttl_seconds: u64, + pub sla_seconds: u64, + pub source: Option, + pub comment: Option, + pub last_updated_at: u64, +} + +impl WantInfo { + pub fn updated_timestamp(self) -> Self { + Self { last_updated_at: current_timestamp(), ..self } + } +} + +/// Generic want struct parameterized by state +#[derive(Debug, Clone)] +pub struct WantWithState { + pub want: WantInfo, + pub state: S, +} + +/// Wrapper enum for storing wants in collections +#[derive(Debug, Clone)] +pub enum Want { + Idle(WantWithState), + Building(WantWithState), + UpstreamBuilding(WantWithState), + Successful(WantWithState), + Failed(WantWithState), + UpstreamFailed(WantWithState), + Canceled(WantWithState), +} + +// Type-safe transition methods for IdleState +impl WantWithState { + /// Transition from Idle to Building when a job starts building this want's partitions + pub fn start_building( + self, + started_at: u64, + ) -> WantWithState { + WantWithState { + want: self.want.updated_timestamp(), + state: BuildingState { started_at }, + } + } + + /// Transition from Idle to Canceled when want is explicitly canceled + pub fn cancel( + self, + canceled_by: Option, + comment: Option, + ) -> WantWithState { + WantWithState { + want: self.want.updated_timestamp(), + state: CanceledState { + canceled_at: current_timestamp(), + canceled_by, + comment, + }, + } + } +} + +// Type-safe transition methods for BuildingState +impl WantWithState { + /// Transition from Building to Successful when all partitions are built + pub fn complete(self, job_run_id: String, timestamp: u64) -> WantWithState { + WantWithState { + want: self.want.updated_timestamp(), + state: SuccessfulState { completed_at: timestamp }, + } + } + + /// Transition from Building to Failed when a partition build fails + pub fn fail(self, failed_partition_refs: Vec, reason: String) -> WantWithState { + WantWithState { + want: self.want.updated_timestamp(), + state: FailedState { + failed_at: current_timestamp(), + failed_partition_refs, + failure_reason: reason, + }, + } + } + + /// Transition from Building to UpstreamBuilding when missing dependencies are detected + pub fn detect_missing_deps( + self, + upstream_want_ids: Vec, + ) -> WantWithState { + WantWithState { + want: self.want.updated_timestamp(), + state: UpstreamBuildingState { upstream_want_ids }, + } + } + + /// Transition from Building to Canceled when want is explicitly canceled + pub fn cancel( + self, + source: Option, + timestamp: u64, + comment: Option, + ) -> WantWithState { + WantWithState { + want: self.want.updated_timestamp(), + state: CanceledState { + canceled_at: timestamp, + canceled_by: source, + comment, + }, + } + } +} + +// Type-safe transition methods for UpstreamBuildingState +impl WantWithState { + /// Transition from UpstreamBuilding back to Idle when upstreams are satisfied + pub fn upstreams_satisfied(self) -> WantWithState { + WantWithState { + want: self.want.updated_timestamp(), + state: IdleState {}, + } + } + + /// Transition from UpstreamBuilding to UpstreamFailed when an upstream dependency fails + pub fn upstream_failed( + self, + failed_upstreams: Vec, + timestamp: u64, + ) -> WantWithState { + WantWithState { + want: self.want.updated_timestamp(), + state: UpstreamFailedState { failed_at: timestamp, failed_wants: failed_upstreams }, + } + } + + /// Transition from UpstreamBuilding to Canceled when want is explicitly canceled + pub fn cancel( + self, + source: Option, + timestamp: u64, + comment: Option, + ) -> WantWithState { + WantWithState { + want: self.want.updated_timestamp(), + state: CanceledState { + canceled_at: timestamp, + canceled_by: source, + comment, + }, + } + } +} + +// Helper methods on the Want enum +impl Want { + /// Create a new want in the Idle state + pub fn new( + want_id: String, + partitions: Vec, + data_timestamp: u64, + ttl_seconds: u64, + sla_seconds: u64, + source: Option, + comment: Option, + ) -> Self { + Want::Idle(WantWithState { + want: WantInfo { + want_id, + partitions, + upstreams: vec![], + data_timestamp, + ttl_seconds, + sla_seconds, + source, + comment, + last_updated_at: current_timestamp(), + }, + state: IdleState {}, + }) + } + + /// Check if want is schedulable (Idle or UpstreamBuilding with satisfied upstreams) + pub fn is_schedulable(&self) -> bool { + match self { + Want::Idle(_) => true, + // Note: upstream building shouldn't return true ever, it should go back to idle to + // be explicit about upstreams completing, which then makes it schedulable + _ => false, + } + } + + /// Check if want is in a terminal state (Successful, Failed, UpstreamFailed, or Canceled) + pub fn is_terminal(&self) -> bool { + matches!( + self, + Want::Successful(_) | Want::Failed(_) | Want::UpstreamFailed(_) | Want::Canceled(_) + ) + } + + pub fn want(&self) -> &WantInfo { + match self { + Want::Idle(w) => &w.want, + Want::Building(w) => &w.want, + Want::UpstreamBuilding(w) => &w.want, + Want::Successful(w) => &w.want, + Want::Failed(w) => &w.want, + Want::UpstreamFailed(w) => &w.want, + Want::Canceled(w) => &w.want, + } + } + + /// Convert to WantDetail for API responses and queries + pub fn to_detail(&self) -> WantDetail { + WantDetail { + want_id: self.want().want_id.clone(), + partitions: self.want().partitions.clone(), + upstreams: self.want().upstreams.clone(), + data_timestamp: self.want().data_timestamp, + ttl_seconds: self.want().ttl_seconds, + sla_seconds: self.want().sla_seconds, + source: self.want().source.clone(), + comment: self.want().comment.clone(), + last_updated_timestamp: self.want().last_updated_at, + status: match self { + Want::Idle(_) => Some(WantStatusCode::WantIdle.into()), + Want::Building(_) => Some(WantStatusCode::WantBuilding.into()), + Want::UpstreamBuilding(_) => Some(WantStatusCode::WantUpstreamBuilding.into()), + Want::Successful(_) => Some(WantStatusCode::WantSuccessful.into()), + Want::Failed(_) => Some(WantStatusCode::WantFailed.into()), + Want::UpstreamFailed(_) => Some(WantStatusCode::WantUpstreamFailed.into()), + Want::Canceled(_) => Some(WantStatusCode::WantCanceled.into()), + }, + } + } +}