Compare commits

...

5 commits

10 changed files with 1023 additions and 325 deletions

1
.gitignore vendored
View file

@ -19,3 +19,4 @@ logs/databuild/
# DSL generated code
**/generated/
/databuild/databuild.rs

View file

@ -16,6 +16,22 @@ DataBuild is a bazel-based data build system. Key files:
Please reference these for any related work, as they indicate key technical bias/direction of the project.
## Architecture Pattern
DataBuild implements **Orchestrated State Machines** - a pattern where the application core is composed of:
- **Type-safe state machines** for domain entities (Want, JobRun, Partition)
- **Dependency graphs** expressing relationships between entities
- **Orchestration logic** that coordinates state transitions based on dependencies
This architecture provides compile-time correctness, observability through event sourcing, and clean separation between entity behavior and coordination logic. See [`docs/orchestrated-state-machines.md`](docs/orchestrated-state-machines.md) for the full theory and implementation patterns.
**Key implications for development:**
- Model entities as explicit state machines with type-parameterized states
- Use consuming methods for state transitions (enforces immutability)
- Emit events to BEL for all state changes (observability)
- Centralize coordination logic in the Orchestrator (separation of concerns)
- If it has a `status` field (or similar), it should have a state machine with type safe transitions that governs it
## Tenets
- Declarative over imperative wherever possible/reasonable.
@ -23,6 +39,7 @@ Please reference these for any related work, as they indicate key technical bias
- Do not add "unknown" results when parses or matches fail - these should always throw.
- Compile time correctness is a super-power, and investment in it speeds up flywheel for development and user value.
- **CLI/Service Interchangeability**: Both the CLI and service must produce identical artifacts (BEL events, logs, metrics, outputs) in the same locations. Users should be able to build with one interface and query/inspect results from the other seamlessly. This principle applies to all DataBuild operations, not just builds.
- The BEL represents real things that happen: job run processes that are started or fail, requests from the user, dep misses, etc.
## Build & Test
```bash

View file

@ -1,5 +1,6 @@
use crate::data_build_event::Event;
use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
use crate::partition_state::{Partition, PartitionWithState, MissingState, BuildingState, LiveState, FailedState, TaintedState};
use crate::util::{DatabuildError, current_timestamp};
use crate::{
JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1,
@ -33,11 +34,13 @@ This means no boxing or "query phase", and means we can have all state updates h
and updates, which is exceptionally fast.
*/
/// Tracks all application state, defines valid state transitions, and manages cross-state machine
/// state transitions (e.g. job run success resulting in partition going from Building to Live)
#[derive(Debug, Clone)]
pub struct BuildState {
wants: BTreeMap<String, WantDetail>,
taints: BTreeMap<String, TaintDetail>,
partitions: BTreeMap<String, PartitionDetail>,
partitions: BTreeMap<String, Partition>, // Type-safe partition storage
job_runs: BTreeMap<String, JobRunDetail>,
}
@ -57,6 +60,23 @@ impl BuildState {
self.job_runs.len()
}
/// Add want_id to partition's want_ids list
fn add_want_to_partition(&mut self, pref: &PartitionRef, want_id: &str) {
// Create partition if it doesn't exist
if !self.partitions.contains_key(&pref.r#ref) {
let partition = Partition::new_missing(pref.clone());
self.partitions.insert(pref.r#ref.clone(), partition);
}
// Add want_id
if let Some(partition) = self.partitions.get_mut(&pref.r#ref) {
let want_ids = partition.want_ids_mut();
if !want_ids.contains(&want_id.to_string()) {
want_ids.push(want_id.to_string());
}
}
}
/// Handles reacting to events, updating state, and erroring if its an invalid state transition
/// Event handlers can return vecs of events that will then be appended to the BEL
pub fn handle_event(&mut self, event: &Event) -> Result<Vec<Event>, DatabuildError> {
@ -119,6 +139,34 @@ impl BuildState {
}
}
// Transition partitions to Building state
for pref in &job_run.building_partitions {
if let Some(partition) = self.partitions.remove(&pref.r#ref) {
// Partition exists - transition based on current state
let transitioned = match partition {
// Valid: Missing -> Building
Partition::Missing(missing) => {
Partition::Building(missing.start_building(event.job_run_id.clone()))
}
// Invalid state: partition should not already be Building, Live, Failed, or Tainted
_ => {
return Err(format!(
"Invalid state: partition {} cannot start building from state {:?}",
pref.r#ref, partition
).into())
}
};
self.partitions.insert(pref.r#ref.clone(), transitioned);
} else {
// Partition doesn't exist yet - create in Missing then transition to Building
let missing = Partition::new_missing(pref.clone());
if let Partition::Missing(m) = missing {
let building = m.start_building(event.job_run_id.clone());
self.partitions.insert(pref.r#ref.clone(), Partition::Building(building));
}
}
}
self.job_runs
.insert(event.job_run_id.clone(), job_run.clone());
println!("Inserted job run: {:?}", job_run);
@ -140,51 +188,6 @@ impl BuildState {
}
}
fn update_partition_status(
&mut self,
pref: &PartitionRef,
status: PartitionStatusCode,
job_run_id: Option<&str>,
) -> Result<(), DatabuildError> {
if let Some(partition) = self.partitions.get_mut(&pref.r#ref) {
partition.status = Some(status.clone().into());
partition.last_updated_timestamp = Some(current_timestamp());
if let Some(job_run_id) = job_run_id.map(str::to_string) {
if !partition.job_run_ids.contains(&job_run_id) {
partition.job_run_ids.push(job_run_id);
}
}
} else {
// Partition doesn't exist yet, needs to be inserted
let want_ids = if let Some(jrid) = job_run_id {
let job_run = self
.get_job_run(jrid)
.expect("Job run must exist for partition");
job_run
.servicing_wants
.iter()
.map(|wap| wap.want_id.clone())
.collect()
} else {
vec![]
};
let partition = PartitionDetail {
r#ref: Some(pref.clone()),
status: Some(status.into()),
last_updated_timestamp: Some(current_timestamp()),
job_run_ids: job_run_id
.map(|jrid| vec![jrid.to_string()])
.unwrap_or(vec![]),
want_ids,
..PartitionDetail::default()
};
self.partitions.insert(pref.r#ref.clone(), partition);
};
self.update_wants_for_partition(&pref)
}
/// Walks the state from this want ID to update its status.
fn update_want_status(&mut self, want_id: &str) -> Result<(), DatabuildError> {
if let Some(want) = self.wants.get(want_id) {
@ -223,13 +226,35 @@ impl BuildState {
// Clone building_partitions before we use it multiple times
let newly_live_partitions: Vec<PartitionRef> = job_run.building_partitions.clone();
// Update partitions being build by this job
// Update partitions being built by this job (strict type-safe transitions)
for pref in &newly_live_partitions {
self.update_partition_status(
pref,
PartitionStatusCode::PartitionLive,
Some(&event.job_run_id),
)?;
let partition = self.partitions.remove(&pref.r#ref).ok_or_else(|| {
format!(
"Partition {} must exist and be in Building state before completion",
pref.r#ref
)
})?;
// ONLY valid transition: Building -> Live
let transitioned = match partition {
Partition::Building(building) => {
Partition::Live(building.complete(
event.job_run_id.clone(),
current_timestamp()
))
}
// All other states are invalid
_ => {
return Err(format!(
"Invalid state: partition {} must be Building to transition to Live, found {:?}",
pref.r#ref, partition
).into())
}
};
self.partitions.insert(pref.r#ref.clone(), transitioned);
// Update wants that reference this partition
self.update_wants_for_partition(pref)?;
}
// Check all wants in WantUpstreamBuilding status to see if their dependencies are now satisfied
@ -253,12 +278,11 @@ impl BuildState {
for want_id in wants_to_update {
if let Some(want) = self.wants.get_mut(&want_id) {
// Check if all upstreams are now satisfied
// Check if all upstreams are now satisfied (using type-safe check)
let all_upstreams_satisfied = want.upstreams.iter().all(|upstream| {
self.partitions
.get(&upstream.r#ref)
.and_then(|p| p.status.as_ref())
.map(|s| s.code == PartitionStatusCode::PartitionLive as i32)
.map(|p| p.is_live())
.unwrap_or(false)
});
@ -274,11 +298,11 @@ impl BuildState {
}
fn update_wants_for_partition(&mut self, pref: &PartitionRef) -> Result<(), DatabuildError> {
// todo!("Go to every want that references this partition and update its status")
// Use type-safe partitions storage
let want_ids = self
.partitions
.get(&pref.r#ref)
.map(|p| p.want_ids.clone())
.map(|p| p.want_ids().clone())
.ok_or(format!("Partition for ref {} not found", pref.r#ref))?;
for want_id in want_ids.iter() {
self.update_want_status(want_id)?;
@ -296,12 +320,35 @@ impl BuildState {
// Clone building_partitions before we use it multiple times
let failed_partitions: Vec<PartitionRef> = job_run.building_partitions.clone();
// Transition partitions using strict type-safe methods
for pref in &failed_partitions {
self.update_partition_status(
pref,
PartitionStatusCode::PartitionFailed,
Some(&event.job_run_id),
)?;
let partition = self.partitions.remove(&pref.r#ref).ok_or_else(|| {
format!(
"Partition {} must exist and be in Building state before failure",
pref.r#ref
)
})?;
// ONLY valid transition: Building -> Failed
let transitioned = match partition {
Partition::Building(building) => {
Partition::Failed(building.fail(
event.job_run_id.clone(),
current_timestamp()
))
}
// All other states are invalid
_ => {
return Err(format!(
"Invalid state: partition {} must be Building to transition to Failed, found {:?}",
pref.r#ref, partition
).into())
}
};
self.partitions.insert(pref.r#ref.clone(), transitioned);
// Update wants that reference this partition
self.update_wants_for_partition(pref)?;
}
// Check all wants in WantUpstreamBuilding status to see if they were waiting for the failed partitions
@ -388,6 +435,31 @@ impl BuildState {
}
}
// Transition partitions back to Missing since this job can't build them yet
for pref in &job_run_detail.building_partitions {
let partition = self.partitions.remove(&pref.r#ref).ok_or_else(|| {
format!(
"Partition {} must exist and be in Building state during dep_miss",
pref.r#ref
)
})?;
// Only valid transition: Building -> Missing
let transitioned = match partition {
Partition::Building(building) => {
Partition::Missing(building.reset_to_missing())
}
// All other states are invalid
_ => {
return Err(format!(
"Invalid state: partition {} must be Building during dep_miss, found {:?}",
pref.r#ref, partition
).into())
}
};
self.partitions.insert(pref.r#ref.clone(), transitioned);
}
// Create wants from dep misses
let want_events = missing_deps_to_want_events(
event.missing_deps.clone(),
@ -416,7 +488,17 @@ impl BuildState {
Self { wants, ..self }
}
fn with_partitions(self, partitions: BTreeMap<String, PartitionDetail>) -> Self {
#[cfg(test)]
fn with_partitions(self, old_partitions: BTreeMap<String, PartitionDetail>) -> Self {
// Convert PartitionDetail to Partition (for backfill scenarios)
let partitions: BTreeMap<String, Partition> = old_partitions
.into_iter()
.map(|(key, detail)| {
// For now, just create in Missing state - real migration would be more sophisticated
let partition = Partition::new_missing(detail.r#ref.clone().unwrap_or_default());
(key, partition)
})
.collect();
Self { partitions, ..self }
}
@ -427,7 +509,7 @@ impl BuildState {
self.taints.get(taint_id).cloned()
}
pub fn get_partition(&self, partition_id: &str) -> Option<PartitionDetail> {
self.partitions.get(partition_id).cloned()
self.partitions.get(partition_id).map(|p| p.to_detail())
}
pub fn get_job_run(&self, job_run_id: &str) -> Option<JobRunDetail> {
self.job_runs.get(job_run_id).cloned()
@ -458,8 +540,14 @@ impl BuildState {
pub fn list_partitions(&self, request: &ListPartitionsRequest) -> ListPartitionsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
// Convert Partition to PartitionDetail for API
let partition_details: BTreeMap<String, PartitionDetail> = self
.partitions
.iter()
.map(|(k, v)| (k.clone(), v.to_detail()))
.collect();
ListPartitionsResponse {
data: list_state_items(&self.partitions, page, page_size),
data: list_state_items(&partition_details, page, page_size),
match_count: self.wants.len() as u64,
page,
page_size,
@ -481,27 +569,27 @@ impl BuildState {
Wants are schedulable when their partition is live and not tainted
*/
pub fn want_schedulability(&self, want: &WantDetail) -> WantSchedulability {
let live_details: Vec<&PartitionDetail> = want
.upstreams
.iter()
.map(|pref| self.partitions.get(&pref.r#ref))
.flatten()
.collect();
let live: Vec<PartitionRef> = live_details
.iter()
.map(|pd| pd.r#ref.clone().expect("pref must have ref"))
.collect();
let missing: Vec<PartitionRef> = want
.upstreams
.iter()
.filter(|pref| self.partitions.get(&pref.r#ref).is_none())
.cloned()
.collect();
let tainted: Vec<PartitionRef> = live_details
.iter()
.filter(|p| p.status == Some(PartitionStatusCode::PartitionTainted.into()))
.map(|pref| pref.r#ref.clone().unwrap())
.collect();
// Use type-safe partition checks from partitions
let mut live: Vec<PartitionRef> = Vec::new();
let mut tainted: Vec<PartitionRef> = Vec::new();
let mut missing: Vec<PartitionRef> = Vec::new();
for upstream_ref in &want.upstreams {
match self.partitions.get(&upstream_ref.r#ref) {
Some(partition) => {
if partition.is_live() {
live.push(upstream_ref.clone());
} else if matches!(partition, Partition::Tainted(_)) {
tainted.push(upstream_ref.clone());
}
// Other states (Missing, Building, Failed) don't add to any list
}
None => {
missing.push(upstream_ref.clone());
}
}
}
WantSchedulability {
want: want.clone(),
status: WantUpstreamStatus {

View file

@ -1,4 +1,4 @@
use crate::job_run::{NotStartedJobRun, SubProcessBackend};
use crate::job_run::{JobRun, SubProcessBackend};
use crate::{JobConfig, PartitionRef, WantDetail};
use regex::Regex;
use crate::util::DatabuildError;
@ -12,11 +12,11 @@ pub struct JobConfiguration {
impl JobConfiguration {
/** Launch job to build the partitions specified by the provided wants. */
pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<NotStartedJobRun<SubProcessBackend>, std::io::Error> {
pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<JobRun<SubProcessBackend>, std::io::Error> {
let wanted_refs: Vec<PartitionRef> =
wants.iter().flat_map(|want| want.partitions.clone()).collect();
let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
Ok(NotStartedJobRun::spawn(self.entry_point.clone(), args))
Ok(JobRun::spawn(self.entry_point.clone(), args))
}
pub fn matches(&self, refs: &PartitionRef) -> bool {

View file

@ -25,11 +25,6 @@ pub trait JobRunBackend: Sized {
/// Create a new not-started job run
fn create(entry_point: String, args: Vec<String>) -> Self::NotStartedState;
/// Convenience method to spawn a new job run (calls create and wraps in JobRun)
fn spawn(entry_point: String, args: Vec<String>) -> NotStartedJobRun<Self> {
NotStartedJobRun::spawn(entry_point, args)
}
/// Transition from NotStarted to Running
fn start(
not_started: Self::NotStartedState,
@ -59,103 +54,35 @@ pub enum PollResult<C, F, D> {
DepMiss(D),
}
/// Generic JobRun that works with any backend, parameterized by state
pub struct JobRun<B: JobRunBackend, S> {
// ===== TYPE-SAFE STATE MACHINE PATTERN =====
// Uses parameterized JobRunWithState wrapped in JobRun enum for storage
/// New JobRun with embedded state enum
/// Type-safe job run struct, parameterized by backend and state
/// This struct can only perform operations valid for its current state type
pub struct JobRunWithState<B: JobRunBackend, S> {
pub job_run_id: Uuid,
pub state: S,
pub _backend: PhantomData<B>,
}
/// Type aliases for specific states
pub type NotStartedJobRun<B> = JobRun<B, <B as JobRunBackend>::NotStartedState>;
pub type RunningJobRun<B> = JobRun<B, <B as JobRunBackend>::RunningState>;
pub type CompletedJobRun<B> = JobRun<B, <B as JobRunBackend>::CompletedState>;
pub type FailedJobRun<B> = JobRun<B, <B as JobRunBackend>::FailedState>;
pub type CanceledJobRun<B> = JobRun<B, <B as JobRunBackend>::CanceledState>;
pub type DepMissJobRun<B> = JobRun<B, <B as JobRunBackend>::DepMissState>;
// Methods available on all JobRun states
impl<B: JobRunBackend, S> JobRun<B, S> {
pub fn id(&self) -> Uuid {
self.job_run_id
}
/// Wrapper enum for storing job runs in a single collection
/// This allows us to store jobs in different states together while maintaining type safety
pub enum JobRun<B: JobRunBackend> {
NotStarted(JobRunWithState<B, B::NotStartedState>),
Running(JobRunWithState<B, B::RunningState>),
Completed(JobRunWithState<B, B::CompletedState>),
Failed(JobRunWithState<B, B::FailedState>),
Canceled(JobRunWithState<B, B::CanceledState>),
DepMiss(JobRunWithState<B, B::DepMissState>),
}
// Methods available only on NotStarted state
impl<B: JobRunBackend> NotStartedJobRun<B> {
pub fn spawn(entry_point: String, args: Vec<String>) -> Self {
JobRun {
job_run_id: Uuid::new_v4(),
state: B::create(entry_point, args),
_backend: PhantomData,
}
}
pub fn run(self) -> Result<RunningJobRun<B>, DatabuildError> {
self.run_with_env(None)
}
pub fn run_with_env(
self,
env: Option<HashMap<String, String>>,
) -> Result<RunningJobRun<B>, DatabuildError> {
let running_state = B::start(self.state, env)?;
Ok(JobRun {
job_run_id: self.job_run_id,
state: running_state,
_backend: PhantomData,
})
}
}
// Methods available only on Running state
impl<B: JobRunBackend> RunningJobRun<B> {
pub fn visit(&mut self) -> Result<JobRunVisitResult<B>, DatabuildError> {
match B::poll(&mut self.state)? {
PollResult::StillRunning => Ok(JobRunVisitResult::StillRunning),
PollResult::Completed(completed_state) => {
let job_run_id = self.job_run_id;
Ok(JobRunVisitResult::Completed(JobRun {
job_run_id,
state: completed_state,
_backend: PhantomData,
}))
}
PollResult::Failed(failed_state) => {
let job_run_id = self.job_run_id;
Ok(JobRunVisitResult::Failed(JobRun {
job_run_id,
state: failed_state,
_backend: PhantomData,
}))
}
PollResult::DepMiss(state) => {
let job_run_id = self.job_run_id;
Ok(JobRunVisitResult::DepMiss(JobRun {
job_run_id,
state,
_backend: PhantomData,
}))
}
}
}
pub fn cancel(self, source: EventSource) -> Result<CanceledJobRun<B>, DatabuildError> {
let canceled_state = B::cancel_job(self.state, source)?;
Ok(JobRun {
job_run_id: self.job_run_id,
state: canceled_state,
_backend: PhantomData,
})
}
}
/// Result of visiting a running job
pub enum JobRunVisitResult<B: JobRunBackend> {
StillRunning,
Completed(CompletedJobRun<B>),
Failed(FailedJobRun<B>),
DepMiss(DepMissJobRun<B>),
/// Result of visiting a running job - returns the typed states
pub enum VisitResult<B: JobRunBackend> {
StillRunning(JobRunWithState<B, B::RunningState>),
Completed(JobRunWithState<B, B::CompletedState>),
Failed(JobRunWithState<B, B::FailedState>),
DepMiss(JobRunWithState<B, B::DepMissState>),
}
pub enum JobRunConfig {
@ -368,40 +295,164 @@ pub struct JobRunPollResult {
pub status: JobRunStatus,
}
// ===== Type-Safe State Transition Implementation =====
// Factory and helper methods on the JobRun enum
impl<B: JobRunBackend> JobRun<B> {
/// Create a new job run in the NotStarted state
pub fn spawn(entry_point: String, args: Vec<String>) -> Self {
JobRun::NotStarted(JobRunWithState {
job_run_id: Uuid::new_v4(),
state: B::create(entry_point, args),
_backend: PhantomData,
})
}
/// Get the job run ID regardless of state
pub fn job_run_id(&self) -> &Uuid {
match self {
JobRun::NotStarted(j) => &j.job_run_id,
JobRun::Running(j) => &j.job_run_id,
JobRun::Completed(j) => &j.job_run_id,
JobRun::Failed(j) => &j.job_run_id,
JobRun::Canceled(j) => &j.job_run_id,
JobRun::DepMiss(j) => &j.job_run_id,
}
}
/// Check if the job is in a terminal state
pub fn is_terminal(&self) -> bool {
matches!(
self,
JobRun::Completed(_) | JobRun::Failed(_) | JobRun::Canceled(_) | JobRun::DepMiss(_)
)
}
}
// Type-safe transition: NotStarted -> Running
// This method can ONLY be called on NotStarted jobs - compile error otherwise!
impl<B: JobRunBackend> JobRunWithState<B, B::NotStartedState> {
pub fn run(
self,
env: Option<HashMap<String, String>>,
) -> Result<JobRunWithState<B, B::RunningState>, DatabuildError> {
let running = B::start(self.state, env)?;
Ok(JobRunWithState {
job_run_id: self.job_run_id,
state: running,
_backend: PhantomData,
})
}
}
// Type-safe transition: Running -> (Running | Completed | Failed | DepMiss)
// This method can ONLY be called on Running jobs - compile error otherwise!
impl<B: JobRunBackend> JobRunWithState<B, B::RunningState> {
pub fn visit(mut self) -> Result<VisitResult<B>, DatabuildError> {
match B::poll(&mut self.state)? {
PollResult::StillRunning => Ok(VisitResult::StillRunning(self)),
PollResult::Completed(completed) => Ok(VisitResult::Completed(JobRunWithState {
job_run_id: self.job_run_id,
state: completed,
_backend: PhantomData,
})),
PollResult::Failed(failed) => Ok(VisitResult::Failed(JobRunWithState {
job_run_id: self.job_run_id,
state: failed,
_backend: PhantomData,
})),
PollResult::DepMiss(dep_miss) => Ok(VisitResult::DepMiss(JobRunWithState {
job_run_id: self.job_run_id,
state: dep_miss,
_backend: PhantomData,
})),
}
}
pub fn cancel(
self,
source: EventSource,
) -> Result<JobRunWithState<B, B::CanceledState>, DatabuildError> {
let canceled = B::cancel_job(self.state, source)?;
Ok(JobRunWithState {
job_run_id: self.job_run_id,
state: canceled,
_backend: PhantomData,
})
}
}
// Helper trait for converting states to events
pub trait ToEvent {
fn to_event(&self, job_run_id: &Uuid) -> Event;
}
impl ToEvent for SubProcessCompleted {
fn to_event(&self, job_run_id: &Uuid) -> Event {
Event::JobRunSuccessV1(JobRunSuccessEventV1 {
job_run_id: job_run_id.to_string(),
})
}
}
impl ToEvent for SubProcessFailed {
fn to_event(&self, job_run_id: &Uuid) -> Event {
Event::JobRunFailureV1(JobRunFailureEventV1 {
job_run_id: job_run_id.to_string(),
reason: self.reason.clone(),
})
}
}
impl ToEvent for SubProcessDepMiss {
fn to_event(&self, job_run_id: &Uuid) -> Event {
Event::JobRunMissingDepsV1(JobRunMissingDepsEventV1 {
job_run_id: job_run_id.to_string(),
missing_deps: self.missing_deps.clone(),
read_deps: self.read_deps.clone(),
})
}
}
mod tests {
use crate::data_build_event::Event;
use crate::data_deps::DATABUILD_MISSING_DEPS_JSON;
use crate::job_run::{JobRunBackend, JobRunVisitResult, SubProcessBackend};
use crate::job_run::{JobRun, JobRunBackend, VisitResult, SubProcessBackend};
use crate::mock_job_run::MockJobRun;
use crate::{JobRunMissingDeps, ManuallyTriggeredEvent, MissingDeps};
use crate::{JobRunMissingDeps, MissingDeps};
/// Happy path - run that succeeds should emit a JobRunSuccessEventV1
#[test]
fn test_job_run_success_returns_job_run_success_event() {
// Spawn a job run that will succeed (exit code 0)
let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]);
let job_run = JobRun::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);
// Start the job - this consumes the NotStarted and returns Running
let mut running_job = job_run.run().unwrap();
let running_job = match job_run {
JobRun::NotStarted(not_started) => not_started.run(None).unwrap(),
_ => panic!("Expected NotStarted job"),
};
// Poll until we get completion
let mut current_job = running_job;
loop {
match running_job.visit().unwrap() {
JobRunVisitResult::Completed(completed) => {
match current_job.visit().unwrap() {
VisitResult::Completed(completed) => {
// Generate the event from the completed state
let event = completed.state.to_event(&completed.id());
let event = completed.state.to_event(&completed.job_run_id);
assert!(matches!(event, Event::JobRunSuccessV1(_)));
break;
}
JobRunVisitResult::Failed(failed) => {
VisitResult::Failed(failed) => {
panic!("Job failed unexpectedly: {}", failed.state.reason);
}
JobRunVisitResult::StillRunning => {
VisitResult::StillRunning(still_running) => {
// Sleep briefly and poll again
std::thread::sleep(std::time::Duration::from_millis(10));
current_job = still_running;
continue;
}
JobRunVisitResult::DepMiss(dep_miss) => {
VisitResult::DepMiss(_dep_miss) => {
panic!("Job dep miss unexpectedly");
}
}
@ -412,30 +463,35 @@ mod tests {
#[test]
fn test_job_run_failure_returns_job_run_failure_event() {
// Spawn a job run
let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]);
let job_run = JobRun::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);
// Start the job with an exit code that indicates failure (non-zero)
let env = MockJobRun::new().exit_code(1).to_env();
let mut running_job = job_run.run_with_env(Some(env)).unwrap();
let running_job = match job_run {
JobRun::NotStarted(not_started) => not_started.run(Some(env)).unwrap(),
_ => panic!("Expected NotStarted job"),
};
// Poll until we get completion
let mut current_job = running_job;
loop {
match running_job.visit().unwrap() {
JobRunVisitResult::Completed(_) => {
match current_job.visit().unwrap() {
VisitResult::Completed(_) => {
panic!("Job succeeded unexpectedly");
}
JobRunVisitResult::Failed(failed) => {
VisitResult::Failed(failed) => {
// Generate the event from the failed state
let event = failed.state.to_event(&failed.id());
let event = failed.state.to_event(&failed.job_run_id);
assert!(matches!(event, Event::JobRunFailureV1(_)));
break;
}
JobRunVisitResult::StillRunning => {
VisitResult::StillRunning(still_running) => {
// Sleep briefly and poll again
std::thread::sleep(std::time::Duration::from_millis(10));
current_job = still_running;
continue;
}
JobRunVisitResult::DepMiss(dep_miss) => {
VisitResult::DepMiss(_dep_miss) => {
panic!("Job dep miss unexpectedly");
}
}
@ -455,14 +511,17 @@ mod tests {
let temp_file = format!("/tmp/databuild_test_cancel_{}", Uuid::new_v4());
// Spawn a job run that will sleep for 1 second and write a file
let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]);
let job_run = JobRun::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);
let env = MockJobRun::new()
.sleep_ms(1000)
.output_file(&temp_file, &"completed".to_string())
.exit_code(0)
.to_env();
let running_job = job_run.run_with_env(Some(env)).unwrap();
let running_job = match job_run {
JobRun::NotStarted(not_started) => not_started.run(Some(env)).unwrap(),
_ => panic!("Expected NotStarted job"),
};
// Give it a tiny bit of time to start
std::thread::sleep(std::time::Duration::from_millis(10));
@ -478,10 +537,10 @@ mod tests {
.unwrap();
// Generate the cancel event from the canceled state
let cancel_event = canceled_job.state.to_event(&canceled_job.id());
let cancel_event = canceled_job.state.to_event(&canceled_job.job_run_id);
// Verify we got the cancel event
assert_eq!(cancel_event.job_run_id, canceled_job.id().to_string());
assert_eq!(cancel_event.job_run_id, canceled_job.job_run_id.to_string());
assert!(cancel_event.source.is_some());
assert_eq!(cancel_event.comment, Some("Job was canceled".to_string()));
@ -499,7 +558,7 @@ mod tests {
#[test]
fn test_job_run_fail_on_missing_deps_should_emit_missing_deps_event() {
// Spawn a job run that will sleep for 1 second and write a file
let job_run = SubProcessBackend::spawn(MockJobRun::bin_path(), vec![]);
let job_run = JobRun::<SubProcessBackend>::spawn(MockJobRun::bin_path(), vec![]);
let expected_dep_miss = JobRunMissingDeps {
version: "1".into(),
@ -515,24 +574,29 @@ mod tests {
.stdout_msg(&dep_miss_line)
.exit_code(1)
.to_env();
let mut running_job = job_run.run_with_env(Some(env)).unwrap();
let running_job = match job_run {
JobRun::NotStarted(not_started) => not_started.run(Some(env)).unwrap(),
_ => panic!("Expected NotStarted job"),
};
// Poll until we get completion
let mut current_job = running_job;
loop {
match running_job.visit().unwrap() {
JobRunVisitResult::Completed(_) => {
match current_job.visit().unwrap() {
VisitResult::Completed(_) => {
panic!("Job succeeded unexpectedly");
}
JobRunVisitResult::Failed(failed) => {
VisitResult::Failed(_failed) => {
panic!("Job failed unexpectedly");
}
JobRunVisitResult::StillRunning => {
VisitResult::StillRunning(still_running) => {
// Sleep briefly and poll again
std::thread::sleep(std::time::Duration::from_millis(10));
current_job = still_running;
continue;
}
JobRunVisitResult::DepMiss(backend) => {
assert_eq!(backend.state.missing_deps, expected_dep_miss.missing_deps);
VisitResult::DepMiss(dep_miss) => {
assert_eq!(dep_miss.state.missing_deps, expected_dep_miss.missing_deps);
break;
}
}

View file

@ -2,6 +2,7 @@ mod build_event_log;
mod orchestrator;
mod job_run;
mod job;
mod partition_state;
mod util;
mod build_state;
mod event_transforms;

View file

@ -1,10 +1,7 @@
use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage};
use crate::data_build_event::Event;
use crate::job::JobConfiguration;
use crate::job_run::{
CompletedJobRun, DepMissJobRun, FailedJobRun, JobRunVisitResult, NotStartedJobRun,
RunningJobRun, SubProcessBackend,
};
use crate::job_run::SubProcessBackend;
use crate::util::DatabuildError;
use crate::{JobRunBufferEventV1, PartitionRef, WantDetail};
use std::collections::HashMap;
@ -22,24 +19,16 @@ JTBDs:
*/
struct Orchestrator<S: BELStorage + Debug> {
pub bel: BuildEventLog<S>,
pub not_started_jobs: Vec<NotStartedJobRun<SubProcessBackend>>,
pub running_jobs: Vec<RunningJobRun<SubProcessBackend>>,
pub completed_jobs: Vec<CompletedJobRun<SubProcessBackend>>,
pub failed_jobs: Vec<FailedJobRun<SubProcessBackend>>,
pub dep_miss_jobs: Vec<DepMissJobRun<SubProcessBackend>>,
pub config: OrchestratorConfig,
pub job_runs: Vec<crate::job_run::JobRun<SubProcessBackend>>,
}
impl Default for Orchestrator<MemoryBELStorage> {
fn default() -> Self {
Self {
bel: Default::default(),
not_started_jobs: Default::default(),
running_jobs: Default::default(),
completed_jobs: Default::default(),
failed_jobs: Default::default(),
dep_miss_jobs: Default::default(),
config: Default::default(),
job_runs: Default::default(),
}
}
}
@ -48,12 +37,8 @@ impl Orchestrator<MemoryBELStorage> {
fn copy(&self) -> Self {
Self {
bel: self.bel.clone(),
not_started_jobs: Default::default(),
running_jobs: Default::default(),
completed_jobs: Default::default(),
failed_jobs: Default::default(),
dep_miss_jobs: Default::default(),
config: self.config.clone(),
job_runs: Default::default(),
}
}
}
@ -112,11 +97,6 @@ struct WantGroup {
wants: Vec<WantDetail>,
}
impl WantGroup {
pub fn spawn(&self) -> Result<NotStartedJobRun<SubProcessBackend>, std::io::Error> {
self.job.spawn(self.wants.clone())
}
}
#[derive(Debug, Clone)]
struct GroupedWants {
@ -140,63 +120,68 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
fn new(storage: S, config: OrchestratorConfig) -> Self {
Self {
bel: BuildEventLog::new(storage, Default::default()),
not_started_jobs: Vec::new(),
running_jobs: Vec::new(),
completed_jobs: Vec::new(),
failed_jobs: Vec::new(),
dep_miss_jobs: Vec::new(),
config,
job_runs: Vec::new(),
}
}
fn schedule_queued_jobs(&mut self) -> Result<(), DatabuildError> {
// TODO need to incorporate concurrency limit/pools here, probably?
while let Some(job) = self.not_started_jobs.pop() {
self.running_jobs.push(job.run()?);
}
use crate::job_run::JobRun;
let mut new_jobs = Vec::new();
for job in self.job_runs.drain(..) {
let transitioned = match job {
JobRun::NotStarted(not_started) => JobRun::Running(not_started.run(None)?),
other => other, // Pass through all other states unchanged
};
new_jobs.push(transitioned);
}
self.job_runs = new_jobs;
Ok(())
}
/// Visits individual job runs, appending resulting events, and moving runs between run status
/// containers. Either jobs are still running, or they are moved to terminal states.
fn poll_job_runs(&mut self) -> Result<(), DatabuildError> {
use crate::job_run::{JobRun, VisitResult};
self.schedule_queued_jobs()?;
// Visit running jobs and transition them to terminal states
let mut still_running = Vec::new();
// TODO make sure that failure in the middle can't mess up build state - likely need to
// refactor here (e.g. turn state changes into data, commit them after all have been
// calculated and validated)
for mut job in self.running_jobs.drain(..) {
match job.visit()? {
JobRunVisitResult::StillRunning => {
println!("Still running job: {:?}", job.id());
still_running.push(job);
// Visit all running jobs using type-safe transitions
let mut new_jobs = Vec::new();
for job in self.job_runs.drain(..) {
let transitioned = match job {
JobRun::Running(running) => {
match running.visit()? {
VisitResult::StillRunning(still_running) => {
println!("Still running job: {:?}", still_running.job_run_id);
JobRun::Running(still_running)
}
VisitResult::Completed(completed) => {
println!("Completed job: {:?}", completed.job_run_id);
let event = completed.state.to_event(&completed.job_run_id);
self.bel.append_event(&event)?;
JobRun::Completed(completed)
}
VisitResult::Failed(failed) => {
println!("Failed job: {:?}", failed.job_run_id);
let event = failed.state.to_event(&failed.job_run_id);
self.bel.append_event(&event)?;
JobRun::Failed(failed)
}
VisitResult::DepMiss(dep_miss) => {
println!("Dep miss job: {:?}", dep_miss.job_run_id);
let event = dep_miss.state.to_event(&dep_miss.job_run_id);
self.bel.append_event(&event)?;
JobRun::DepMiss(dep_miss)
}
}
}
JobRunVisitResult::Completed(completed) => {
// Emit success event
println!("Completed job: {:?}", completed.id());
self.bel
.append_event(&completed.state.to_event(&completed.id()))?;
self.completed_jobs.push(completed);
}
JobRunVisitResult::Failed(failed) => {
// Emit failure event
println!("Failed job: {:?}", failed.id());
let event: Event = failed.state.to_event(&failed.id());
self.bel.append_event(&event)?;
self.failed_jobs.push(failed);
}
JobRunVisitResult::DepMiss(dep_miss) => {
println!("Dep miss job: {:?}", dep_miss.job_run_id);
let event = dep_miss.state.to_event(&dep_miss.id());
self.bel.append_event(&event)?;
self.dep_miss_jobs.push(dep_miss);
}
}
other => other, // Pass through all non-running states unchanged
};
new_jobs.push(transitioned);
}
self.running_jobs = still_running;
self.job_runs = new_jobs;
Ok(())
}
@ -243,12 +228,17 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
}
fn queue_job(&mut self, wg: WantGroup) -> Result<(), DatabuildError> {
// Spawn job run (not started, but need only be `.run`'d)
let job_run = wg.spawn()?;
use crate::job_run::JobRun;
// Compute args from wants the same way JobConfiguration::spawn() does
let wanted_refs: Vec<crate::PartitionRef> =
wg.wants.iter().flat_map(|want| want.partitions.clone()).collect();
let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
let job_run = JobRun::spawn(wg.job.entry_point.clone(), args);
// Create job run buffer event
let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 {
job_run_id: job_run.job_run_id.into(),
job_run_id: job_run.job_run_id().to_string(),
job_label: wg.job.label,
building_partitions: wg
.wants
@ -259,7 +249,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
want_attributed_partitions: wg.wants.iter().map(|w| w.into()).collect(),
});
self.bel.append_event(&job_buffer_event)?;
self.not_started_jobs.push(job_run);
self.job_runs.push(job_run);
Ok(())
}
@ -270,6 +260,36 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
Ok(())
}
// Helper methods for tests to count jobs by state
#[cfg(test)]
fn count_running_jobs(&self) -> usize {
use crate::job_run::JobRun;
self.job_runs.iter().filter(|j| matches!(j, JobRun::Running(_))).count()
}
#[cfg(test)]
fn count_terminal_jobs(&self) -> usize {
self.job_runs.iter().filter(|j| j.is_terminal()).count()
}
#[cfg(test)]
fn count_not_started_jobs(&self) -> usize {
use crate::job_run::JobRun;
self.job_runs.iter().filter(|j| matches!(j, JobRun::NotStarted(_))).count()
}
#[cfg(test)]
fn count_dep_miss_jobs(&self) -> usize {
use crate::job_run::JobRun;
self.job_runs.iter().filter(|j| matches!(j, JobRun::DepMiss(_))).count()
}
#[cfg(test)]
fn count_completed_jobs(&self) -> usize {
use crate::job_run::JobRun;
self.job_runs.iter().filter(|j| matches!(j, JobRun::Completed(_))).count()
}
/** Entrypoint for running jobs */
pub fn join(&mut self) -> Result<(), DatabuildError> {
loop {
@ -376,14 +396,14 @@ mod tests {
fn test_empty_wants_noop() {
let mut orchestrator = build_orchestrator();
// Should init with no work to do
assert!(orchestrator.not_started_jobs.is_empty());
assert!(orchestrator.running_jobs.is_empty());
assert_eq!(orchestrator.count_not_started_jobs(), 0);
assert_eq!(orchestrator.count_running_jobs(), 0);
orchestrator
.poll_wants()
.expect("shouldn't fail to poll empty wants");
// Should still be empty since no work to do
assert!(orchestrator.not_started_jobs.is_empty());
assert!(orchestrator.running_jobs.is_empty());
assert_eq!(orchestrator.count_not_started_jobs(), 0);
assert_eq!(orchestrator.count_running_jobs(), 0);
}
// Use case: Some schedulable wants with jobs that can be matched should launch those jobs
@ -400,7 +420,7 @@ mod tests {
for e in events {
orchestrator.bel.append_event(&e).expect("append");
}
assert_eq!(orchestrator.not_started_jobs.len(), 0);
assert_eq!(orchestrator.count_not_started_jobs(), 0);
assert_eq!(orchestrator.bel.state.count_job_runs(), 0);
// When
@ -410,19 +430,13 @@ mod tests {
.expect("shouldn't fail to poll wants");
// Should schedule alpha job
assert_eq!(orchestrator.not_started_jobs.len(), 1);
assert_eq!(
orchestrator
.not_started_jobs
.iter()
.take(1)
.last()
.unwrap()
.state
.args,
vec!["data/alpha"],
"should have scheduled alpha job"
);
assert_eq!(orchestrator.count_not_started_jobs(), 1);
// Verify the job has the right args by checking the first NotStarted job
use crate::job_run::JobRun;
let not_started_job = orchestrator.job_runs.iter().find(|j| matches!(j, JobRun::NotStarted(_))).unwrap();
if let JobRun::NotStarted(job) = not_started_job {
assert_eq!(job.state.args, vec!["data/alpha"], "should have scheduled alpha job");
}
assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
}
@ -442,7 +456,7 @@ mod tests {
.expect("shouldn't fail to poll wants");
// Should not have scheduled any jobs
assert_eq!(orchestrator.not_started_jobs.len(), 0);
assert_eq!(orchestrator.count_not_started_jobs(), 0);
}
}
@ -540,7 +554,7 @@ mod tests {
mod orchestration {
use crate::data_build_event::Event;
use crate::orchestrator::tests::{build_orchestrator, setup_scenario_a_to_b};
use crate::{PartitionStatusCode, WantCreateEventV1};
use crate::{PartitionStatusCode, WantCreateEventV1, WantStatusCode};
use std::thread;
use std::time::Duration;
@ -565,32 +579,42 @@ mod tests {
orchestrator
.poll_wants()
.expect("stage unscheduled jobs based on wants failed");
assert_eq!(orchestrator.not_started_jobs.len(), 1);
// poll job runs should start job run
orchestrator.poll_job_runs().expect("should start run");
assert_eq!(orchestrator.running_jobs.len(), 1);
assert_eq!(orchestrator.count_not_started_jobs(), 1);
// step should start job run
orchestrator.step().expect("should start run");
assert_eq!(orchestrator.count_running_jobs(), 1);
assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
// Partition should be in Building state after job starts
assert_eq!(
orchestrator
.bel
.state
.get_partition(partition)
.unwrap()
.status,
Some(PartitionStatusCode::PartitionBuilding.into()),
"partition should be in Building state after job starts"
);
thread::sleep(Duration::from_millis(1));
// Should still be running after 1ms
orchestrator
.poll_job_runs()
.step()
.expect("should still be running");
assert_eq!(orchestrator.running_jobs.len(), 1);
assert_eq!(orchestrator.count_running_jobs(), 1);
assert_eq!(orchestrator.bel.state.count_job_runs(), 1);
println!("STATE: {:?}", orchestrator.bel.state);
// Wait for it to complete
thread::sleep(Duration::from_millis(10));
orchestrator
.poll_job_runs()
.step()
.expect("should be able to poll existing job run");
// Job run should have succeeded
assert!(orchestrator.not_started_jobs.is_empty());
assert!(orchestrator.failed_jobs.is_empty());
assert!(orchestrator.dep_miss_jobs.is_empty());
assert!(orchestrator.running_jobs.is_empty());
assert_eq!(orchestrator.completed_jobs.len(), 1);
assert_eq!(orchestrator.count_not_started_jobs(), 0);
assert_eq!(orchestrator.count_completed_jobs(), 1);
// Build state should show partition as live
assert_eq!(
@ -615,7 +639,7 @@ mod tests {
for _i in 0..max_steps {
thread::sleep(Duration::from_millis(50));
if orchestrator.running_jobs.is_empty() {
if orchestrator.count_running_jobs() == 0 {
return Ok(());
}
orchestrator
@ -702,7 +726,7 @@ echo 'Beta succeeded'
// Step 1: Should schedule beta job (want -> not_started_jobs)
orchestrator.step().expect("step 1");
assert_eq!(
orchestrator.not_started_jobs.len(),
orchestrator.count_not_started_jobs(),
1,
"beta job should be queued"
);
@ -710,33 +734,75 @@ echo 'Beta succeeded'
// Step 2: Should start beta job (not_started_jobs -> running_jobs)
orchestrator.step().expect("step 2");
assert_eq!(
orchestrator.running_jobs.len(),
orchestrator.count_running_jobs(),
1,
"beta job should be running"
);
// Beta partition should be in Building state after job starts
assert_eq!(
orchestrator
.bel
.state
.get_partition(partition_beta)
.unwrap()
.status,
Some(PartitionStatusCode::PartitionBuilding.into()),
"beta partition should be in Building state after job starts"
);
// Step 3: Beta job detects missing alpha dep and creates want
wait_for_jobs_to_complete(&mut orchestrator, 10).expect("beta job should complete");
// (Beta should now be in dep_miss state, and a want for alpha should be created)
assert_eq!(
orchestrator.dep_miss_jobs.len(),
orchestrator.count_dep_miss_jobs(),
1,
"beta should have dep miss"
);
// Beta want should be in UpstreamBuilding state waiting for alpha
// (Check that at least one want referencing beta is in UpstreamBuilding)
let wants_response = orchestrator
.bel
.state
.list_wants(&crate::ListWantsRequest::default());
let beta_wants: Vec<_> = wants_response
.data
.iter()
.filter(|w| w.partitions.iter().any(|p| p.r#ref == partition_beta))
.collect();
assert!(
beta_wants.iter().any(|w| w.status.as_ref().map(|s| s.code) == Some(WantStatusCode::WantUpstreamBuilding as i32)),
"At least one beta want should be in UpstreamBuilding state, found: {:?}",
beta_wants.iter().map(|w| &w.status).collect::<Vec<_>>()
);
// Step 4: Should schedule and start alpha job
// (dep miss handler created the alpha want, which will be picked up by poll_wants)
orchestrator.step().expect("step 4");
assert_eq!(
orchestrator.running_jobs.len(),
orchestrator.count_running_jobs(),
1,
"alpha job should be running"
);
// Alpha partition should be in Building state after job starts
assert_eq!(
orchestrator
.bel
.state
.get_partition(partition_alpha)
.unwrap()
.status,
Some(PartitionStatusCode::PartitionBuilding.into()),
"alpha partition should be in Building state after job starts"
);
// Step 6: Alpha completes successfully
wait_for_jobs_to_complete(&mut orchestrator, 10).expect("alpha job should complete");
assert_eq!(
orchestrator.completed_jobs.len(),
orchestrator.count_completed_jobs(),
1,
"alpha should complete"
);
@ -753,23 +819,22 @@ echo 'Beta succeeded'
// Step 7: Beta is rescheduled and started (want -> running_jobs)
orchestrator.step().expect("step 7");
assert_eq!(orchestrator.running_jobs.len(), 1, "beta should be running");
assert_eq!(orchestrator.count_running_jobs(), 1, "beta should be running");
// Step 8: Beta completes successfully
wait_for_jobs_to_complete(&mut orchestrator, 10).expect("beta job should complete");
// Then: Verify both partitions are live and both jobs completed
assert_eq!(
orchestrator.completed_jobs.len(),
orchestrator.count_completed_jobs(),
2,
"both jobs should complete"
);
assert_eq!(
orchestrator.dep_miss_jobs.len(),
orchestrator.count_dep_miss_jobs(),
1,
"should have one dep miss"
);
assert!(orchestrator.failed_jobs.is_empty(), "no jobs should fail");
assert_eq!(
orchestrator

View file

@ -0,0 +1,268 @@
use crate::{PartitionRef, PartitionDetail, PartitionStatus, PartitionStatusCode};
/// State: Partition has been referenced but not yet built
#[derive(Debug, Clone)]
pub struct MissingState {}
/// State: Partition is currently being built by one or more jobs
#[derive(Debug, Clone)]
pub struct BuildingState {
pub building_by: Vec<String>, // job_run_ids
}
/// State: Partition has been successfully built
#[derive(Debug, Clone)]
pub struct LiveState {
pub built_at: u64,
pub built_by: String, // job_run_id
}
/// State: Partition build failed
#[derive(Debug, Clone)]
pub struct FailedState {
pub failed_at: u64,
pub failed_by: String, // job_run_id
}
/// State: Partition has been marked as invalid/tainted
#[derive(Debug, Clone)]
pub struct TaintedState {
pub tainted_at: u64,
pub taint_ids: Vec<String>,
}
/// Generic partition struct parameterized by state
#[derive(Debug, Clone)]
pub struct PartitionWithState<S> {
pub partition_ref: PartitionRef,
pub want_ids: Vec<String>,
pub state: S,
}
/// Wrapper enum for storing partitions in collections
#[derive(Debug, Clone)]
pub enum Partition {
Missing(PartitionWithState<MissingState>),
Building(PartitionWithState<BuildingState>),
Live(PartitionWithState<LiveState>),
Failed(PartitionWithState<FailedState>),
Tainted(PartitionWithState<TaintedState>),
}
// Type-safe transition methods for MissingState
impl PartitionWithState<MissingState> {
/// Transition from Missing to Building when a job starts building this partition
pub fn start_building(self, job_run_id: String) -> PartitionWithState<BuildingState> {
PartitionWithState {
partition_ref: self.partition_ref,
want_ids: self.want_ids,
state: BuildingState {
building_by: vec![job_run_id],
},
}
}
}
// Type-safe transition methods for BuildingState
impl PartitionWithState<BuildingState> {
/// Transition from Building to Live when a job successfully completes
pub fn complete(self, job_run_id: String, timestamp: u64) -> PartitionWithState<LiveState> {
PartitionWithState {
partition_ref: self.partition_ref,
want_ids: self.want_ids,
state: LiveState {
built_at: timestamp,
built_by: job_run_id,
},
}
}
/// Transition from Building to Failed when a job fails
pub fn fail(self, job_run_id: String, timestamp: u64) -> PartitionWithState<FailedState> {
PartitionWithState {
partition_ref: self.partition_ref,
want_ids: self.want_ids,
state: FailedState {
failed_at: timestamp,
failed_by: job_run_id,
},
}
}
/// Add another job to the list of jobs building this partition
pub fn add_building_job(mut self, job_run_id: String) -> Self {
if !self.state.building_by.contains(&job_run_id) {
self.state.building_by.push(job_run_id);
}
self
}
/// Transition from Building back to Missing when a job discovers missing dependencies
pub fn reset_to_missing(self) -> PartitionWithState<MissingState> {
PartitionWithState {
partition_ref: self.partition_ref,
want_ids: self.want_ids,
state: MissingState {},
}
}
}
// Type-safe transition methods for LiveState
impl PartitionWithState<LiveState> {
/// Transition from Live to Tainted when a taint is applied
pub fn taint(self, taint_id: String, timestamp: u64) -> PartitionWithState<TaintedState> {
PartitionWithState {
partition_ref: self.partition_ref,
want_ids: self.want_ids,
state: TaintedState {
tainted_at: timestamp,
taint_ids: vec![taint_id],
},
}
}
}
// Type-safe transition methods for TaintedState
impl PartitionWithState<TaintedState> {
/// Add another taint to an already-tainted partition
pub fn add_taint(mut self, taint_id: String) -> Self {
if !self.state.taint_ids.contains(&taint_id) {
self.state.taint_ids.push(taint_id);
}
self
}
}
// Helper methods on the Partition enum
impl Partition {
/// Create a new partition in the Missing state
pub fn new_missing(partition_ref: PartitionRef) -> Self {
Partition::Missing(PartitionWithState {
partition_ref,
want_ids: vec![],
state: MissingState {},
})
}
/// Get the partition reference from any state
pub fn partition_ref(&self) -> &PartitionRef {
match self {
Partition::Missing(p) => &p.partition_ref,
Partition::Building(p) => &p.partition_ref,
Partition::Live(p) => &p.partition_ref,
Partition::Failed(p) => &p.partition_ref,
Partition::Tainted(p) => &p.partition_ref,
}
}
/// Get want_ids from any state
pub fn want_ids(&self) -> &Vec<String> {
match self {
Partition::Missing(p) => &p.want_ids,
Partition::Building(p) => &p.want_ids,
Partition::Live(p) => &p.want_ids,
Partition::Failed(p) => &p.want_ids,
Partition::Tainted(p) => &p.want_ids,
}
}
/// Get mutable want_ids from any state
pub fn want_ids_mut(&mut self) -> &mut Vec<String> {
match self {
Partition::Missing(p) => &mut p.want_ids,
Partition::Building(p) => &mut p.want_ids,
Partition::Live(p) => &mut p.want_ids,
Partition::Failed(p) => &mut p.want_ids,
Partition::Tainted(p) => &mut p.want_ids,
}
}
/// Check if partition is in Live state
pub fn is_live(&self) -> bool {
matches!(self, Partition::Live(_))
}
/// Check if partition is satisfied (Live or Tainted both count as "available")
pub fn is_satisfied(&self) -> bool {
matches!(self, Partition::Live(_) | Partition::Tainted(_))
}
/// Check if partition is in a terminal state (Live, Failed, or Tainted)
pub fn is_terminal(&self) -> bool {
matches!(
self,
Partition::Live(_) | Partition::Failed(_) | Partition::Tainted(_)
)
}
/// Check if partition is currently being built
pub fn is_building(&self) -> bool {
matches!(self, Partition::Building(_))
}
/// Check if partition is missing (referenced but not built)
pub fn is_missing(&self) -> bool {
matches!(self, Partition::Missing(_))
}
/// Convert to PartitionDetail for API responses and queries
pub fn to_detail(&self) -> PartitionDetail {
match self {
Partition::Missing(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
status: Some(PartitionStatus {
code: PartitionStatusCode::PartitionWanted as i32,
name: "PartitionWanted".to_string(),
}),
want_ids: p.want_ids.clone(),
job_run_ids: vec![],
taint_ids: vec![],
last_updated_timestamp: None,
},
Partition::Building(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
status: Some(PartitionStatus {
code: PartitionStatusCode::PartitionBuilding as i32,
name: "PartitionBuilding".to_string(),
}),
want_ids: p.want_ids.clone(),
job_run_ids: p.state.building_by.clone(),
taint_ids: vec![],
last_updated_timestamp: None,
},
Partition::Live(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
status: Some(PartitionStatus {
code: PartitionStatusCode::PartitionLive as i32,
name: "PartitionLive".to_string(),
}),
want_ids: p.want_ids.clone(),
job_run_ids: vec![p.state.built_by.clone()],
taint_ids: vec![],
last_updated_timestamp: Some(p.state.built_at),
},
Partition::Failed(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
status: Some(PartitionStatus {
code: PartitionStatusCode::PartitionFailed as i32,
name: "PartitionFailed".to_string(),
}),
want_ids: p.want_ids.clone(),
job_run_ids: vec![p.state.failed_by.clone()],
taint_ids: vec![],
last_updated_timestamp: Some(p.state.failed_at),
},
Partition::Tainted(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
status: Some(PartitionStatus {
code: PartitionStatusCode::PartitionTainted as i32,
name: "PartitionTainted".to_string(),
}),
want_ids: p.want_ids.clone(),
job_run_ids: vec![],
taint_ids: p.state.taint_ids.clone(),
last_updated_timestamp: Some(p.state.tainted_at),
},
}
}
}

View file

@ -0,0 +1,173 @@
# Orchestrated State Machines: A Theory of Application Architecture
## Overview
DataBuild's core architecture exemplifies a pattern we call **Dependency-Aware State Machine Orchestration** or **Stateful Dataflow Architecture**. This document crystallizes the theory behind this approach and its applications.
## The Pattern
At its essence, the pattern is:
```
Application = State Machines + Dependency Graph + Orchestration Logic
```
Where:
- **State Machines**: Individual entities (Want, JobRun, Partition) with well-defined, type-safe states
- **Dependency Graph**: Relationships between entities (wants depend on partitions, partitions depend on job runs)
- **Orchestration Logic**: Coordination rules that trigger state transitions when dependencies are satisfied
## Key Components
### 1. State Machines
Each domain entity is modeled as an explicit state machine with:
- **Well-defined states** (NotStarted, Running, Completed, Failed, etc.)
- **Type-safe transitions** enforced at compile time
- **Immutable state progression** via consuming methods
Example from DataBuild:
```rust
pub enum JobRun<B: JobRunBackend> {
NotStarted(JobRunWithState<B, B::NotStartedState>),
Running(JobRunWithState<B, B::RunningState>),
Completed(JobRunWithState<B, B::CompletedState>),
Failed(JobRunWithState<B, B::FailedState>),
// ...
}
// Can ONLY call run() on NotStarted jobs - compiler enforces this!
impl<B: JobRunBackend> JobRunWithState<B, B::NotStartedState> {
pub fn run(self, env) -> Result<JobRunWithState<B, B::RunningState>, Error>
}
```
### 2. Dependency Graph
Entities are connected through explicit dependencies:
- Wants → Partitions (wants request specific partitions)
- Partitions → JobRuns (jobs build partitions)
- JobRuns → Partitions (jobs declare what they built)
### 3. Orchestration Logic
A central orchestrator:
- Observes all entity states
- Evaluates dependency conditions
- Triggers state transitions when conditions are met
- Maintains global consistency invariants
## Core Principles
1. **Model domain entities as explicit state machines** - Don't hide state in boolean flags
2. **Express dependencies as a graph** - Make relationships first-class
3. **Centralize coordination logic** - Separate entity behavior from system coordination
4. **Make state transitions event-sourced** - Append-only log enables time-travel and auditability
5. **Use types to enforce valid transitions** - Catch errors at compile time, not runtime
## Advantages
### Type Safety
Compile-time guarantees prevent invalid state transitions:
```rust
// This will not compile:
let job = JobRun::Running(running_job);
job.run(); // ERROR: no method `run` found for `JobRun<Running>`
```
### Observability
Event-sourced state transitions provide complete audit trail:
- What's running? Query running jobs
- What failed? Filter by failed state
- When did it transition? Check BEL timestamps
### Testability
- State machines can be tested in isolation
- Orchestration logic can be tested with mock state machines
- Dependency resolution can be tested independently
### Incremental Progress
System can be stopped and restarted:
- State is persisted in BEL
- Resume from last known state
- No need to restart from beginning
### Correctness
- Type system prevents impossible states
- Event log provides ground truth
- Dependency graph ensures proper ordering
## Real-World Applications
This pattern is the **fundamental architecture** of:
**Build Systems**
- Bazel, Buck, Pants - artifacts depend on other artifacts
- Your "builds" are literally builds
**Workflow Engines**
- Temporal, Prefect, Airflow - DAG of tasks with state
- Each task is a state machine, orchestrator schedules based on dependencies
**Data Orchestration**
- Dagster, Kedro - data assets with lineage
- Partitions are data assets, jobs are transformations
**Game Engines**
- Entity Component Systems - entities have state
- Game loop orchestrates entity state transitions
**Business Process Management**
- BPMN engines - business processes as state machines
- Workflow engine coordinates process instances
## When to Use This Pattern
This architecture is particularly powerful for systems where:
- **Eventual consistency** is acceptable (not strict ACID transactions)
- **Incremental progress** is important (can checkpoint and resume)
- **Observability** is critical (need to know what's happening)
- **Correctness** matters (type-safe transitions prevent bugs)
- **Concurrency** is inherent (multiple things happening simultaneously)
- **Dependencies** are complex (can't just process sequentially)
## Implementation Lessons
### Use Drain for State Transitions
Clean pattern for moving entities through states:
```rust
fn schedule_queued_jobs(&mut self) -> Result<()> {
let mut new_jobs = Vec::new();
for job in self.job_runs.drain(..) {
let transitioned = match job {
JobRun::NotStarted(ns) => JobRun::Running(ns.run(None)?),
other => other, // Pass through unchanged
};
new_jobs.push(transitioned);
}
self.job_runs = new_jobs;
Ok(())
}
```
### Parameterize State for Type Safety
```rust
pub struct JobRunWithState<Backend, State> {
job_run_id: Uuid,
state: State, // Type parameter enforces valid operations
}
```
### Event Sourcing for Auditability
All state changes emit events to append-only log:
```rust
self.bel.append_event(&JobRunSuccessEvent {
job_run_id,
timestamp
})?;
```
### Separate Entity Logic from Coordination
- Entity state machines: "What transitions are valid for me?"
- Orchestrator: "Given all entity states, what should happen next?"

View file

@ -0,0 +1,21 @@
If you look at the BEL definition, you'll see that there's two components to it, the literal serialized event stream, and the build state, a projection of the events into objects (e.g. via reducer, etc):
```rust
pub struct BuildEventLog<S: BELStorage + Debug> {
pub storage: S,
pub state: BuildState,
}
```
`storage` is the literal events that happened: a job run being launched, a want being requested, a job run finishing and producing some number of partitions, etc. `state` answers questions about the state of the world as a result of the serial occurrence of the recorded events, like "is the partition x/y/z live?" and "why hasn't partition a/b/c been built yet"? `state` is essentially the thing responsible for system consistency.
Most of the code in this project is in calculating next states for build state objects: determining wants that can have jobs run to satisfy them, updating partitions to live after a job run succeeds, etc. Can we formalize this into a composition of state machines to simplify the codebase, achieve more compile-time safety, and potentially unlock greater concurrency as a byproduct?
CPN concurrency can be describe succinctly: if the workloads touch disjoint places, they can be run concurrently. This seems to overwhelmingly be the case for the domain databuild is interested in, where a single "data service" is traditionally responsible for producing partitions in a given dataset. Another huge benefit to using a CPN framing for databuild is to separate concerns between state updates/consistency and all the stuff that connects to it.
# Appendix
## Partition Collisions?
Random thought, we also have this lingering "what if unrelated wants collide in the partition space", specifically for a paradigm where job runs produce multiple partitions based on their parameterization. This may also give us the confidence to just cancel the later of the colliding jobs and have it reschedule (how would partitions be diff?). Or, given that we update partition building status on job schedule, we would be confident that we just never get into that situation at the later want grouping stage (pre job scheduling), it would see the conflict partition as building thanks to the earlier job being started. Probably worth constructing a literal situation for this to war game it or implement a literal integration test.