582 lines
19 KiB
Rust
582 lines
19 KiB
Rust
use crate::partition_state::FailedPartitionRef;
|
|
use crate::util::{HasRelatedIds, RelatedIds, current_timestamp};
|
|
use crate::{EventSource, PartitionRef, WantCreateEventV1, WantDetail, WantStatusCode};
|
|
use serde::{Deserialize, Serialize};
|
|
use uuid::Uuid;
|
|
|
|
/// State: Want has just been created, state not yet determined by sensing partition states
|
|
#[derive(Debug, Clone)]
|
|
pub struct NewState {}
|
|
|
|
/// State: Want has been created and is ready to be scheduled
|
|
#[derive(Debug, Clone)]
|
|
pub struct IdleState {}
|
|
|
|
/// State: Want is currently being built by one or more jobs
|
|
#[derive(Debug, Clone)]
|
|
pub struct BuildingState {
|
|
pub started_at: u64,
|
|
}
|
|
|
|
/// State: Want is waiting for upstream dependencies to be built
|
|
#[derive(Debug, Clone)]
|
|
pub struct UpstreamBuildingState {
|
|
pub upstream_want_ids: Vec<String>,
|
|
}
|
|
|
|
/// State: Want has been successfully completed (all partitions live)
|
|
#[derive(Debug, Clone)]
|
|
pub struct SuccessfulState {
|
|
pub completed_at: u64,
|
|
}
|
|
|
|
/// State: Want failed because a partition build failed
|
|
#[derive(Debug, Clone)]
|
|
pub struct FailedState {
|
|
pub failed_at: u64,
|
|
pub failed_partition_refs: Vec<PartitionRef>,
|
|
pub failure_reason: String,
|
|
}
|
|
|
|
/// State: Want failed because an upstream dependency failed
|
|
#[derive(Debug, Clone)]
|
|
pub struct UpstreamFailedState {
|
|
pub failed_at: u64,
|
|
pub failed_wants: Vec<String>, // wants that failed
|
|
}
|
|
|
|
/// State: Want has been explicitly canceled
|
|
#[derive(Debug, Clone)]
|
|
pub struct CanceledState {
|
|
pub canceled_at: u64,
|
|
pub canceled_by: Option<EventSource>,
|
|
pub comment: Option<String>,
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub struct WantInfo {
|
|
pub want_id: String,
|
|
pub partitions: Vec<PartitionRef>,
|
|
pub data_timestamp: u64,
|
|
pub ttl_seconds: u64,
|
|
pub sla_seconds: u64,
|
|
pub source: Option<EventSource>,
|
|
pub comment: Option<String>,
|
|
pub last_updated_at: u64,
|
|
}
|
|
|
|
impl Default for WantInfo {
|
|
fn default() -> Self {
|
|
Self {
|
|
want_id: uuid::Uuid::new_v4().to_string(),
|
|
partitions: vec![],
|
|
data_timestamp: 0,
|
|
ttl_seconds: 0,
|
|
sla_seconds: 0,
|
|
source: None,
|
|
comment: None,
|
|
last_updated_at: 0,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl WantInfo {
|
|
pub fn updated_timestamp(self) -> Self {
|
|
Self {
|
|
last_updated_at: current_timestamp(),
|
|
..self
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Generic want struct parameterized by state
|
|
#[derive(Debug, Clone)]
|
|
pub struct WantWithState<S> {
|
|
pub want: WantInfo,
|
|
pub state: S,
|
|
}
|
|
|
|
/// Wrapper enum for storing wants in collections
|
|
#[derive(Debug, Clone)]
|
|
pub enum Want {
|
|
New(WantWithState<NewState>),
|
|
Idle(WantWithState<IdleState>),
|
|
Building(WantWithState<BuildingState>),
|
|
UpstreamBuilding(WantWithState<UpstreamBuildingState>),
|
|
Successful(WantWithState<SuccessfulState>),
|
|
Failed(WantWithState<FailedState>),
|
|
UpstreamFailed(WantWithState<UpstreamFailedState>),
|
|
Canceled(WantWithState<CanceledState>),
|
|
}
|
|
|
|
/// Type-safe partition reference wrappers that encode state expectations in function signatures. It
|
|
/// is critical that these be treated with respect, not just summoned because it's convenient.
|
|
/// These should be created ephemerally from typestate objects via .get_ref() and used
|
|
/// immediately — never stored long-term, as partition state can change.
|
|
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
|
pub struct NewWantId(pub String);
|
|
impl WantWithState<NewState> {
|
|
pub fn get_id(&self) -> NewWantId {
|
|
NewWantId(self.want.want_id.clone())
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
|
pub struct IdleWantId(pub String);
|
|
impl WantWithState<IdleState> {
|
|
pub fn get_id(&self) -> IdleWantId {
|
|
IdleWantId(self.want.want_id.clone())
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
|
pub struct BuildingWantId(pub String);
|
|
impl WantWithState<BuildingState> {
|
|
pub fn get_id(&self) -> BuildingWantId {
|
|
BuildingWantId(self.want.want_id.clone())
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
|
pub struct UpstreamBuildingWantId(pub String);
|
|
impl WantWithState<UpstreamBuildingState> {
|
|
pub fn get_id(&self) -> UpstreamBuildingWantId {
|
|
UpstreamBuildingWantId(self.want.want_id.clone())
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
|
pub struct SuccessfulWantId(pub String);
|
|
impl WantWithState<SuccessfulState> {
|
|
pub fn get_id(&self) -> SuccessfulWantId {
|
|
SuccessfulWantId(self.want.want_id.clone())
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
|
pub struct FailedWantId(pub String);
|
|
impl WantWithState<FailedState> {
|
|
pub fn get_id(&self) -> FailedWantId {
|
|
FailedWantId(self.want.want_id.clone())
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
|
pub struct UpstreamFailedWantId(pub String);
|
|
impl WantWithState<UpstreamFailedState> {
|
|
pub fn get_id(&self) -> UpstreamFailedWantId {
|
|
UpstreamFailedWantId(self.want.want_id.clone())
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
|
pub struct CanceledWantId(pub String);
|
|
impl WantWithState<CanceledState> {
|
|
pub fn get_id(&self) -> CanceledWantId {
|
|
CanceledWantId(self.want.want_id.clone())
|
|
}
|
|
}
|
|
|
|
// From impl for creating want from event - creates in New state for sensing
|
|
impl From<WantCreateEventV1> for WantWithState<NewState> {
|
|
fn from(event: WantCreateEventV1) -> Self {
|
|
WantWithState {
|
|
want: WantInfo {
|
|
want_id: event.want_id,
|
|
partitions: event.partitions,
|
|
data_timestamp: event.data_timestamp,
|
|
ttl_seconds: event.ttl_seconds,
|
|
sla_seconds: event.sla_seconds,
|
|
source: event.source,
|
|
comment: event.comment,
|
|
last_updated_at: current_timestamp(),
|
|
},
|
|
state: NewState {},
|
|
}
|
|
}
|
|
}
|
|
|
|
// Type-safe transition methods for NewState
|
|
impl WantWithState<NewState> {
|
|
/// Transition from New to Idle when partitions don't exist or are ready to schedule
|
|
pub fn to_idle(self) -> WantWithState<IdleState> {
|
|
WantWithState {
|
|
want: self.want.updated_timestamp(),
|
|
state: IdleState {},
|
|
}
|
|
}
|
|
|
|
/// Transition from New to Building when partitions are currently being built
|
|
pub fn to_building(self, started_at: u64) -> WantWithState<BuildingState> {
|
|
WantWithState {
|
|
want: self.want.updated_timestamp(),
|
|
state: BuildingState { started_at },
|
|
}
|
|
}
|
|
|
|
/// Transition from New to Successful when all partitions are already Live
|
|
pub fn to_successful(self, completed_at: u64) -> WantWithState<SuccessfulState> {
|
|
WantWithState {
|
|
want: self.want.updated_timestamp(),
|
|
state: SuccessfulState { completed_at },
|
|
}
|
|
}
|
|
|
|
/// Transition from New to Failed when any partition has failed
|
|
pub fn to_failed(
|
|
self,
|
|
failed_partition_refs: Vec<PartitionRef>,
|
|
reason: String,
|
|
) -> WantWithState<FailedState> {
|
|
WantWithState {
|
|
want: self.want.updated_timestamp(),
|
|
state: FailedState {
|
|
failed_at: current_timestamp(),
|
|
failed_partition_refs,
|
|
failure_reason: reason,
|
|
},
|
|
}
|
|
}
|
|
|
|
/// Transition from New to UpstreamBuilding when partitions are waiting for upstream deps
|
|
pub fn to_upstream_building(
|
|
self,
|
|
upstream_want_ids: Vec<String>,
|
|
) -> WantWithState<UpstreamBuildingState> {
|
|
WantWithState {
|
|
want: self.want.updated_timestamp(),
|
|
state: UpstreamBuildingState { upstream_want_ids },
|
|
}
|
|
}
|
|
|
|
/// Transition from New to UpstreamFailed when upstream dependencies have failed
|
|
pub fn to_upstream_failed(
|
|
self,
|
|
failed_wants: Vec<String>,
|
|
) -> WantWithState<UpstreamFailedState> {
|
|
WantWithState {
|
|
want: self.want.updated_timestamp(),
|
|
state: UpstreamFailedState {
|
|
failed_at: current_timestamp(),
|
|
failed_wants,
|
|
},
|
|
}
|
|
}
|
|
|
|
/// Transition from New to Canceled when want is explicitly canceled
|
|
/// (Rarely used - wants are typically transitioned before cancel can arrive)
|
|
pub fn cancel(
|
|
self,
|
|
canceled_by: Option<EventSource>,
|
|
comment: Option<String>,
|
|
) -> WantWithState<CanceledState> {
|
|
WantWithState {
|
|
want: self.want.updated_timestamp(),
|
|
state: CanceledState {
|
|
canceled_at: current_timestamp(),
|
|
canceled_by,
|
|
comment,
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
// Type-safe transition methods for IdleState
|
|
impl WantWithState<IdleState> {
|
|
/// Transition from Idle to Building when a job starts building this want's partitions
|
|
pub fn start_building(self, started_at: u64) -> WantWithState<BuildingState> {
|
|
WantWithState {
|
|
want: self.want.updated_timestamp(),
|
|
state: BuildingState { started_at },
|
|
}
|
|
}
|
|
|
|
/// Transition from Idle to Canceled when want is explicitly canceled
|
|
pub fn cancel(
|
|
self,
|
|
canceled_by: Option<EventSource>,
|
|
comment: Option<String>,
|
|
) -> WantWithState<CanceledState> {
|
|
WantWithState {
|
|
want: self.want.updated_timestamp(),
|
|
state: CanceledState {
|
|
canceled_at: current_timestamp(),
|
|
canceled_by,
|
|
comment,
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
// Type-safe transition methods for BuildingState
|
|
impl WantWithState<BuildingState> {
|
|
/// Transition from Building to Successful when all partitions are built
|
|
pub fn complete(self, job_run_id: String, timestamp: u64) -> WantWithState<SuccessfulState> {
|
|
WantWithState {
|
|
want: self.want.updated_timestamp(),
|
|
state: SuccessfulState {
|
|
completed_at: timestamp,
|
|
},
|
|
}
|
|
}
|
|
|
|
/// Transition from Building to Failed when a partition build fails
|
|
pub fn fail(
|
|
self,
|
|
failed_partition_refs: Vec<PartitionRef>,
|
|
reason: String,
|
|
) -> WantWithState<FailedState> {
|
|
WantWithState {
|
|
want: self.want.updated_timestamp(),
|
|
state: FailedState {
|
|
failed_at: current_timestamp(),
|
|
failed_partition_refs,
|
|
failure_reason: reason,
|
|
},
|
|
}
|
|
}
|
|
|
|
/// Transition from Building to UpstreamBuilding when missing dependencies are detected
|
|
pub fn detect_missing_deps(
|
|
self,
|
|
upstream_want_ids: Vec<String>,
|
|
) -> WantWithState<UpstreamBuildingState> {
|
|
WantWithState {
|
|
want: self.want.updated_timestamp(),
|
|
state: UpstreamBuildingState { upstream_want_ids },
|
|
}
|
|
}
|
|
|
|
/// Transition from Building to Canceled when want is explicitly canceled
|
|
pub fn cancel(
|
|
self,
|
|
source: Option<EventSource>,
|
|
timestamp: u64,
|
|
comment: Option<String>,
|
|
) -> WantWithState<CanceledState> {
|
|
WantWithState {
|
|
want: self.want.updated_timestamp(),
|
|
state: CanceledState {
|
|
canceled_at: timestamp,
|
|
canceled_by: source,
|
|
comment,
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
// Type-safe transition methods for FailedState
|
|
impl WantWithState<FailedState> {
|
|
/// Add more failed partitions to an already-failed want (self-transition)
|
|
pub fn add_failed_partitions(mut self, partition_refs: Vec<FailedPartitionRef>) -> Self {
|
|
for partition_ref in partition_refs {
|
|
if self
|
|
.state
|
|
.failed_partition_refs
|
|
.iter()
|
|
.any(|p| p.r#ref == partition_ref.0.r#ref)
|
|
{
|
|
panic!(
|
|
"BUG: Attempted to add failed partition {} that already exists in want {}",
|
|
partition_ref.0.r#ref, self.want.want_id
|
|
);
|
|
}
|
|
self.state.failed_partition_refs.push(partition_ref.0);
|
|
}
|
|
|
|
WantWithState {
|
|
want: self.want.updated_timestamp(),
|
|
state: self.state,
|
|
}
|
|
}
|
|
}
|
|
|
|
// Type-safe transition methods for UpstreamBuildingState
|
|
impl WantWithState<UpstreamBuildingState> {
|
|
/// Transition from UpstreamBuilding back to Idle when upstreams are satisfied and no jobs are still building
|
|
pub fn upstreams_satisfied(self) -> WantWithState<IdleState> {
|
|
WantWithState {
|
|
want: self.want.updated_timestamp(),
|
|
state: IdleState {},
|
|
}
|
|
}
|
|
|
|
/// Transition from UpstreamBuilding to Building when upstreams are satisfied but jobs are still actively building
|
|
pub fn continue_building(
|
|
self,
|
|
_job_run_id: String, // Reference to active building job for safety/documentation
|
|
started_at: u64,
|
|
) -> WantWithState<BuildingState> {
|
|
WantWithState {
|
|
want: self.want.updated_timestamp(),
|
|
state: BuildingState { started_at },
|
|
}
|
|
}
|
|
|
|
/// Add more upstream dependencies (self-transition)
|
|
pub fn add_upstreams(mut self, want_ids: Vec<String>) -> Self {
|
|
for want_id in want_ids {
|
|
if self.state.upstream_want_ids.contains(&want_id) {
|
|
panic!(
|
|
"BUG: Attempted to add upstream want {} that already exists in want {}",
|
|
want_id, self.want.want_id
|
|
);
|
|
}
|
|
self.state.upstream_want_ids.push(want_id);
|
|
}
|
|
|
|
WantWithState {
|
|
want: self.want.updated_timestamp(),
|
|
state: self.state,
|
|
}
|
|
}
|
|
|
|
/// Transition from UpstreamBuilding to UpstreamFailed when an upstream dependency fails
|
|
pub fn upstream_failed(
|
|
self,
|
|
failed_upstreams: Vec<String>,
|
|
timestamp: u64,
|
|
) -> WantWithState<UpstreamFailedState> {
|
|
WantWithState {
|
|
want: self.want.updated_timestamp(),
|
|
state: UpstreamFailedState {
|
|
failed_at: timestamp,
|
|
failed_wants: failed_upstreams,
|
|
},
|
|
}
|
|
}
|
|
|
|
/// Transition from UpstreamBuilding to Canceled when want is explicitly canceled
|
|
pub fn cancel(
|
|
self,
|
|
source: Option<EventSource>,
|
|
timestamp: u64,
|
|
comment: Option<String>,
|
|
) -> WantWithState<CanceledState> {
|
|
WantWithState {
|
|
want: self.want.updated_timestamp(),
|
|
state: CanceledState {
|
|
canceled_at: timestamp,
|
|
canceled_by: source,
|
|
comment,
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
// ==================== HasRelatedIds trait implementation ====================
|
|
|
|
impl HasRelatedIds for Want {
|
|
/// Get the IDs of all entities this want references.
|
|
/// Note: job_run_ids come from inverted indexes in BuildState, not from Want itself.
|
|
fn related_ids(&self) -> RelatedIds {
|
|
let partition_refs = self
|
|
.want()
|
|
.partitions
|
|
.iter()
|
|
.map(|p| p.r#ref.clone())
|
|
.collect();
|
|
|
|
// Collect want IDs from state-specific relationships
|
|
let want_ids = match self {
|
|
Want::UpstreamBuilding(w) => w.state.upstream_want_ids.clone(),
|
|
Want::UpstreamFailed(w) => w.state.failed_wants.clone(),
|
|
_ => vec![],
|
|
};
|
|
|
|
RelatedIds {
|
|
partition_refs,
|
|
partition_uuids: vec![],
|
|
job_run_ids: vec![],
|
|
want_ids,
|
|
}
|
|
}
|
|
}
|
|
|
|
// Helper methods on the Want enum
|
|
impl Want {
|
|
/// Create a new want in the Idle state
|
|
pub fn new(
|
|
want_id: String,
|
|
partitions: Vec<PartitionRef>,
|
|
data_timestamp: u64,
|
|
ttl_seconds: u64,
|
|
sla_seconds: u64,
|
|
source: Option<EventSource>,
|
|
comment: Option<String>,
|
|
) -> Self {
|
|
Want::Idle(WantWithState {
|
|
want: WantInfo {
|
|
want_id,
|
|
partitions,
|
|
data_timestamp,
|
|
ttl_seconds,
|
|
sla_seconds,
|
|
source,
|
|
comment,
|
|
last_updated_at: current_timestamp(),
|
|
},
|
|
state: IdleState {},
|
|
})
|
|
}
|
|
|
|
/// Check if want is schedulable (Idle or UpstreamBuilding with satisfied upstreams)
|
|
pub fn is_schedulable(&self) -> bool {
|
|
match self {
|
|
Want::Idle(_) => true,
|
|
// Note: upstream building shouldn't return true ever, it should go back to idle to
|
|
// be explicit about upstreams completing, which then makes it schedulable
|
|
_ => false,
|
|
}
|
|
}
|
|
|
|
/// Check if want is in a terminal state (Successful, Failed, UpstreamFailed, or Canceled)
|
|
pub fn is_terminal(&self) -> bool {
|
|
matches!(
|
|
self,
|
|
Want::Successful(_) | Want::Failed(_) | Want::UpstreamFailed(_) | Want::Canceled(_)
|
|
)
|
|
}
|
|
|
|
pub fn want(&self) -> &WantInfo {
|
|
match self {
|
|
Want::New(w) => &w.want,
|
|
Want::Idle(w) => &w.want,
|
|
Want::Building(w) => &w.want,
|
|
Want::UpstreamBuilding(w) => &w.want,
|
|
Want::Successful(w) => &w.want,
|
|
Want::Failed(w) => &w.want,
|
|
Want::UpstreamFailed(w) => &w.want,
|
|
Want::Canceled(w) => &w.want,
|
|
}
|
|
}
|
|
|
|
/// Convert to WantDetail for API responses and queries.
|
|
/// Note: job_run_ids and derivative_want_ids are empty here and will be
|
|
/// populated by BuildState from its inverted indexes.
|
|
pub fn to_detail(&self) -> WantDetail {
|
|
WantDetail {
|
|
want_id: self.want().want_id.clone(),
|
|
partitions: self.want().partitions.clone(),
|
|
upstreams: vec![], // Upstreams are tracked via want relationships, not stored here
|
|
data_timestamp: self.want().data_timestamp,
|
|
ttl_seconds: self.want().ttl_seconds,
|
|
sla_seconds: self.want().sla_seconds,
|
|
source: self.want().source.clone(),
|
|
comment: self.want().comment.clone(),
|
|
last_updated_timestamp: self.want().last_updated_at,
|
|
status: match self {
|
|
Want::New(_) => Some(WantStatusCode::WantNew.into()),
|
|
Want::Idle(_) => Some(WantStatusCode::WantIdle.into()),
|
|
Want::Building(_) => Some(WantStatusCode::WantBuilding.into()),
|
|
Want::UpstreamBuilding(_) => Some(WantStatusCode::WantUpstreamBuilding.into()),
|
|
Want::Successful(_) => Some(WantStatusCode::WantSuccessful.into()),
|
|
Want::Failed(_) => Some(WantStatusCode::WantFailed.into()),
|
|
Want::UpstreamFailed(_) => Some(WantStatusCode::WantUpstreamFailed.into()),
|
|
Want::Canceled(_) => Some(WantStatusCode::WantCanceled.into()),
|
|
},
|
|
job_run_ids: vec![], // Populated by BuildState
|
|
derivative_want_ids: vec![], // Populated by BuildState
|
|
}
|
|
}
|
|
}
|