Compare commits

...

5 commits

6 changed files with 1050 additions and 467 deletions

View file

@ -164,7 +164,7 @@ impl<S: BELStorage + Debug> BuildEventLog<S> {
}
pub fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError> {
let events = self.state.handle_event(&event)?;
let events = self.state.handle_event(&event);
let idx = self.storage.append_event(event)?;
// Recursion here might be dangerous, but in theory the event propagation always terminates
for event in events {

File diff suppressed because it is too large Load diff

View file

@ -20,7 +20,7 @@ impl From<WantCreateEventV1> for WantDetail {
sla_seconds: e.sla_seconds,
source: e.source,
comment: e.comment,
status: Some(Default::default()),
status: Some(WantStatusCode::WantIdle.into()),
last_updated_timestamp: current_timestamp(),
}
}

View file

@ -8,6 +8,7 @@ mod build_state;
mod event_transforms;
mod data_deps;
mod mock_job_run;
mod want_state;
// Include generated protobuf code
include!("databuild.rs");

386
databuild/want_state.rs Normal file
View file

@ -0,0 +1,386 @@
use crate::util::current_timestamp;
use crate::{EventSource, PartitionRef, WantCreateEventV1, WantDetail, WantStatusCode};
/// State: Want has been created and is ready to be scheduled
// Zero-sized marker type: carries no data, only drives the typestate transitions.
#[derive(Debug, Clone)]
pub struct IdleState {}
/// State: Want is currently being built by one or more jobs
#[derive(Debug, Clone)]
pub struct BuildingState {
    // Timestamp when building began (units per current_timestamp — presumably epoch; confirm).
    pub started_at: u64,
}
/// State: Want is waiting for upstream dependencies to be built
#[derive(Debug, Clone)]
pub struct UpstreamBuildingState {
    // IDs of the upstream wants this want is blocked on; duplicates are treated as a bug.
    pub upstream_want_ids: Vec<String>,
}
/// State: Want has been successfully completed (all partitions live)
#[derive(Debug, Clone)]
pub struct SuccessfulState {
    // Timestamp of completion, supplied by the triggering event (see WantWithState::complete).
    pub completed_at: u64,
}
/// State: Want failed because a partition build failed
#[derive(Debug, Clone)]
pub struct FailedState {
    // Timestamp of the first failure that moved the want into this state.
    pub failed_at: u64,
    // Partitions whose builds failed; grows via add_failed_partitions.
    pub failed_partition_refs: Vec<PartitionRef>,
    // Human-readable reason from the first failure.
    pub failure_reason: String,
}
/// State: Want failed because an upstream dependency failed
#[derive(Debug, Clone)]
pub struct UpstreamFailedState {
    pub failed_at: u64,
    pub failed_wants: Vec<String>, // wants that failed
}
/// State: Want has been explicitly canceled
#[derive(Debug, Clone)]
pub struct CanceledState {
    pub canceled_at: u64,
    // Originator of the cancellation, when the event carried one.
    pub canceled_by: Option<EventSource>,
    // Optional free-text reason supplied with the cancellation.
    pub comment: Option<String>,
}
// State-independent data shared by a want in every lifecycle state.
#[derive(Debug, Clone)]
pub struct WantInfo {
    pub want_id: String,
    // Partitions this want asks to have built.
    pub partitions: Vec<PartitionRef>,
    pub data_timestamp: u64,
    pub ttl_seconds: u64,
    pub sla_seconds: u64,
    // Who/what created the want, if known.
    pub source: Option<EventSource>,
    pub comment: Option<String>,
    // Refreshed via updated_timestamp() on every state transition.
    pub last_updated_at: u64,
}
impl Default for WantInfo {
fn default() -> Self {
Self {
want_id: uuid::Uuid::new_v4().to_string(),
partitions: vec![],
data_timestamp: 0,
ttl_seconds: 0,
sla_seconds: 0,
source: None,
comment: None,
last_updated_at: 0,
}
}
}
impl WantInfo {
    /// Consume `self` and return it with `last_updated_at` refreshed to now.
    /// Called by every state-transition method so the bump cannot be forgotten.
    pub fn updated_timestamp(self) -> Self {
        let mut refreshed = self;
        refreshed.last_updated_at = current_timestamp();
        refreshed
    }
}
/// Generic want struct parameterized by state
// Typestate pattern: the `S` parameter encodes the lifecycle state, so only
// the transitions defined on that concrete state are callable at compile time.
#[derive(Debug, Clone)]
pub struct WantWithState<S> {
    // State-independent data (id, partitions, timestamps, ...).
    pub want: WantInfo,
    // State-specific payload.
    pub state: S,
}
/// Wrapper enum for storing wants in collections
// One variant per lifecycle state; erases the typestate parameter so wants in
// different states can live in the same map/vec.
#[derive(Debug, Clone)]
pub enum Want {
    Idle(WantWithState<IdleState>),
    Building(WantWithState<BuildingState>),
    UpstreamBuilding(WantWithState<UpstreamBuildingState>),
    Successful(WantWithState<SuccessfulState>),
    Failed(WantWithState<FailedState>),
    UpstreamFailed(WantWithState<UpstreamFailedState>),
    Canceled(WantWithState<CanceledState>),
}
// From impl for creating want from event
impl From<WantCreateEventV1> for WantWithState<IdleState> {
    /// A newly created want always enters the machine in the Idle state;
    /// `last_updated_at` is stamped with the conversion time.
    fn from(event: WantCreateEventV1) -> Self {
        let want = WantInfo {
            want_id: event.want_id,
            partitions: event.partitions,
            data_timestamp: event.data_timestamp,
            ttl_seconds: event.ttl_seconds,
            sla_seconds: event.sla_seconds,
            source: event.source,
            comment: event.comment,
            last_updated_at: current_timestamp(),
        };
        WantWithState {
            want,
            state: IdleState {},
        }
    }
}
// Type-safe transition methods for IdleState
impl WantWithState<IdleState> {
    /// Transition from Idle to Building when a job starts building this want's partitions
    pub fn start_building(self, started_at: u64) -> WantWithState<BuildingState> {
        let state = BuildingState { started_at };
        WantWithState {
            want: self.want.updated_timestamp(),
            state,
        }
    }

    /// Transition from Idle to Canceled when want is explicitly canceled
    /// Cancellation time is stamped here with current_timestamp().
    pub fn cancel(
        self,
        canceled_by: Option<EventSource>,
        comment: Option<String>,
    ) -> WantWithState<CanceledState> {
        let state = CanceledState {
            canceled_at: current_timestamp(),
            canceled_by,
            comment,
        };
        WantWithState {
            want: self.want.updated_timestamp(),
            state,
        }
    }
}
// Type-safe transition methods for BuildingState
impl WantWithState<BuildingState> {
    /// Transition from Building to Successful when all partitions are built.
    ///
    /// `_job_run_id` identifies the job that completed the build; it is kept
    /// in the signature for safety/documentation but not stored. The leading
    /// underscore silences the unused-variable warning, matching the existing
    /// `_job_run_id` convention in `continue_building`.
    pub fn complete(self, _job_run_id: String, timestamp: u64) -> WantWithState<SuccessfulState> {
        WantWithState {
            want: self.want.updated_timestamp(),
            state: SuccessfulState {
                completed_at: timestamp,
            },
        }
    }

    /// Transition from Building to Failed when a partition build fails.
    // NOTE(review): failed_at is stamped with current_timestamp() here, while
    // the other terminal transitions (cancel, upstream_failed) take an explicit
    // timestamp from the caller — confirm this asymmetry is intended.
    pub fn fail(
        self,
        failed_partition_refs: Vec<PartitionRef>,
        reason: String,
    ) -> WantWithState<FailedState> {
        WantWithState {
            want: self.want.updated_timestamp(),
            state: FailedState {
                failed_at: current_timestamp(),
                failed_partition_refs,
                failure_reason: reason,
            },
        }
    }

    /// Transition from Building to UpstreamBuilding when missing dependencies are detected.
    pub fn detect_missing_deps(
        self,
        upstream_want_ids: Vec<String>,
    ) -> WantWithState<UpstreamBuildingState> {
        WantWithState {
            want: self.want.updated_timestamp(),
            state: UpstreamBuildingState { upstream_want_ids },
        }
    }

    /// Transition from Building to Canceled when want is explicitly canceled.
    pub fn cancel(
        self,
        source: Option<EventSource>,
        timestamp: u64,
        comment: Option<String>,
    ) -> WantWithState<CanceledState> {
        WantWithState {
            want: self.want.updated_timestamp(),
            state: CanceledState {
                canceled_at: timestamp,
                canceled_by: source,
                comment,
            },
        }
    }
}
// Type-safe transition methods for FailedState
impl WantWithState<FailedState> {
    /// Add more failed partitions to an already-failed want (self-transition).
    ///
    /// Panics if any incoming partition ref is already recorded — a duplicate
    /// indicates a bookkeeping bug in the caller, not a recoverable condition.
    pub fn add_failed_partitions(mut self, partition_refs: Vec<PartitionRef>) -> Self {
        for incoming in partition_refs {
            let already_recorded = self
                .state
                .failed_partition_refs
                .iter()
                .any(|p| p.r#ref == incoming.r#ref);
            if already_recorded {
                panic!(
                    "BUG: Attempted to add failed partition {} that already exists in want {}",
                    incoming.r#ref, self.want.want_id
                );
            }
            self.state.failed_partition_refs.push(incoming);
        }
        let want = self.want.updated_timestamp();
        WantWithState {
            want,
            state: self.state,
        }
    }
}
// Type-safe transition methods for UpstreamBuildingState
impl WantWithState<UpstreamBuildingState> {
    /// Transition from UpstreamBuilding back to Idle when upstreams are satisfied and no jobs are still building
    pub fn upstreams_satisfied(self) -> WantWithState<IdleState> {
        let want = self.want.updated_timestamp();
        WantWithState {
            want,
            state: IdleState {},
        }
    }

    /// Transition from UpstreamBuilding to Building when upstreams are satisfied but jobs are still actively building
    pub fn continue_building(
        self,
        _job_run_id: String, // Reference to active building job for safety/documentation
        started_at: u64,
    ) -> WantWithState<BuildingState> {
        let want = self.want.updated_timestamp();
        WantWithState {
            want,
            state: BuildingState { started_at },
        }
    }

    /// Add more upstream dependencies (self-transition).
    /// Panics on a duplicate want id — that signals a caller bookkeeping bug.
    pub fn add_upstreams(mut self, want_ids: Vec<String>) -> Self {
        for incoming in want_ids {
            if self.state.upstream_want_ids.contains(&incoming) {
                panic!(
                    "BUG: Attempted to add upstream want {} that already exists in want {}",
                    incoming, self.want.want_id
                );
            }
            self.state.upstream_want_ids.push(incoming);
        }
        let want = self.want.updated_timestamp();
        WantWithState {
            want,
            state: self.state,
        }
    }

    /// Transition from UpstreamBuilding to UpstreamFailed when an upstream dependency fails
    pub fn upstream_failed(
        self,
        failed_upstreams: Vec<String>,
        timestamp: u64,
    ) -> WantWithState<UpstreamFailedState> {
        let state = UpstreamFailedState {
            failed_at: timestamp,
            failed_wants: failed_upstreams,
        };
        WantWithState {
            want: self.want.updated_timestamp(),
            state,
        }
    }

    /// Transition from UpstreamBuilding to Canceled when want is explicitly canceled
    pub fn cancel(
        self,
        source: Option<EventSource>,
        timestamp: u64,
        comment: Option<String>,
    ) -> WantWithState<CanceledState> {
        let state = CanceledState {
            canceled_at: timestamp,
            canceled_by: source,
            comment,
        };
        WantWithState {
            want: self.want.updated_timestamp(),
            state,
        }
    }
}
// Helper methods on the Want enum
impl Want {
/// Create a new want in the Idle state
pub fn new(
want_id: String,
partitions: Vec<PartitionRef>,
data_timestamp: u64,
ttl_seconds: u64,
sla_seconds: u64,
source: Option<EventSource>,
comment: Option<String>,
) -> Self {
Want::Idle(WantWithState {
want: WantInfo {
want_id,
partitions,
data_timestamp,
ttl_seconds,
sla_seconds,
source,
comment,
last_updated_at: current_timestamp(),
},
state: IdleState {},
})
}
/// Check if want is schedulable (Idle or UpstreamBuilding with satisfied upstreams)
pub fn is_schedulable(&self) -> bool {
match self {
Want::Idle(_) => true,
// Note: upstream building shouldn't return true ever, it should go back to idle to
// be explicit about upstreams completing, which then makes it schedulable
_ => false,
}
}
/// Check if want is in a terminal state (Successful, Failed, UpstreamFailed, or Canceled)
pub fn is_terminal(&self) -> bool {
matches!(
self,
Want::Successful(_) | Want::Failed(_) | Want::UpstreamFailed(_) | Want::Canceled(_)
)
}
pub fn want(&self) -> &WantInfo {
match self {
Want::Idle(w) => &w.want,
Want::Building(w) => &w.want,
Want::UpstreamBuilding(w) => &w.want,
Want::Successful(w) => &w.want,
Want::Failed(w) => &w.want,
Want::UpstreamFailed(w) => &w.want,
Want::Canceled(w) => &w.want,
}
}
/// Convert to WantDetail for API responses and queries
pub fn to_detail(&self) -> WantDetail {
WantDetail {
want_id: self.want().want_id.clone(),
partitions: self.want().partitions.clone(),
upstreams: vec![], // Upstreams are tracked via want relationships, not stored here
data_timestamp: self.want().data_timestamp,
ttl_seconds: self.want().ttl_seconds,
sla_seconds: self.want().sla_seconds,
source: self.want().source.clone(),
comment: self.want().comment.clone(),
last_updated_timestamp: self.want().last_updated_at,
status: match self {
Want::Idle(_) => Some(WantStatusCode::WantIdle.into()),
Want::Building(_) => Some(WantStatusCode::WantBuilding.into()),
Want::UpstreamBuilding(_) => Some(WantStatusCode::WantUpstreamBuilding.into()),
Want::Successful(_) => Some(WantStatusCode::WantSuccessful.into()),
Want::Failed(_) => Some(WantStatusCode::WantFailed.into()),
Want::UpstreamFailed(_) => Some(WantStatusCode::WantUpstreamFailed.into()),
Want::Canceled(_) => Some(WantStatusCode::WantCanceled.into()),
},
}
}
}

View file

@ -16,6 +16,10 @@ CPN concurrency can be described succinctly: if the workloads touch disjoint places, they can execute concurrently.
# Appendix
## Reasoning About State Transitions
This is hard. You effectively wander around the potential state space, conjuring potential situations in your head, then trying to figure out the edge cases relative to your current state machines and connecting logic, to figure out whether any consistency errors or gaps in the model are possible. Imagine how hard it would be without state machines! The purpose is to pull bugs of this kind forward as much as possible.
## Partition Collisions?
Random thought: we also have this lingering question of "what if unrelated wants collide in the partition space," specifically for a paradigm where job runs produce multiple partitions based on their parameterization. This may also give us the confidence to just cancel the later of the colliding jobs and have it reschedule (how would the partitions differ?). Or, given that we update partition building status on job schedule, we could be confident that we never get into that situation at the later want-grouping stage (pre job scheduling): it would see the conflicting partition as building thanks to the earlier job being started. Probably worth constructing a concrete scenario to war-game this, or implementing an integration test.