Compare commits
5 commits: a9b68bfa6a ... 8e8ff33ef8
| Author | SHA1 | Date |
|---|---|---|
| | 8e8ff33ef8 | |
| | 01d50dde1b | |
| | c8e2b4fdaf | |
| | 4af41533d4 | |
| | 0c766a381b | |
6 changed files with 1050 additions and 467 deletions
```diff
@@ -164,7 +164,7 @@ impl<S: BELStorage + Debug> BuildEventLog<S> {
     }

     pub fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError> {
-        let events = self.state.handle_event(&event)?;
+        let events = self.state.handle_event(&event);
         let idx = self.storage.append_event(event)?;
         // Recursion here might be dangerous, but in theory the event propagation always terminates
         for event in events {
```
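The comment above notes that recursion in event propagation should always terminate. As a side note, here is a standalone sketch (toy stand-in types, not the repo's `BuildEventLog`) of the same propagation written as an explicit worklist, which keeps stack depth constant no matter how long a derivation chain gets:

```rust
use std::collections::VecDeque;

// Toy stand-ins; the real Event/BuildEventLog types live in the repo.
#[derive(Debug, Clone)]
struct Event(String);

struct BuildEventLog {
    log: Vec<Event>,
}

impl BuildEventLog {
    /// Stand-in for state.handle_event: derive follow-up events from `event`.
    fn handle_event(&mut self, event: &Event) -> Vec<Event> {
        match event.0.as_str() {
            "want.create" => vec![Event("partition.wanted".to_string())],
            _ => vec![],
        }
    }

    /// Iterative propagation: a queue replaces the recursion, so the stack
    /// never grows even if one event derives a long chain of follow-ups.
    fn append_event(&mut self, event: &Event) -> u64 {
        let mut queue = VecDeque::from([event.clone()]);
        let mut first_idx = None;
        while let Some(ev) = queue.pop_front() {
            let derived = self.handle_event(&ev);
            self.log.push(ev);
            let idx = (self.log.len() - 1) as u64;
            first_idx.get_or_insert(idx);
            queue.extend(derived);
        }
        first_idx.expect("at least the initial event was appended")
    }
}

fn main() {
    let mut bel = BuildEventLog { log: vec![] };
    let idx = bel.append_event(&Event("want.create".to_string()));
    println!("appended at index {idx}, log length {}", bel.log.len());
}
```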
File diff suppressed because it is too large
```diff
@@ -20,7 +20,7 @@ impl From<WantCreateEventV1> for WantDetail {
             sla_seconds: e.sla_seconds,
             source: e.source,
             comment: e.comment,
-            status: Some(Default::default()),
+            status: Some(WantStatusCode::WantIdle.into()),
             last_updated_timestamp: current_timestamp(),
         }
     }
```
```diff
@@ -8,6 +8,7 @@ mod build_state;
 mod event_transforms;
 mod data_deps;
 mod mock_job_run;
+mod want_state;

 // Include generated protobuf code
 include!("databuild.rs");
```
databuild/want_state.rs (new file, 386 lines)
@@ -0,0 +1,386 @@
```rust
use crate::util::current_timestamp;
use crate::{EventSource, PartitionRef, WantCreateEventV1, WantDetail, WantStatusCode};

/// State: Want has been created and is ready to be scheduled
#[derive(Debug, Clone)]
pub struct IdleState {}

/// State: Want is currently being built by one or more jobs
#[derive(Debug, Clone)]
pub struct BuildingState {
    pub started_at: u64,
}

/// State: Want is waiting for upstream dependencies to be built
#[derive(Debug, Clone)]
pub struct UpstreamBuildingState {
    pub upstream_want_ids: Vec<String>,
}

/// State: Want has been successfully completed (all partitions live)
#[derive(Debug, Clone)]
pub struct SuccessfulState {
    pub completed_at: u64,
}

/// State: Want failed because a partition build failed
#[derive(Debug, Clone)]
pub struct FailedState {
    pub failed_at: u64,
    pub failed_partition_refs: Vec<PartitionRef>,
    pub failure_reason: String,
}

/// State: Want failed because an upstream dependency failed
#[derive(Debug, Clone)]
pub struct UpstreamFailedState {
    pub failed_at: u64,
    pub failed_wants: Vec<String>, // wants that failed
}

/// State: Want has been explicitly canceled
#[derive(Debug, Clone)]
pub struct CanceledState {
    pub canceled_at: u64,
    pub canceled_by: Option<EventSource>,
    pub comment: Option<String>,
}

#[derive(Debug, Clone)]
pub struct WantInfo {
    pub want_id: String,
    pub partitions: Vec<PartitionRef>,
    pub data_timestamp: u64,
    pub ttl_seconds: u64,
    pub sla_seconds: u64,
    pub source: Option<EventSource>,
    pub comment: Option<String>,
    pub last_updated_at: u64,
}

impl Default for WantInfo {
    fn default() -> Self {
        Self {
            want_id: uuid::Uuid::new_v4().to_string(),
            partitions: vec![],
            data_timestamp: 0,
            ttl_seconds: 0,
            sla_seconds: 0,
            source: None,
            comment: None,
            last_updated_at: 0,
        }
    }
}

impl WantInfo {
    pub fn updated_timestamp(self) -> Self {
        Self {
            last_updated_at: current_timestamp(),
            ..self
        }
    }
}

/// Generic want struct parameterized by state
#[derive(Debug, Clone)]
pub struct WantWithState<S> {
    pub want: WantInfo,
    pub state: S,
}

/// Wrapper enum for storing wants in collections
#[derive(Debug, Clone)]
pub enum Want {
    Idle(WantWithState<IdleState>),
    Building(WantWithState<BuildingState>),
    UpstreamBuilding(WantWithState<UpstreamBuildingState>),
    Successful(WantWithState<SuccessfulState>),
    Failed(WantWithState<FailedState>),
    UpstreamFailed(WantWithState<UpstreamFailedState>),
    Canceled(WantWithState<CanceledState>),
}

// From impl for creating want from event
impl From<WantCreateEventV1> for WantWithState<IdleState> {
    fn from(event: WantCreateEventV1) -> Self {
        WantWithState {
            want: WantInfo {
                want_id: event.want_id,
                partitions: event.partitions,
                data_timestamp: event.data_timestamp,
                ttl_seconds: event.ttl_seconds,
                sla_seconds: event.sla_seconds,
                source: event.source,
                comment: event.comment,
                last_updated_at: current_timestamp(),
            },
            state: IdleState {},
        }
    }
}

// Type-safe transition methods for IdleState
impl WantWithState<IdleState> {
    /// Transition from Idle to Building when a job starts building this want's partitions
    pub fn start_building(self, started_at: u64) -> WantWithState<BuildingState> {
        WantWithState {
            want: self.want.updated_timestamp(),
            state: BuildingState { started_at },
        }
    }

    /// Transition from Idle to Canceled when want is explicitly canceled
    pub fn cancel(
        self,
        canceled_by: Option<EventSource>,
        comment: Option<String>,
    ) -> WantWithState<CanceledState> {
        WantWithState {
            want: self.want.updated_timestamp(),
            state: CanceledState {
                canceled_at: current_timestamp(),
                canceled_by,
                comment,
            },
        }
    }
}

// Type-safe transition methods for BuildingState
impl WantWithState<BuildingState> {
    /// Transition from Building to Successful when all partitions are built
    pub fn complete(self, job_run_id: String, timestamp: u64) -> WantWithState<SuccessfulState> {
        WantWithState {
            want: self.want.updated_timestamp(),
            state: SuccessfulState {
                completed_at: timestamp,
            },
        }
    }

    /// Transition from Building to Failed when a partition build fails
    pub fn fail(
        self,
        failed_partition_refs: Vec<PartitionRef>,
        reason: String,
    ) -> WantWithState<FailedState> {
        WantWithState {
            want: self.want.updated_timestamp(),
            state: FailedState {
                failed_at: current_timestamp(),
                failed_partition_refs,
                failure_reason: reason,
            },
        }
    }

    /// Transition from Building to UpstreamBuilding when missing dependencies are detected
    pub fn detect_missing_deps(
        self,
        upstream_want_ids: Vec<String>,
    ) -> WantWithState<UpstreamBuildingState> {
        WantWithState {
            want: self.want.updated_timestamp(),
            state: UpstreamBuildingState { upstream_want_ids },
        }
    }

    /// Transition from Building to Canceled when want is explicitly canceled
    pub fn cancel(
        self,
        source: Option<EventSource>,
        timestamp: u64,
        comment: Option<String>,
    ) -> WantWithState<CanceledState> {
        WantWithState {
            want: self.want.updated_timestamp(),
            state: CanceledState {
                canceled_at: timestamp,
                canceled_by: source,
                comment,
            },
        }
    }
}

// Type-safe transition methods for FailedState
impl WantWithState<FailedState> {
    /// Add more failed partitions to an already-failed want (self-transition)
    pub fn add_failed_partitions(mut self, partition_refs: Vec<PartitionRef>) -> Self {
        for partition_ref in partition_refs {
            if self
                .state
                .failed_partition_refs
                .iter()
                .any(|p| p.r#ref == partition_ref.r#ref)
            {
                panic!(
                    "BUG: Attempted to add failed partition {} that already exists in want {}",
                    partition_ref.r#ref, self.want.want_id
                );
            }
            self.state.failed_partition_refs.push(partition_ref);
        }

        WantWithState {
            want: self.want.updated_timestamp(),
            state: self.state,
        }
    }
}

// Type-safe transition methods for UpstreamBuildingState
impl WantWithState<UpstreamBuildingState> {
    /// Transition from UpstreamBuilding back to Idle when upstreams are satisfied and no jobs are still building
    pub fn upstreams_satisfied(self) -> WantWithState<IdleState> {
        WantWithState {
            want: self.want.updated_timestamp(),
            state: IdleState {},
        }
    }

    /// Transition from UpstreamBuilding to Building when upstreams are satisfied but jobs are still actively building
    pub fn continue_building(
        self,
        _job_run_id: String, // Reference to active building job for safety/documentation
        started_at: u64,
    ) -> WantWithState<BuildingState> {
        WantWithState {
            want: self.want.updated_timestamp(),
            state: BuildingState { started_at },
        }
    }

    /// Add more upstream dependencies (self-transition)
    pub fn add_upstreams(mut self, want_ids: Vec<String>) -> Self {
        for want_id in want_ids {
            if self.state.upstream_want_ids.contains(&want_id) {
                panic!(
                    "BUG: Attempted to add upstream want {} that already exists in want {}",
                    want_id, self.want.want_id
                );
            }
            self.state.upstream_want_ids.push(want_id);
        }

        WantWithState {
            want: self.want.updated_timestamp(),
            state: self.state,
        }
    }

    /// Transition from UpstreamBuilding to UpstreamFailed when an upstream dependency fails
    pub fn upstream_failed(
        self,
        failed_upstreams: Vec<String>,
        timestamp: u64,
    ) -> WantWithState<UpstreamFailedState> {
        WantWithState {
            want: self.want.updated_timestamp(),
            state: UpstreamFailedState {
                failed_at: timestamp,
                failed_wants: failed_upstreams,
            },
        }
    }

    /// Transition from UpstreamBuilding to Canceled when want is explicitly canceled
    pub fn cancel(
        self,
        source: Option<EventSource>,
        timestamp: u64,
        comment: Option<String>,
    ) -> WantWithState<CanceledState> {
        WantWithState {
            want: self.want.updated_timestamp(),
            state: CanceledState {
                canceled_at: timestamp,
                canceled_by: source,
                comment,
            },
        }
    }
}

// Helper methods on the Want enum
impl Want {
    /// Create a new want in the Idle state
    pub fn new(
        want_id: String,
        partitions: Vec<PartitionRef>,
        data_timestamp: u64,
        ttl_seconds: u64,
        sla_seconds: u64,
        source: Option<EventSource>,
        comment: Option<String>,
    ) -> Self {
        Want::Idle(WantWithState {
            want: WantInfo {
                want_id,
                partitions,
                data_timestamp,
                ttl_seconds,
                sla_seconds,
                source,
                comment,
                last_updated_at: current_timestamp(),
            },
            state: IdleState {},
        })
    }

    /// Check if want is schedulable (Idle or UpstreamBuilding with satisfied upstreams)
    pub fn is_schedulable(&self) -> bool {
        match self {
            Want::Idle(_) => true,
            // Note: upstream building shouldn't return true ever, it should go back to idle to
            // be explicit about upstreams completing, which then makes it schedulable
            _ => false,
        }
    }

    /// Check if want is in a terminal state (Successful, Failed, UpstreamFailed, or Canceled)
    pub fn is_terminal(&self) -> bool {
        matches!(
            self,
            Want::Successful(_) | Want::Failed(_) | Want::UpstreamFailed(_) | Want::Canceled(_)
        )
    }

    pub fn want(&self) -> &WantInfo {
        match self {
            Want::Idle(w) => &w.want,
            Want::Building(w) => &w.want,
            Want::UpstreamBuilding(w) => &w.want,
            Want::Successful(w) => &w.want,
            Want::Failed(w) => &w.want,
            Want::UpstreamFailed(w) => &w.want,
            Want::Canceled(w) => &w.want,
        }
    }

    /// Convert to WantDetail for API responses and queries
    pub fn to_detail(&self) -> WantDetail {
        WantDetail {
            want_id: self.want().want_id.clone(),
            partitions: self.want().partitions.clone(),
            upstreams: vec![], // Upstreams are tracked via want relationships, not stored here
            data_timestamp: self.want().data_timestamp,
            ttl_seconds: self.want().ttl_seconds,
            sla_seconds: self.want().sla_seconds,
            source: self.want().source.clone(),
            comment: self.want().comment.clone(),
            last_updated_timestamp: self.want().last_updated_at,
            status: match self {
                Want::Idle(_) => Some(WantStatusCode::WantIdle.into()),
                Want::Building(_) => Some(WantStatusCode::WantBuilding.into()),
                Want::UpstreamBuilding(_) => Some(WantStatusCode::WantUpstreamBuilding.into()),
                Want::Successful(_) => Some(WantStatusCode::WantSuccessful.into()),
                Want::Failed(_) => Some(WantStatusCode::WantFailed.into()),
                Want::UpstreamFailed(_) => Some(WantStatusCode::WantUpstreamFailed.into()),
                Want::Canceled(_) => Some(WantStatusCode::WantCanceled.into()),
            },
        }
    }
}
```
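As a usage sketch, the following hypothetical unit test (it could sit at the bottom of `want_state.rs`; the IDs and literal values are made up for illustration) walks one want from Idle through Building to Successful and notes where the typestate API refuses invalid transitions:

```rust
#[cfg(test)]
mod usage_sketch {
    use super::*;

    #[test]
    fn idle_want_builds_and_completes() {
        // Create a want directly in the Idle state.
        let want = Want::new(
            "want-1".to_string(),
            vec![], // partitions omitted for brevity
            0,      // data_timestamp
            3600,   // ttl_seconds
            600,    // sla_seconds
            None,   // source
            None,   // comment
        );
        assert!(want.is_schedulable());

        // Unwrap the Idle typestate and walk it through Building -> Successful.
        let idle = match want {
            Want::Idle(w) => w,
            _ => unreachable!("Want::new always starts Idle"),
        };
        let building = idle.start_building(current_timestamp());
        let done = building.complete("job-run-1".to_string(), current_timestamp());

        // Wrap back into the enum for storage and queries.
        let want = Want::Successful(done);
        assert!(want.is_terminal());

        // Invalid transitions simply do not exist: `done.start_building(...)`
        // would be a compile error, since WantWithState<SuccessfulState>
        // has no such method.
    }
}
```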
@@ -16,6 +16,10 @@ CPN concurrency can be described succinctly: if the workloads touch disjoint plac…

# Appendix

## Reasoning About State Transitions

This is hard. You effectively wander around the potential state space, conjuring possible situations in your head, then work through the edge cases against the current state machines and the logic connecting them, trying to determine whether any consistency errors or gaps in the model are possible. Imagine how hard it would be without state machines! The point is to pull this class of bug forward as early as possible, as the sketch below illustrates.
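One standalone illustration of this (a stand-in enum, not the repo's types): keeping matches over the state space exhaustive, with no wildcard arm, means that adding a new state later turns every undecided transition site into a compile error rather than a latent runtime gap.

```rust
// Sketch only: a toy state enum standing in for the real want states.
#[allow(dead_code)]
enum WantState {
    Idle,
    Building,
    UpstreamBuilding,
    Successful,
    Failed,
    UpstreamFailed,
    Canceled,
}

/// What should happen when an upstream want fails?
/// No `_ => ...` arm: if a new state is added later, this stops compiling
/// until someone decides what that state does in this situation.
fn on_upstream_failed(state: &WantState) -> &'static str {
    match state {
        WantState::Idle => "mark upstream-failed",
        WantState::Building => "mark upstream-failed (or let the running job finish?)",
        WantState::UpstreamBuilding => "mark upstream-failed",
        WantState::Successful => "ignore: already complete",
        WantState::Failed => "ignore: already failed",
        WantState::UpstreamFailed => "record the additional failed upstream",
        WantState::Canceled => "ignore: canceled",
    }
}

fn main() {
    println!("{}", on_upstream_failed(&WantState::Building));
}
```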
## Partition Collisions?

Random thought: we also have the lingering question of what happens if unrelated wants collide in the partition space, specifically in a paradigm where job runs produce multiple partitions based on their parameterization. One option is to simply cancel the later of the colliding jobs and have it reschedule (though how would its partitions differ on retry?). Alternatively, since partition build status is updated at job-schedule time, we may be able to rely on never reaching that situation at the later want-grouping stage (pre job scheduling): the grouping step would see the conflicting partition as already building, thanks to the earlier job having started. It is probably worth constructing a literal situation to war-game this, or implementing a literal integration test; see the toy model below.
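To start that war-gaming, here is a standalone toy model (invented names, not the repo's scheduler) of the argument that flipping partition status to building at job-schedule time prevents a double build at the want-grouping stage:

```rust
use std::collections::HashMap;

// Toy partition status; real statuses live in the build event log.
#[derive(Debug)]
enum PartitionStatus {
    Building,
    Live,
}

struct ToyScheduler {
    partitions: HashMap<String, PartitionStatus>,
    scheduled_jobs: Vec<Vec<String>>, // each job = the partitions it will build
}

impl ToyScheduler {
    fn new() -> Self {
        Self { partitions: HashMap::new(), scheduled_jobs: vec![] }
    }

    /// Want-grouping stage: only partitions that are neither live nor already
    /// building get grouped into a new job. Status flips to Building at
    /// schedule time, before the next want is considered.
    fn schedule_want(&mut self, wanted: &[&str]) -> Option<usize> {
        let needed: Vec<String> = wanted
            .iter()
            .filter(|p| !self.partitions.contains_key(**p))
            .map(|p| p.to_string())
            .collect();
        if needed.is_empty() {
            return None; // nothing to do: everything is building or live
        }
        for p in &needed {
            self.partitions.insert(p.clone(), PartitionStatus::Building);
        }
        self.scheduled_jobs.push(needed);
        Some(self.scheduled_jobs.len() - 1)
    }
}

fn main() {
    let mut sched = ToyScheduler::new();
    // "c/already-live" was built previously.
    sched.partitions.insert("c/already-live".to_string(), PartitionStatus::Live);

    // Two unrelated wants collide on "p/2024-01-01".
    let job_a = sched.schedule_want(&["p/2024-01-01", "c/already-live", "a/extra"]);
    let job_b = sched.schedule_want(&["p/2024-01-01", "b/extra"]);

    // Job A builds only what is neither live nor building.
    assert_eq!(
        sched.scheduled_jobs[job_a.unwrap()],
        vec!["p/2024-01-01".to_string(), "a/extra".to_string()]
    );
    // Job B sees the collision partition as building and schedules only its own.
    assert_eq!(sched.scheduled_jobs[job_b.unwrap()], vec!["b/extra".to_string()]);
    println!("no double-build: {:?}", sched.scheduled_jobs);
}
```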