Compare commits

...

12 commits

12 changed files with 2979 additions and 892 deletions

View file

@ -0,0 +1,579 @@
---
name: databuild-build-state-semantics
description: The core semantics of databuild's build state; conceptual boundaries, responsibilities, and interfaces; the architecture mental model and rationale; read when discussing architecture or design.
---
# Build State Semantics
To achieve databuild's goal of declarative partitioned data builds (via explicitly stated data dependencies between jobs), databuild employs a "build state" concept that, together with the orchestrator, makes up all of the data catalog and job run scheduling logic needed to produce data based on user wants.
## Core Mental Model
DataBuild's BuildState implements **event sourcing** combined with an **Entity Component System (ECS)** pattern:
- **Immutable event log**: All state changes recorded as events (WantCreateEventV1, JobRunBufferEventV1, etc.)
- **Derived mutable state**: BuildState reconstructed by replaying events through state machines
- **ECS pattern**: Entities stored in flat collections, relationships via inverted indexes (not nested objects)
- **Type-state machines**: Compile-time enforcement of valid state transitions
**Public interface**:
- Query: `get_want()`, `list_partitions()`, `get_partition()`, etc.
- Mutation: `handle_event()` processes events and transitions states
- No direct state manipulation outside event handling
**Separation of concerns**:
- **BuildState**: Maintains entity state, processes events, provides queries
- **Orchestrator**: Polls BuildState, makes scheduling decisions, emits job events
## Compile-Time Correctness Strategy
The primary defense against bugs is making invalid states **unrepresentable** at compile time.
**Type-state pattern**: States encoded in type system, transitions consume self
```rust
// Can only call .complete() on BuildingState
impl PartitionWithState<BuildingState> {
pub fn complete(self, job_run_id: String, timestamp: u64) -> PartitionWithState<LiveState> {
PartitionWithState {
partition_ref: self.partition_ref,
state: LiveState { built_at: timestamp, built_by: job_run_id },
}
}
}
// Cannot call .complete() on LiveState - method doesn't exist
impl PartitionWithState<LiveState> {
pub fn taint(self, taint_id: String, timestamp: u64) -> PartitionWithState<TaintedState> { ... }
}
```
**Benefits**:
- Invalid transitions caught at compile time: `live_partition.complete()` → compile error
- Refactoring safety: compiler guides you through state machine changes
- Self-documenting: `fn schedule(want: WantWithState<IdleState>)` encodes precondition
- Fast feedback loop: seconds (compile error) vs minutes (runtime panic) vs days (production bug)
**Runtime panics reserved for invariant violations** (bugs in BuildState implementation):
- Missing references: `partitions_by_uuid[uuid]` doesn't exist → panic with context
- Index inconsistencies: `canonical_partitions[ref]` points to invalid UUID → panic
- These should never happen in correct implementation
## Architectural Layering
Three entity types with pragmatic data flow:
```
Wants (user requests for data)
↓ references partition refs (Vec<PartitionRef>)
Partitions (data artifacts being built)
↓ building_by/built_by job_run_ids (tracking)
↑ wants_for_partition inverted index
JobRuns (execution processes)
```
**Direct references**:
- Wants → Partitions: wants store `partitions: Vec<PartitionRef>`
- JobRuns → Partitions: jobs store `building_partition_uuids: Vec<Uuid>`
- Partitions → JobRuns: partitions store `building_by: Vec<String>` (job_run_ids)
**Inverted index**:
- Partitions → Wants: `wants_for_partition: BTreeMap<String, Vec<String>>`
- Maps partition_ref → want_ids waiting for it
- Why not direct? Partitions keyed by UUID, but wants use partition_ref for mapping
- Efficient lookup: "which wants are waiting for partition ref X?"
**Intentional separation**:
- JobRuns don't know about Wants (jobs build partitions, agnostic to requesters)
- Wants don't know about JobRuns (users care about data availability, not execution)
## Entity State Machines
### Want States
```
New → {Idle, Building, UpstreamBuilding, Successful, Failed, UpstreamFailed, Canceled}
```
**State semantics**: "What is the current status of my requested partitions?"
- **New**: Just created, state not yet determined (ephemeral, transitions immediately)
- **Idle**: Partitions don't exist or are ready to retry (UpForRetry) → schedulable
- **Building**: Canonical partitions currently being built by jobs
- **UpstreamBuilding**: Canonical partitions waiting for upstream dependencies
- **Successful**: All canonical partitions are Live
- **Failed**: Canonical partition hard failure (shouldn't retry)
- **UpstreamFailed**: Canonical partition's upstream failed (can't succeed)
- **Canceled**: Explicitly canceled by user/system
**Key insight**: Want state reflects canonical partition state, not bound to specific partition UUIDs.
Example:
```rust
// Want created for "data/beta"
want.partitions = ["data/beta"]
// Determine state by checking canonical partition
if let Some(uuid) = canonical_partitions.get("data/beta") {
let partition = partitions_by_uuid[uuid];
match partition.state {
Building => want.state = Building,
Live => want.state = Successful,
Failed => want.state = Failed,
// ...
}
} else {
want.state = Idle // No canonical partition exists
}
```
### Partition States
```
Building → {UpstreamBuilding, UpForRetry, Live, Failed, UpstreamFailed, Tainted}
```
**State semantics**: "What is the current build status? Is this partition leasable?"
- **Building**: Job actively building, lease held (prevent concurrent builds)
- **UpstreamBuilding**: Dep miss occurred, waiting for upstreams, lease held
- **UpForRetry**: Upstreams satisfied, ready to retry, lease released
- **Live**: Successfully built (terminal)
- **Failed**: Hard failure, shouldn't retry (terminal, lease released)
- **UpstreamFailed**: Upstream deps failed, can't succeed (terminal, lease released)
- **Tainted**: Marked invalid by taint event (terminal)
**No Missing state**: Partitions only exist when jobs start building them or have completed.
**State as lease mechanism**:
- Building/UpstreamBuilding: Lease held → orchestrator will NOT schedule new jobs
- UpForRetry/Failed/UpstreamFailed: Lease released → safe to schedule (though Failed/UpstreamFailed block wants)
- Live/Tainted: Not lease states
Example lease behavior:
```
Partition uuid-1 ("data/beta"): Building
Want W1 arrives for "data/beta" → New → Building (sees canonical is Building)
Want W2 arrives for "data/beta" → New → Building (sees canonical is Building)
Orchestrator polls: both wants Building, canonical partition Building → NOT schedulable (lease held)
```
### JobRun States
```
Queued → Running → {Successful, Failed, DepMissed}
```
- **Queued**: Job buffered, not yet started
- **Running**: Process executing
- **Successful**: Completed successfully, partitions built
- **Failed**: Process failed
- **DepMissed**: Job discovered missing dependencies, created derivative wants
## Temporal Identity & References
**Problem**: How do we distinguish "the partition being built now" from "the partition built yesterday"?
**Solution**: Partition UUIDs for temporal identity, separate from user-facing refs.
### Partition UUIDs (Immutable Identity)
Each partition build attempt gets unique UUID:
```rust
fn derive_partition_uuid(job_run_id: &str, partition_ref: &str) -> Uuid {
let mut hasher = Sha256::new();
hasher.update(job_run_id.as_bytes());
hasher.update(partition_ref.as_bytes());
let hash = hasher.finalize();
Uuid::from_slice(&hash[0..16]).unwrap()
}
```
**Properties**:
- Deterministic: Same job + ref → same UUID (enables event replay)
- Immutable: Partition(uuid-1) represents specific historical build
- Jobs reference UUIDs: "Job J built Partition uuid-1 at time T"
### Partition Refs (Canonical Names)
User-facing identifier like `"data/category=tech/date=2024-01-15"`:
- Wants reference refs: "I want data/beta to be Live"
- Canonical partitions: `canonical_partitions["data/beta"] → uuid-3`
- One canonical UUID per ref at any time
### Dual Indexing
```rust
// All partition instances (historical + current)
partitions_by_uuid: BTreeMap<Uuid, Partition>
// Current/canonical partition for each ref
canonical_partitions: BTreeMap<String, Uuid>
```
**Lifecycle example**:
```
1. Job J1 starts → uuid-1 generated for "data/beta"
2. Partition(uuid-1, "data/beta", Building) created
3. canonical_partitions["data/beta"] = uuid-1
4. Job completes → Partition(uuid-1, Live)
5. Partition tainted → Partition(uuid-1, Tainted), still canonical
6. New job J2 starts → uuid-2 generated
7. Partition(uuid-2, "data/beta", Building) created
8. canonical_partitions["data/beta"] = uuid-2 (updated)
9. Partition(uuid-1) remains in partitions_by_uuid for history
```
**Query semantics**:
- "What's the current state of data/beta?" → lookup canonical_partitions["data/beta"], then partitions_by_uuid[uuid]
- "What partition did job J build?" → job.building_partition_uuids → partitions_by_uuid[uuid]
- "What was the state at time T?" → replay events up to T, query canonical_partitions
## BuildState Data Structure (ECS Pattern)
Flat collections, not nested objects:
```rust
pub struct BuildState {
// Entity collections
wants: BTreeMap<String, Want>,
partitions_by_uuid: BTreeMap<Uuid, Partition>,
canonical_partitions: BTreeMap<String, Uuid>,
job_runs: BTreeMap<String, JobRun>,
// Inverted indexes
wants_for_partition: BTreeMap<String, Vec<String>>, // partition_ref → want_ids
downstream_waiting: BTreeMap<String, Vec<Uuid>>, // partition_ref → waiting_partition_uuids
}
```
**Why ECS over OOP**:
- Avoids deep object hierarchies (`Want { partitions: Vec<Partition { job_runs: Vec<JobRun> }>}`)
- Flexible querying without coupling
- Inverted indexes provide O(1) reverse lookups
- State rebuilds from events without complex object reconstruction
- Access patterns drive data structure (not inheritance)
**Inverted index example**:
```rust
// Traditional OOP (tight coupling)
partition.wants.iter().for_each(|want| transition_want(want));
// ECS with inverted index (decoupled)
if let Some(want_ids) = wants_for_partition.get(&partition_ref) {
for want_id in want_ids {
let want = wants.get_mut(want_id).unwrap();
transition_want(want);
}
}
```
## Inverted Indexes
### wants_for_partition
```rust
BTreeMap<String, Vec<String>> // partition_ref → want_ids
```
**Purpose**: Find all wants waiting for a partition ref
**Maintenance**:
- Updated on want creation: add want_id to each partition_ref in want
- NOT cleaned up on want completion (acceptable, bounded growth)
- Replaces `partition.wants: Vec<String>` that would exist in OOP
**Usage**:
```rust
// When partition transitions Building → Live
let partition_ref = &partition.partition_ref.r#ref;
if let Some(want_ids) = wants_for_partition.get(partition_ref) {
for want_id in want_ids {
// Check if all partitions for this want are Live
// If yes, transition want Idle/Building → Successful
}
}
```
### downstream_waiting
```rust
BTreeMap<String, Vec<Uuid>> // partition_ref → waiting_partition_uuids
```
**Purpose**: O(1) lookup of partitions waiting for an upstream when it completes/fails
**Maintenance**:
- Updated when partition transitions Building → UpstreamBuilding
- For each missing upstream ref, add partition UUID to `downstream_waiting[upstream_ref]`
- Cleaned up when partition transitions UpstreamBuilding → UpForRetry/UpstreamFailed
- Remove partition UUID from all `downstream_waiting` entries
**Usage**:
```rust
// When upstream partition "data/alpha" becomes Live
if let Some(waiting_uuids) = downstream_waiting.get("data/alpha") {
for uuid in waiting_uuids {
let partition = partitions_by_uuid.get_mut(uuid).unwrap();
// Check if ALL this partition's MissingDeps are now satisfied
if all_deps_satisfied(partition) {
partition = partition.transition_to_up_for_retry();
}
}
}
```
**Why needed**: Avoids scanning all UpstreamBuilding partitions when upstreams complete.
## BuildState Responsibilities
What BuildState does:
- Maintain entity state machines (process events, transition states)
- Provide query interfaces (`get_want`, `list_partitions`, etc.)
- Maintain inverted indexes for efficient lookups
- Enforce invariants (panic on reference errors with context)
- Rebuild state from event log (replay)
What BuildState does NOT do:
- Make scheduling decisions (that's Orchestrator)
- Execute jobs (that's external processes)
- Generate UUIDs (done deterministically during event handling from job_run_id)
**Key insight**: BuildState is a pure state container. All coordination logic lives in Orchestrator.
## Want State Determination (Sensing)
When a want is created, it observes canonical partition states and transitions accordingly.
**Priority order** (first match wins):
1. If ANY canonical partition is Failed → New → Failed
2. If ANY canonical partition is UpstreamFailed → New → UpstreamFailed
3. If ALL canonical partitions exist AND are Live → New → Successful
4. If ANY canonical partition is Building → New → Building
5. If ANY canonical partition is UpstreamBuilding → New → UpstreamBuilding
6. If ANY canonical partition is UpForRetry → New → Idle (deps satisfied, ready to schedule)
7. Otherwise (partitions don't exist) → New → Idle
**Example**:
```rust
// Want W1 created for ["data/alpha", "data/beta"]
// canonical_partitions["data/alpha"] = uuid-1 (Building)
// canonical_partitions["data/beta"] = uuid-2 (Live)
// Result: W1 goes New → Building (rule 4: ANY partition Building)
// Want W2 created for ["data/gamma"]
// canonical_partitions["data/gamma"] doesn't exist
// Result: W2 goes New → Idle (rule 7: partition doesn't exist)
```
**Key insight**: Most wants go New → Idle because canonical partitions only exist when jobs are running or completed. This is correct: "nothing is building yet, ready to schedule."
## Schedulability vs Want State
**Want State**: Reflects current reality of canonical partitions
**Schedulability**: Orchestrator's decision logic for queuing jobs
**Not the same thing**:
```
Want W1: Idle → orchestrator schedules job → canonical partition becomes Building
Want W1: Idle → Building (event handling transitions it)
Want W2 arrives → sees canonical partition Building → New → Building
Orchestrator polls: both W1 and W2 are Building
Should orchestrator schedule another job? NO (lease held)
```
**Schedulability check**: A want is schedulable if canonical partition is:
- Doesn't exist (no lease), OR
- Tainted (invalid, needs rebuild), OR
- UpForRetry (lease released, deps satisfied)
**Not schedulable** if canonical partition is:
- Building (lease held, job running)
- UpstreamBuilding (lease held, waiting for deps)
**Implementation**:
```rust
fn is_schedulable(want: &Want, canonical_partitions: &BTreeMap<String, Uuid>) -> bool {
for partition_ref in &want.partitions {
if let Some(uuid) = canonical_partitions.get(partition_ref) {
let partition = partitions_by_uuid[uuid];
match partition.state {
Building | UpstreamBuilding => return false, // Lease held
Tainted | UpForRetry => continue, // Schedulable
_ => continue,
}
}
// Partition doesn't exist → schedulable
}
true
}
```
## Dependency Miss & Resolution Flow
The "dep miss" is the key mechanism for achieving multi-hop and complex data builds (traditionally solved via DAGs). When a job run fails due to missing upstream data, it generates a list of `MissingDeps`, which map the specific individual missing deps to the output partitions that needed them. This information enables databuild to create derivative wants, that will result in it scheduling jobs to build those partitions.
Complete flow when job encounters missing dependencies:
### 1. Job Reports Dep Miss
```
Job J1 building partition uuid-1 ("data/beta")
Discovers missing upstream: "data/alpha" not Live
Emits JobRunDepMissEventV1 {
missing_deps: [
MissingDeps {
missing: [ PartitionRef { ref: "data/alpha" } ],
impacted: PartitionRef { ref: "data/beta" }
}, ...
], ...
}
```
### 2. Partition Transitions to UpstreamBuilding
```rust
// handle_job_run_dep_miss_event()
partition = partition.transition_building_to_upstream_building(missing_deps);
partition.state.missing_deps = ["data/alpha"];
// Update inverted index
for upstream_ref in missing_deps {
downstream_waiting.entry(upstream_ref).or_default().push(uuid-1);
}
// downstream_waiting["data/alpha"] = [uuid-1]
// Partition remains canonical (lease still held)
// Job run transitions to DepMissed state
```
### 3. Want Transitions
```rust
// All wants waiting for "data/beta" transition Building → UpstreamBuilding
for want_id in wants_for_partition["data/beta"] {
want = want.transition_building_to_upstream_building(derivative_want_ids);
}
```
### 4. Derivative Wants Created
```rust
// System creates derivative want for missing upstream
derivative_want = Want::new(["data/alpha"]);
// This want goes New → Idle (alpha doesn't exist) → schedulable
```
### 5. Upstream Builds Complete or Fail
**Success case**:
```rust
// Derivative want builds "data/alpha" → partition becomes Live
// Look up downstream partitions waiting for "data/alpha"
if let Some(waiting_uuids) = downstream_waiting.get("data/alpha") {
for uuid in waiting_uuids {
let partition = partitions_by_uuid.get_mut(uuid).unwrap();
// Check if ALL missing deps now satisfied
let all_satisfied = partition.state.missing_deps.iter().all(|dep_ref| {
canonical_partitions.get(dep_ref)
.and_then(|uuid| partitions_by_uuid.get(uuid))
.map(|p| p.is_live())
.unwrap_or(false)
});
if all_satisfied {
partition = partition.transition_to_up_for_retry();
// Transition wants: UpstreamBuilding → Idle
}
}
}
```
**Failure case**:
```rust
// Upstream partition "data/alpha" transitions to Failed
if let Some(waiting_uuids) = downstream_waiting.get("data/alpha") {
for uuid in waiting_uuids {
let partition = partitions_by_uuid.get_mut(uuid).unwrap();
if matches!(partition, Partition::UpstreamBuilding(_)) {
partition = partition.transition_to_upstream_failed();
// Transition wants: UpstreamBuilding → UpstreamFailed
}
}
}
```
### 6. Want Becomes Schedulable
```rust
// Partition uuid-1 now in UpForRetry state
// Wants transition UpstreamBuilding → Idle
// Orchestrator polls, sees Idle wants with UpForRetry canonical partition → schedulable
// New job J2 queued → fresh uuid-2 generated for "data/beta"
// Partition uuid-2 created in Building state, replaces uuid-1 in canonical_partitions
// Partition uuid-1 remains in partitions_by_uuid (historical record)
```
**Key properties**:
- `downstream_waiting` enables O(1) lookup (no scanning all partitions)
- Failure propagates down dependency chain automatically
- Lease mechanism prevents concurrent retry attempts
- Historical partition instances preserved for lineage
## Orchestrator Responsibilities
The Orchestrator coordinates execution but maintains no state:
**Core loop**:
1. Poll BuildState for schedulable wants: `build_state.list_wants()` filtered by schedulability
2. Make scheduling decisions (respect leases, check resources, etc.)
3. Derive partition UUIDs for job: `derive_partition_uuid(job_run_id, partition_ref)`
4. Emit JobRunBufferEventV1 with job_run_id and partition_refs
5. BuildState processes event → creates partitions in Building state → updates canonical pointers → transitions wants
**Does NOT**:
- Maintain its own state (always queries BuildState)
- Know about partition UUIDs before emitting event (derives deterministically)
- Track want-partition relationships (uses inverted index)
**Separation rationale**:
- BuildState: source of truth for state
- Orchestrator: coordination logic
- Clear boundary enables testing, reasoning, replay
## Design Principles & Invariants
### 1. Compile-Time Correctness First
Invalid states should be unrepresentable. Type-state pattern enforces valid transitions at compile time.
Example: Cannot call `complete()` on a partition that isn't Building.
### 2. Runtime Panics for Invariant Violations
Reference errors and index inconsistencies represent BuildState bugs, not invalid input. Panic with context.
Example: `partitions_by_uuid[uuid]` missing → panic with "Partition UUID {uuid} referenced by canonical_partitions but not in partitions_by_uuid"
### 3. ECS Over OOP
Flat collections with inverted indexes beat nested object hierarchies for flexibility and query performance.
### 4. Data Structure Follows Access Patterns
Use inverted indexes where efficient reverse lookup is needed (`wants_for_partition`, `downstream_waiting`).
### 5. Events Represent Reality
Events encode real things: job processes started, dependency misses occurred, user requests received. Not speculative.
### 6. No Backwards Compatibility Hacks
Clean breaks preferred over technical debt. Code should be honest about state.
### 7. Fail Fast with Context
Better to panic immediately with rich context than silently corrupt state or fail later mysteriously.
### 8. Type-State for Self-Documentation
Function signatures encode preconditions: `fn schedule(want: WantWithState<IdleState>)` vs `fn schedule(want: Want)`.
## Summary
BuildState is a type-safe, event-sourced state machine using ECS patterns:
- **Compile-time correctness**: Invalid states unrepresentable
- **Flat data structures**: Collections + inverted indexes, not nested objects
- **Temporal identity**: UUID-based partition instances + canonical refs
- **Lease mechanism**: State encodes schedulability (Building/UpstreamBuilding hold lease)
- **Efficient lookups**: O(1) reverse queries via inverted indexes
- **Clear separation**: BuildState maintains state, Orchestrator coordinates
The architecture prioritizes fast feedback during development (compile errors), clear semantics (explicit states), and correctness (type-safe transitions).

View file

@ -86,3 +86,4 @@ def lookup_job_for_partition(partition_ref: str) -> str:
## Notes / Tips
- Rust dependencies are implemented via rules_rust, so new dependencies should be added in the `MODULE.bazel` file.
- Designs/plans should rarely include code snippets, outside of specifying interfaces or very specific changes.

View file

@ -49,6 +49,10 @@ crate.spec(
package = "uuid",
version = "1.0",
)
crate.spec(
package = "sha2",
version = "0.10",
)
crate.spec(
features = ["bundled"],
package = "rusqlite",

File diff suppressed because one or more lines are too long

View file

@ -39,6 +39,7 @@ rust_library(
"@crates//:tower-http",
"@crates//:tracing",
"@crates//:uuid",
"@crates//:sha2",
],
)

File diff suppressed because it is too large Load diff

View file

@ -176,13 +176,27 @@ message WantStatus {
string name = 2;
}
enum WantStatusCode {
WantIdle = 0;
WantBuilding = 1;
WantFailed = 2;
WantSuccessful = 3;
WantCanceled = 4;
WantUpstreamBuilding = 5;
WantUpstreamFailed = 6;
// Wants are created in this state, and they should immediately transition to another state based on the current state
// of partitions they reference.
WantNew = 0;
// The want is not building, but not blocked from building either - it is schedulable.
WantIdle = 1;
// No referenced partitions are failed, and at least one referenced partition is building.
WantBuilding = 2;
// At least 1 referenced partition is failed.
WantFailed = 3;
// All referenced partitions are live.
WantSuccessful = 4;
// The want itself has been canceled. It should no longer influence job scheduling, and any existing jobs not building
// partitions requested by other active wants should be canceled.
WantCanceled = 5;
// A referenced partition's building job failed with a dep miss, and a derivative want is now building the missed
// partitions. This want is waiting for missed partitions to be live before going back to Idle and becoming
// schedulable again.
WantUpstreamBuilding = 6;
// After entering WantUpstreamBuilding state, one of the derivative want's triggered jobs has failed, meaning this
// want will not be able to succeed.
WantUpstreamFailed = 7;
}
message WantDetail {
@ -213,23 +227,42 @@ message PartitionDetail {
// Wants that reference this partition
repeated string want_ids = 5;
repeated string taint_ids = 6;
// The unique identifier for this partition instance (UUID as string)
// Each time a partition is built, it gets a new UUID derived from the job_run_id
string uuid = 7;
}
message PartitionStatus {
PartitionStatusCode code = 1;
string name = 2;
}
enum PartitionStatusCode {
// TODO how do we avoid copying job states here? This is essentially a union of job states and taints?
PartitionUnknown = 0;
PartitionWanted = 1;
PartitionBuilding = 2;
PartitionLive = 3;
PartitionFailed = 4;
PartitionTainted = 5;
// Work is in progress to produce the partition. This state acts as a leasing mechanism: the orchestrator will not
// schedule other jobs to produce this partition while it is in Building; e.g., a dep miss may have occurred when
// trying to build the partition, and jobs for the upstreams may be in progress, and this state enables us to signal
// that we shouldn't reschedule
PartitionBuilding = 0;
// The partition has been produced and is currently valid.
PartitionLive = 1;
// Building of the partition has failed in a way that is not retryable.
PartitionFailed = 2;
// The partition has been marked as tainted. It shouldn't be read, and if any active wants reference it, a job to
// build it should be scheduled.
PartitionTainted = 3;
}
message TaintDetail {
// TODO
// The unique identifier for this taint
string taint_id = 1;
// The root taint ID (for taint hierarchies)
string root_taint_id = 2;
// The parent taint ID (for taint hierarchies)
string parent_taint_id = 3;
// The partitions affected by this taint
repeated PartitionRef partitions = 4;
// Source of the taint event
EventSource source = 5;
// Optional comment describing the taint
optional string comment = 6;
}
message JobRunStatus {
@ -237,11 +270,17 @@ message JobRunStatus {
string name = 2;
}
enum JobRunStatusCode {
// The job run has been queued, and will be run at some point in the future (e.g. pool slot opens, etc).
JobRunQueued = 0;
// The job run is now running.
JobRunRunning = 1;
// The job run has failed for a non-recoverable reason.
JobRunFailed = 2;
// The job run has been canceled.
JobRunCanceled = 3;
// The job run succeeded.
JobRunSucceeded = 4;
// The job run failed due to specific missing deps, emitting a JobRunMissingDeps.
JobRunDepMiss = 5;
}
message JobRunDetail {

View file

@ -1,14 +1,35 @@
use crate::{PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use uuid::Uuid;
/// State: Partition has been referenced but not yet built
#[derive(Debug, Clone)]
pub struct MissingState {}
/// Derive a deterministic UUID from job_run_id and partition_ref.
/// This ensures replay produces the same UUIDs.
pub fn derive_partition_uuid(job_run_id: &str, partition_ref: &str) -> Uuid {
let mut hasher = Sha256::new();
hasher.update(job_run_id.as_bytes());
hasher.update(partition_ref.as_bytes());
let hash = hasher.finalize();
Uuid::from_slice(&hash[0..16]).expect("SHA256 produces at least 16 bytes")
}
/// State: Partition is currently being built by one or more jobs
/// State: Partition is currently being built by a job
#[derive(Debug, Clone)]
pub struct BuildingState {
pub building_by: Vec<String>, // job_run_ids
pub job_run_id: String,
}
/// State: Partition is waiting for upstream dependencies to be built
#[derive(Debug, Clone)]
pub struct UpstreamBuildingState {
pub job_run_id: String,
pub missing_deps: Vec<PartitionRef>, // partition refs that are missing
}
/// State: Upstream dependencies are satisfied, partition is ready to retry building
#[derive(Debug, Clone)]
pub struct UpForRetryState {
pub original_job_run_id: String, // job that had the dep miss
}
/// State: Partition has been successfully built
@ -18,13 +39,20 @@ pub struct LiveState {
pub built_by: String, // job_run_id
}
/// State: Partition build failed
/// State: Partition build failed (hard failure, not retryable)
#[derive(Debug, Clone)]
pub struct FailedState {
pub failed_at: u64,
pub failed_by: String, // job_run_id
}
/// State: Partition failed because upstream dependencies failed (terminal)
#[derive(Debug, Clone)]
pub struct UpstreamFailedState {
pub failed_at: u64,
pub failed_upstream_refs: Vec<PartitionRef>, // which upstream partitions failed
}
/// State: Partition has been marked as invalid/tainted
#[derive(Debug, Clone)]
pub struct TaintedState {
@ -32,21 +60,25 @@ pub struct TaintedState {
pub taint_ids: Vec<String>,
}
/// Generic partition struct parameterized by state
/// Generic partition struct parameterized by state.
/// Each partition has a unique UUID derived from the job_run_id that created it.
#[derive(Debug, Clone)]
pub struct PartitionWithState<S> {
pub uuid: Uuid,
pub partition_ref: PartitionRef,
pub want_ids: Vec<String>,
pub state: S,
}
/// Wrapper enum for storing partitions in collections
/// Wrapper enum for storing partitions in collections.
/// Note: Missing state has been removed - partitions are only created when jobs start building them.
#[derive(Debug, Clone)]
pub enum Partition {
Missing(PartitionWithState<MissingState>),
Building(PartitionWithState<BuildingState>),
UpstreamBuilding(PartitionWithState<UpstreamBuildingState>),
UpForRetry(PartitionWithState<UpForRetryState>),
Live(PartitionWithState<LiveState>),
Failed(PartitionWithState<FailedState>),
UpstreamFailed(PartitionWithState<UpstreamFailedState>),
Tainted(PartitionWithState<TaintedState>),
}
@ -54,14 +86,6 @@ pub enum Partition {
/// is critical that these be treated with respect, not just summoned because it's convenient.
/// These should be created ephemerally from typestate objects via .get_ref() and used
/// immediately — never stored long-term, as partition state can change.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct MissingPartitionRef(pub PartitionRef);
impl PartitionWithState<MissingState> {
pub fn get_ref(&self) -> MissingPartitionRef {
MissingPartitionRef(self.partition_ref.clone())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BuildingPartitionRef(pub PartitionRef);
impl PartitionWithState<BuildingState> {
@ -70,6 +94,22 @@ impl PartitionWithState<BuildingState> {
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct UpstreamBuildingPartitionRef(pub PartitionRef);
impl PartitionWithState<UpstreamBuildingState> {
pub fn get_ref(&self) -> UpstreamBuildingPartitionRef {
UpstreamBuildingPartitionRef(self.partition_ref.clone())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct UpForRetryPartitionRef(pub PartitionRef);
impl PartitionWithState<UpForRetryState> {
pub fn get_ref(&self) -> UpForRetryPartitionRef {
UpForRetryPartitionRef(self.partition_ref.clone())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct LivePartitionRef(pub PartitionRef);
impl PartitionWithState<LiveState> {
@ -86,6 +126,14 @@ impl PartitionWithState<FailedState> {
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct UpstreamFailedPartitionRef(pub PartitionRef);
impl PartitionWithState<UpstreamFailedState> {
pub fn get_ref(&self) -> UpstreamFailedPartitionRef {
UpstreamFailedPartitionRef(self.partition_ref.clone())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TaintedPartitionRef(pub PartitionRef);
impl PartitionWithState<TaintedState> {
@ -94,61 +142,101 @@ impl PartitionWithState<TaintedState> {
}
}
// Type-safe transition methods for MissingState
impl PartitionWithState<MissingState> {
/// Transition from Missing to Building when a job starts building this partition
pub fn start_building(self, job_run_id: String) -> PartitionWithState<BuildingState> {
// Type-safe transition methods for BuildingState
impl PartitionWithState<BuildingState> {
/// Create a new partition directly in Building state.
/// UUID is derived from job_run_id + partition_ref for deterministic replay.
pub fn new(job_run_id: String, partition_ref: PartitionRef) -> Self {
let uuid = derive_partition_uuid(&job_run_id, &partition_ref.r#ref);
PartitionWithState {
uuid,
partition_ref,
state: BuildingState { job_run_id },
}
}
/// Transition from Building to Live when a job successfully completes
pub fn complete(self, timestamp: u64) -> PartitionWithState<LiveState> {
PartitionWithState {
uuid: self.uuid,
partition_ref: self.partition_ref,
want_ids: self.want_ids,
state: BuildingState {
building_by: vec![job_run_id],
state: LiveState {
built_at: timestamp,
built_by: self.state.job_run_id,
},
}
}
/// Transition from Building to Failed when a job fails (hard failure)
pub fn fail(self, timestamp: u64) -> PartitionWithState<FailedState> {
PartitionWithState {
uuid: self.uuid,
partition_ref: self.partition_ref,
state: FailedState {
failed_at: timestamp,
failed_by: self.state.job_run_id,
},
}
}
/// Transition from Building to UpstreamBuilding when job reports missing dependencies
pub fn dep_miss(
self,
missing_deps: Vec<PartitionRef>,
) -> PartitionWithState<UpstreamBuildingState> {
PartitionWithState {
uuid: self.uuid,
partition_ref: self.partition_ref,
state: UpstreamBuildingState {
job_run_id: self.state.job_run_id,
missing_deps,
},
}
}
}
// Type-safe transition methods for BuildingState
impl PartitionWithState<BuildingState> {
/// Transition from Building to Live when a job successfully completes
pub fn complete(self, job_run_id: String, timestamp: u64) -> PartitionWithState<LiveState> {
// Type-safe transition methods for UpstreamBuildingState
impl PartitionWithState<UpstreamBuildingState> {
/// Transition from UpstreamBuilding to UpForRetry when all upstream deps are satisfied
pub fn upstreams_satisfied(self) -> PartitionWithState<UpForRetryState> {
PartitionWithState {
uuid: self.uuid,
partition_ref: self.partition_ref,
want_ids: self.want_ids,
state: LiveState {
built_at: timestamp,
built_by: job_run_id,
state: UpForRetryState {
original_job_run_id: self.state.job_run_id,
},
}
}
/// Transition from Building to Failed when a job fails
pub fn fail(self, job_run_id: String, timestamp: u64) -> PartitionWithState<FailedState> {
/// Transition from UpstreamBuilding to UpstreamFailed when an upstream dep fails
pub fn upstream_failed(
self,
failed_upstream_refs: Vec<PartitionRef>,
timestamp: u64,
) -> PartitionWithState<UpstreamFailedState> {
PartitionWithState {
uuid: self.uuid,
partition_ref: self.partition_ref,
want_ids: self.want_ids,
state: FailedState {
state: UpstreamFailedState {
failed_at: timestamp,
failed_by: job_run_id,
failed_upstream_refs,
},
}
}
/// Add another job to the list of jobs building this partition
pub fn add_building_job(mut self, job_run_id: String) -> Self {
if !self.state.building_by.contains(&job_run_id) {
self.state.building_by.push(job_run_id);
}
self
/// Check if a specific upstream ref is in our missing deps
pub fn is_waiting_for(&self, upstream_ref: &str) -> bool {
self.state
.missing_deps
.iter()
.any(|d| d.r#ref == upstream_ref)
}
/// Transition from Building back to Missing when a job discovers missing dependencies
pub fn reset_to_missing(self) -> PartitionWithState<MissingState> {
PartitionWithState {
partition_ref: self.partition_ref,
want_ids: self.want_ids,
state: MissingState {},
}
/// Remove a satisfied upstream from missing deps. Returns remaining count.
pub fn satisfy_upstream(mut self, upstream_ref: &str) -> (Self, usize) {
self.state.missing_deps.retain(|r| r.r#ref != upstream_ref);
let remaining = self.state.missing_deps.len();
(self, remaining)
}
}
@ -157,8 +245,8 @@ impl PartitionWithState<LiveState> {
/// Transition from Live to Tainted when a taint is applied
pub fn taint(self, taint_id: String, timestamp: u64) -> PartitionWithState<TaintedState> {
PartitionWithState {
uuid: self.uuid,
partition_ref: self.partition_ref,
want_ids: self.want_ids,
state: TaintedState {
tainted_at: timestamp,
taint_ids: vec![taint_id],
@ -180,100 +268,115 @@ impl PartitionWithState<TaintedState> {
// Helper methods on the Partition enum
impl Partition {
/// Create a new partition in the Missing state
pub fn new_missing(partition_ref: PartitionRef) -> Self {
Partition::Missing(PartitionWithState {
partition_ref,
want_ids: vec![],
state: MissingState {},
})
/// Get the UUID from any state
pub fn uuid(&self) -> Uuid {
match self {
Partition::Building(p) => p.uuid,
Partition::UpstreamBuilding(p) => p.uuid,
Partition::UpForRetry(p) => p.uuid,
Partition::Live(p) => p.uuid,
Partition::Failed(p) => p.uuid,
Partition::UpstreamFailed(p) => p.uuid,
Partition::Tainted(p) => p.uuid,
}
}
/// Get the partition reference from any state
pub fn partition_ref(&self) -> &PartitionRef {
match self {
Partition::Missing(p) => &p.partition_ref,
Partition::Building(p) => &p.partition_ref,
Partition::UpstreamBuilding(p) => &p.partition_ref,
Partition::UpForRetry(p) => &p.partition_ref,
Partition::Live(p) => &p.partition_ref,
Partition::Failed(p) => &p.partition_ref,
Partition::UpstreamFailed(p) => &p.partition_ref,
Partition::Tainted(p) => &p.partition_ref,
}
}
/// Get want_ids from any state
pub fn want_ids(&self) -> &Vec<String> {
match self {
Partition::Missing(p) => &p.want_ids,
Partition::Building(p) => &p.want_ids,
Partition::Live(p) => &p.want_ids,
Partition::Failed(p) => &p.want_ids,
Partition::Tainted(p) => &p.want_ids,
}
}
/// Get mutable want_ids from any state
pub fn want_ids_mut(&mut self) -> &mut Vec<String> {
match self {
Partition::Missing(p) => &mut p.want_ids,
Partition::Building(p) => &mut p.want_ids,
Partition::Live(p) => &mut p.want_ids,
Partition::Failed(p) => &mut p.want_ids,
Partition::Tainted(p) => &mut p.want_ids,
}
}
/// Check if partition is in Live state
pub fn is_live(&self) -> bool {
matches!(self, Partition::Live(_))
}
/// Check if partition is satisfied (Live or Tainted both count as "available")
pub fn is_satisfied(&self) -> bool {
matches!(self, Partition::Live(_) | Partition::Tainted(_))
}
/// Check if partition is in a terminal state (Live, Failed, or Tainted)
/// Check if partition is in a terminal state (Live, Failed, UpstreamFailed, or Tainted)
pub fn is_terminal(&self) -> bool {
matches!(
self,
Partition::Live(_) | Partition::Failed(_) | Partition::Tainted(_)
Partition::Live(_)
| Partition::Failed(_)
| Partition::UpstreamFailed(_)
| Partition::Tainted(_)
)
}
/// Check if partition is currently being built
/// Check if partition is currently being built (includes UpstreamBuilding as it holds a "lease")
pub fn is_building(&self) -> bool {
matches!(self, Partition::Building(_))
matches!(
self,
Partition::Building(_) | Partition::UpstreamBuilding(_)
)
}
/// Check if partition is missing (referenced but not built)
pub fn is_missing(&self) -> bool {
matches!(self, Partition::Missing(_))
/// Check if partition is in UpForRetry state (ready to be rebuilt)
pub fn is_up_for_retry(&self) -> bool {
matches!(self, Partition::UpForRetry(_))
}
/// Convert to PartitionDetail for API responses and queries
/// Check if partition is failed (hard failure)
pub fn is_failed(&self) -> bool {
matches!(self, Partition::Failed(_))
}
/// Check if partition is upstream failed
pub fn is_upstream_failed(&self) -> bool {
matches!(self, Partition::UpstreamFailed(_))
}
/// Check if partition is tainted
pub fn is_tainted(&self) -> bool {
matches!(self, Partition::Tainted(_))
}
/// Convert to PartitionDetail for API responses and queries.
/// Note: want_ids is now empty - this will be populated by BuildState from the inverted index.
pub fn to_detail(&self) -> PartitionDetail {
match self {
Partition::Missing(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
status: Some(PartitionStatus {
code: PartitionStatusCode::PartitionWanted as i32,
name: "PartitionWanted".to_string(),
}),
want_ids: p.want_ids.clone(),
job_run_ids: vec![],
taint_ids: vec![],
last_updated_timestamp: None,
},
Partition::Building(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
status: Some(PartitionStatus {
code: PartitionStatusCode::PartitionBuilding as i32,
name: "PartitionBuilding".to_string(),
}),
want_ids: p.want_ids.clone(),
job_run_ids: p.state.building_by.clone(),
want_ids: vec![], // Populated by BuildState
job_run_ids: vec![p.state.job_run_id.clone()],
taint_ids: vec![],
last_updated_timestamp: None,
uuid: p.uuid.to_string(),
},
Partition::UpstreamBuilding(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
status: Some(PartitionStatus {
code: PartitionStatusCode::PartitionBuilding as i32, // Use Building status for UpstreamBuilding
name: "PartitionUpstreamBuilding".to_string(),
}),
want_ids: vec![], // Populated by BuildState
job_run_ids: vec![p.state.job_run_id.clone()],
taint_ids: vec![],
last_updated_timestamp: None,
uuid: p.uuid.to_string(),
},
Partition::UpForRetry(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
status: Some(PartitionStatus {
code: PartitionStatusCode::PartitionBuilding as i32, // Still "building" conceptually
name: "PartitionUpForRetry".to_string(),
}),
want_ids: vec![], // Populated by BuildState
job_run_ids: vec![p.state.original_job_run_id.clone()],
taint_ids: vec![],
last_updated_timestamp: None,
uuid: p.uuid.to_string(),
},
Partition::Live(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
@ -281,10 +384,11 @@ impl Partition {
code: PartitionStatusCode::PartitionLive as i32,
name: "PartitionLive".to_string(),
}),
want_ids: p.want_ids.clone(),
want_ids: vec![], // Populated by BuildState
job_run_ids: vec![p.state.built_by.clone()],
taint_ids: vec![],
last_updated_timestamp: Some(p.state.built_at),
uuid: p.uuid.to_string(),
},
Partition::Failed(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
@ -292,10 +396,23 @@ impl Partition {
code: PartitionStatusCode::PartitionFailed as i32,
name: "PartitionFailed".to_string(),
}),
want_ids: p.want_ids.clone(),
want_ids: vec![], // Populated by BuildState
job_run_ids: vec![p.state.failed_by.clone()],
taint_ids: vec![],
last_updated_timestamp: Some(p.state.failed_at),
uuid: p.uuid.to_string(),
},
Partition::UpstreamFailed(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
status: Some(PartitionStatus {
code: PartitionStatusCode::PartitionFailed as i32, // Use Failed status
name: "PartitionUpstreamFailed".to_string(),
}),
want_ids: vec![], // Populated by BuildState
job_run_ids: vec![],
taint_ids: vec![],
last_updated_timestamp: Some(p.state.failed_at),
uuid: p.uuid.to_string(),
},
Partition::Tainted(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
@ -303,11 +420,92 @@ impl Partition {
code: PartitionStatusCode::PartitionTainted as i32,
name: "PartitionTainted".to_string(),
}),
want_ids: p.want_ids.clone(),
want_ids: vec![], // Populated by BuildState
job_run_ids: vec![],
taint_ids: p.state.taint_ids.clone(),
last_updated_timestamp: Some(p.state.tainted_at),
uuid: p.uuid.to_string(),
},
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_derive_partition_uuid_deterministic() {
let uuid1 = derive_partition_uuid("job-123", "data/beta");
let uuid2 = derive_partition_uuid("job-123", "data/beta");
assert_eq!(uuid1, uuid2);
}
#[test]
fn test_derive_partition_uuid_different_inputs() {
let uuid1 = derive_partition_uuid("job-123", "data/beta");
let uuid2 = derive_partition_uuid("job-456", "data/beta");
let uuid3 = derive_partition_uuid("job-123", "data/alpha");
assert_ne!(uuid1, uuid2);
assert_ne!(uuid1, uuid3);
assert_ne!(uuid2, uuid3);
}
#[test]
fn test_partition_building_transitions() {
let partition = PartitionWithState::<BuildingState>::new(
"job-123".to_string(),
PartitionRef {
r#ref: "data/beta".to_string(),
},
);
// Can transition to Live
let live = partition.clone().complete(1000);
assert_eq!(live.state.built_at, 1000);
assert_eq!(live.state.built_by, "job-123");
// Can transition to Failed
let failed = partition.clone().fail(2000);
assert_eq!(failed.state.failed_at, 2000);
assert_eq!(failed.state.failed_by, "job-123");
// Can transition to UpstreamBuilding (dep miss)
let upstream_building = partition.dep_miss(vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}]);
assert_eq!(upstream_building.state.missing_deps.len(), 1);
assert_eq!(upstream_building.state.missing_deps[0].r#ref, "data/alpha");
}
#[test]
fn test_upstream_building_transitions() {
let building = PartitionWithState::<BuildingState>::new(
"job-123".to_string(),
PartitionRef {
r#ref: "data/beta".to_string(),
},
);
let upstream_building = building.dep_miss(vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}]);
// Can transition to UpForRetry
let up_for_retry = upstream_building.clone().upstreams_satisfied();
assert_eq!(up_for_retry.state.original_job_run_id, "job-123");
// Can transition to UpstreamFailed
let upstream_failed = upstream_building.upstream_failed(
vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}],
3000,
);
assert_eq!(upstream_failed.state.failed_at, 3000);
assert_eq!(upstream_failed.state.failed_upstream_refs.len(), 1);
assert_eq!(
upstream_failed.state.failed_upstream_refs[0].r#ref,
"data/alpha"
);
}
}

View file

@ -4,6 +4,10 @@ use crate::{EventSource, PartitionRef, WantCreateEventV1, WantDetail, WantStatus
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// State: Want has just been created, state not yet determined by sensing partition states
#[derive(Debug, Clone)]
pub struct NewState {}
/// State: Want has been created and is ready to be scheduled
#[derive(Debug, Clone)]
pub struct IdleState {}
@ -95,6 +99,7 @@ pub struct WantWithState<S> {
/// Wrapper enum for storing wants in collections
#[derive(Debug, Clone)]
pub enum Want {
New(WantWithState<NewState>),
Idle(WantWithState<IdleState>),
Building(WantWithState<BuildingState>),
UpstreamBuilding(WantWithState<UpstreamBuildingState>),
@ -108,6 +113,14 @@ pub enum Want {
/// is critical that these be treated with respect, not just summoned because it's convenient.
/// These should be created ephemerally from typestate objects via .get_ref() and used
/// immediately — never stored long-term, as partition state can change.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct NewWantId(pub String);
impl WantWithState<NewState> {
pub fn get_id(&self) -> NewWantId {
NewWantId(self.want.want_id.clone())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct IdleWantId(pub String);
impl WantWithState<IdleState> {
@ -164,8 +177,8 @@ impl WantWithState<CanceledState> {
}
}
// From impl for creating want from event
impl From<WantCreateEventV1> for WantWithState<IdleState> {
// From impl for creating want from event - creates in New state for sensing
impl From<WantCreateEventV1> for WantWithState<NewState> {
fn from(event: WantCreateEventV1) -> Self {
WantWithState {
want: WantInfo {
@ -178,9 +191,94 @@ impl From<WantCreateEventV1> for WantWithState<IdleState> {
comment: event.comment,
last_updated_at: current_timestamp(),
},
state: NewState {},
}
}
}
// Type-safe transition methods for NewState
impl WantWithState<NewState> {
/// Transition from New to Idle when partitions don't exist or are ready to schedule
pub fn to_idle(self) -> WantWithState<IdleState> {
WantWithState {
want: self.want.updated_timestamp(),
state: IdleState {},
}
}
/// Transition from New to Building when partitions are currently being built
pub fn to_building(self, started_at: u64) -> WantWithState<BuildingState> {
WantWithState {
want: self.want.updated_timestamp(),
state: BuildingState { started_at },
}
}
/// Transition from New to Successful when all partitions are already Live
pub fn to_successful(self, completed_at: u64) -> WantWithState<SuccessfulState> {
WantWithState {
want: self.want.updated_timestamp(),
state: SuccessfulState { completed_at },
}
}
/// Transition from New to Failed when any partition has failed
pub fn to_failed(
self,
failed_partition_refs: Vec<PartitionRef>,
reason: String,
) -> WantWithState<FailedState> {
WantWithState {
want: self.want.updated_timestamp(),
state: FailedState {
failed_at: current_timestamp(),
failed_partition_refs,
failure_reason: reason,
},
}
}
/// Transition from New to UpstreamBuilding when partitions are waiting for upstream deps
pub fn to_upstream_building(
self,
upstream_want_ids: Vec<String>,
) -> WantWithState<UpstreamBuildingState> {
WantWithState {
want: self.want.updated_timestamp(),
state: UpstreamBuildingState { upstream_want_ids },
}
}
/// Transition from New to UpstreamFailed when upstream dependencies have failed
pub fn to_upstream_failed(
self,
failed_wants: Vec<String>,
) -> WantWithState<UpstreamFailedState> {
WantWithState {
want: self.want.updated_timestamp(),
state: UpstreamFailedState {
failed_at: current_timestamp(),
failed_wants,
},
}
}
/// Transition from New to Canceled when want is explicitly canceled
/// (Rarely used - wants are typically transitioned before cancel can arrive)
pub fn cancel(
self,
canceled_by: Option<EventSource>,
comment: Option<String>,
) -> WantWithState<CanceledState> {
WantWithState {
want: self.want.updated_timestamp(),
state: CanceledState {
canceled_at: current_timestamp(),
canceled_by,
comment,
},
}
}
}
// Type-safe transition methods for IdleState
@ -413,6 +511,7 @@ impl Want {
pub fn want(&self) -> &WantInfo {
match self {
Want::New(w) => &w.want,
Want::Idle(w) => &w.want,
Want::Building(w) => &w.want,
Want::UpstreamBuilding(w) => &w.want,
@ -436,6 +535,7 @@ impl Want {
comment: self.want().comment.clone(),
last_updated_timestamp: self.want().last_updated_at,
status: match self {
Want::New(_) => Some(WantStatusCode::WantNew.into()),
Want::Idle(_) => Some(WantStatusCode::WantIdle.into()),
Want::Building(_) => Some(WantStatusCode::WantBuilding.into()),
Want::UpstreamBuilding(_) => Some(WantStatusCode::WantUpstreamBuilding.into()),

View file

@ -0,0 +1,4 @@
If you look at the collection of state machines, objects, and state transitions that make up databuild, it's tempting to ask, should this be implemented by a SQL database instead with constraint checks and foreign keys?
Answer: it is tempting! But at its core, databuild models the coordinated state machines of Wants, Partitions, and Job Runs. Compile-time correctness checks of the integrity of the system that are possible via rust with type-state style object state machines and typed object IDs do more to mitigate system integrity risks than foreign key checks mitigate. And after all, we panic on reference errors already, so should detect related issues systematically.

View file

@ -1,577 +0,0 @@
# Web Server Implementation Plan
## Architecture Summary
**Concurrency Model: Event Log Separation**
- Orchestrator runs synchronously in dedicated thread, owns BEL exclusively
- Web server reads from shared BEL storage, sends write commands via channel
- No locks on hot path, orchestrator stays single-threaded
- Eventual consistency for reads (acceptable since builds take time anyway)
**Daemon Model:**
- Server binary started manually from workspace root (for now)
- Server tracks last request time, shuts down after idle timeout (default: 3 hours)
- HTTP REST on localhost random port
- Future: CLI can auto-discover/start server
---
## Thread Model
```
Main Process
├─ HTTP Server (tokio multi-threaded runtime)
│ ├─ Request handlers (async, read from BEL storage)
│ └─ Command sender (send writes to orchestrator)
└─ Orchestrator Thread (std::thread, synchronous)
├─ Receives commands via mpsc channel
├─ Owns BEL (exclusive mutable access)
└─ Runs existing step() loop
```
**Read Path (Low Latency):**
1. HTTP request → Axum handler
2. Read events from shared BEL storage (no lock contention)
3. Reconstruct BuildState from events (can cache this)
4. Return response
**Write Path (Strong Consistency):**
1. HTTP request → Axum handler
2. Send command via channel to orchestrator
3. Orchestrator processes command in its thread
4. Reply sent back via oneshot channel
5. Return response
**Why This Works:**
- Orchestrator remains completely synchronous (no refactoring needed)
- Reads scale horizontally (multiple handlers, no locks)
- Writes are serialized through orchestrator (consistent with current model)
- Event sourcing means reads can be eventually consistent
---
## Phase 1: Foundation - Make BEL Storage Thread-Safe
**Goal:** Allow BEL storage to be safely shared between orchestrator and web server
**Tasks:**
1. Add `Send + Sync` bounds to `BELStorage` trait
2. Wrap `SqliteBELStorage::connection` in `Arc<Mutex<Connection>>` or use r2d2 pool
3. Add read-only methods to BELStorage:
- `list_events(offset: usize, limit: usize) -> Vec<DataBuildEvent>`
- `get_event(event_id: u64) -> Option<DataBuildEvent>`
- `latest_event_id() -> u64`
4. Add builder method to reconstruct BuildState from events:
- `BuildState::from_events(events: &[DataBuildEvent]) -> Self`
**Files Modified:**
- `databuild/build_event_log.rs` - update trait and storage impls
- `databuild/build_state.rs` - add `from_events()` builder
**Acceptance Criteria:**
- `BELStorage` trait has `Send + Sync` bounds
- Can clone `Arc<SqliteBELStorage>` and use from multiple threads
- Can reconstruct BuildState from events without mutating storage
---
## Phase 2: Web Server - HTTP API with Axum
**Goal:** HTTP server serving read/write APIs
**Tasks:**
1. Add dependencies to MODULE.bazel:
```python
crate.spec(package = "tokio", features = ["full"], version = "1.0")
crate.spec(package = "axum", version = "0.7")
crate.spec(package = "tower", version = "0.4")
crate.spec(package = "tower-http", features = ["trace", "cors"], version = "0.5")
```
2. Create `databuild/http_server.rs` module with:
- `AppState` struct holding:
- `bel_storage: Arc<dyn BELStorage>` - shared read access
- `command_tx: mpsc::Sender<Command>` - channel to orchestrator
- `last_request_time: Arc<AtomicU64>` - for idle tracking
- Axum router with all endpoints
- Handler functions delegating to existing `api_handle_*` methods
3. API Endpoints:
```
GET /health → health check
GET /api/wants → list_wants
POST /api/wants → create_want
GET /api/wants/:id → get_want
DELETE /api/wants/:id → cancel_want
GET /api/partitions → list_partitions
GET /api/job_runs → list_job_runs
GET /api/job_runs/:id/logs/stdout → stream_logs (stub)
```
4. Handler pattern (reads):
```rust
async fn list_wants(
State(state): State<AppState>,
Query(params): Query<ListWantsParams>,
) -> Json<ListWantsResponse> {
// Read events from storage
let events = state.bel_storage.list_events(0, 10000)?;
// Reconstruct state
let build_state = BuildState::from_events(&events);
// Use existing API method
Json(build_state.list_wants(&params.into()))
}
```
5. Handler pattern (writes):
```rust
async fn create_want(
State(state): State<AppState>,
Json(req): Json<CreateWantRequest>,
) -> Json<CreateWantResponse> {
// Send command to orchestrator
let (reply_tx, reply_rx) = oneshot::channel();
state.command_tx.send(Command::CreateWant(req, reply_tx)).await?;
// Wait for orchestrator reply
let response = reply_rx.await?;
Json(response)
}
```
**Files Created:**
- `databuild/http_server.rs` - new module
**Files Modified:**
- `databuild/lib.rs` - add `pub mod http_server;`
- `MODULE.bazel` - add dependencies
**Acceptance Criteria:**
- Server starts on localhost random port, prints "Listening on http://127.0.0.1:XXXXX"
- All read endpoints return correct JSON responses
- Write endpoints return stub responses (Phase 4 will connect to orchestrator)
---
## Phase 3: CLI - HTTP Client
**Goal:** CLI that sends HTTP requests to running server
**Tasks:**
1. Add dependencies to MODULE.bazel:
```python
crate.spec(package = "clap", features = ["derive"], version = "4.0")
crate.spec(package = "reqwest", features = ["blocking", "json"], version = "0.11")
```
2. Create `databuild/bin/databuild.rs` main binary:
```rust
#[derive(Parser)]
#[command(name = "databuild")]
enum Cli {
/// Start the databuild server
Serve(ServeArgs),
/// Create a want for partitions
Build(BuildArgs),
/// Want operations
Want(WantCommand),
/// Stream job run logs
Logs(LogsArgs),
}
#[derive(Args)]
struct ServeArgs {
#[arg(long, default_value = "8080")]
port: u16,
}
#[derive(Subcommand)]
enum WantCommand {
Create(CreateWantArgs),
List,
Get { want_id: String },
Cancel { want_id: String },
}
```
3. Server address discovery:
- For now: hardcode `http://localhost:8080` or accept `--server-url` flag
- Future: read from `.databuild/server.json` file
4. HTTP client implementation:
```rust
fn list_wants(server_url: &str) -> Result<Vec<WantDetail>> {
let client = reqwest::blocking::Client::new();
let resp = client.get(&format!("{}/api/wants", server_url))
.send()?
.json::<ListWantsResponse>()?;
Ok(resp.data)
}
```
5. Commands:
- `databuild serve --port 8080` - Start server (blocks)
- `databuild build part1 part2` - Create want for partitions
- `databuild want list` - List all wants
- `databuild want get <id>` - Get specific want
- `databuild want cancel <id>` - Cancel want
- `databuild logs <job_run_id>` - Stream logs (stub)
**Files Created:**
- `databuild/bin/databuild.rs` - new CLI binary
**Files Modified:**
- `databuild/BUILD.bazel` - add `rust_binary` target for databuild CLI
**Acceptance Criteria:**
- Can run `databuild serve` to start server
- Can run `databuild want list` in another terminal and see wants
- Commands print pretty JSON or formatted tables
---
## Phase 4: Orchestrator Integration - Command Channel
**Goal:** Connect orchestrator to web server via message passing
**Tasks:**
1. Create `databuild/commands.rs` with command enum:
```rust
pub enum Command {
CreateWant(CreateWantRequest, oneshot::Sender<CreateWantResponse>),
CancelWant(CancelWantRequest, oneshot::Sender<CancelWantResponse>),
// Only write operations need commands
}
```
2. Update `Orchestrator`:
- Add `command_rx: mpsc::Receiver<Command>` field
- In `step()` method, before polling:
```rust
// Process all pending commands
while let Ok(cmd) = self.command_rx.try_recv() {
match cmd {
Command::CreateWant(req, reply) => {
let resp = self.bel.api_handle_want_create(req);
let _ = reply.send(resp); // Ignore send errors
}
// ... other commands
}
}
```
3. Create server startup function in `http_server.rs`:
```rust
pub fn start_server(
bel_storage: Arc<dyn BELStorage>,
port: u16,
) -> (JoinHandle<()>, mpsc::Sender<Command>) {
let (cmd_tx, cmd_rx) = mpsc::channel(100);
// Spawn orchestrator in background thread
let orch_bel = bel_storage.clone();
let orch_handle = std::thread::spawn(move || {
let mut orch = Orchestrator::new_with_commands(orch_bel, cmd_rx);
orch.join().unwrap();
});
// Start HTTP server in tokio runtime
let runtime = tokio::runtime::Runtime::new().unwrap();
let http_handle = runtime.spawn(async move {
let app_state = AppState {
bel_storage,
command_tx: cmd_tx.clone(),
last_request_time: Arc::new(AtomicU64::new(0)),
};
let app = create_router(app_state);
let addr = SocketAddr::from(([127, 0, 0, 1], port));
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
});
(http_handle, cmd_tx)
}
```
4. Update `databuild serve` command to use `start_server()`
**Files Created:**
- `databuild/commands.rs` - new module
**Files Modified:**
- `databuild/orchestrator.rs` - accept command channel, process in `step()`
- `databuild/http_server.rs` - send commands for writes
- `databuild/bin/databuild.rs` - use `start_server()` in `serve` command
**Acceptance Criteria:**
- Creating a want via HTTP actually creates it in BuildState
- Orchestrator processes commands without blocking its main loop
- Can observe wants being scheduled into job runs
---
## Phase 5: Daemon Lifecycle - Auto-Shutdown
**Goal:** Server shuts down gracefully after idle timeout
**Tasks:**
1. Update AppState to track last request time:
```rust
pub struct AppState {
bel_storage: Arc<dyn BELStorage>,
command_tx: mpsc::Sender<Command>,
last_request_time: Arc<AtomicU64>, // epoch millis
shutdown_tx: broadcast::Sender<()>,
}
```
2. Add Tower middleware to update timestamp:
```rust
async fn update_last_request_time<B>(
State(state): State<AppState>,
req: Request<B>,
next: Next<B>,
) -> Response {
state.last_request_time.store(
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
Ordering::Relaxed,
);
next.run(req).await
}
```
3. Background idle checker task:
```rust
tokio::spawn(async move {
let idle_timeout = Duration::from_secs(3 * 60 * 60); // 3 hours
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
let last_request = state.last_request_time.load(Ordering::Relaxed);
let now = SystemTime::now()...;
if now - last_request > idle_timeout.as_millis() as u64 {
eprintln!("Server idle for {} hours, shutting down",
idle_timeout.as_secs() / 3600);
shutdown_tx.send(()).unwrap();
break;
}
}
});
```
4. Graceful shutdown handling:
```rust
let app = create_router(state);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.with_graceful_shutdown(async {
shutdown_rx.recv().await.ok();
})
.await?;
```
5. Cleanup on shutdown:
- Orchestrator: finish current step, don't start new one
- HTTP server: stop accepting new connections, finish in-flight requests
- Log: "Shutdown complete"
**Files Modified:**
- `databuild/http_server.rs` - add idle tracking, shutdown logic
- `databuild/orchestrator.rs` - accept shutdown signal, check before each step
**Acceptance Criteria:**
- Server shuts down after configured idle timeout
- In-flight requests complete successfully during shutdown
- Shutdown is logged clearly
---
## Phase 6: Testing & Polish
**Goal:** End-to-end testing and production readiness
**Tasks:**
1. Integration tests:
```rust
#[test]
fn test_server_lifecycle() {
// Start server
let (handle, port) = start_test_server();
// Make requests
let wants = reqwest::blocking::get(
&format!("http://localhost:{}/api/wants", port)
).unwrap().json::<ListWantsResponse>().unwrap();
// Stop server
handle.shutdown();
}
```
2. Error handling improvements:
- Proper HTTP status codes (400, 404, 500)
- Structured error responses:
```json
{"error": "Want not found", "want_id": "abc123"}
```
- Add `tracing` crate for structured logging
3. Add CORS middleware for web app:
```rust
let cors = CorsLayer::new()
.allow_origin("http://localhost:3000".parse::<HeaderValue>().unwrap())
.allow_methods([Method::GET, Method::POST, Method::DELETE]);
app.layer(cors)
```
4. Health check endpoint:
```rust
async fn health() -> &'static str {
"OK"
}
```
5. Optional: Metrics endpoint (prometheus format):
```rust
async fn metrics() -> String {
format!(
"# HELP databuild_wants_total Total number of wants\n\
databuild_wants_total {}\n\
# HELP databuild_job_runs_total Total number of job runs\n\
databuild_job_runs_total {}\n",
want_count, job_run_count
)
}
```
**Files Created:**
- `databuild/tests/http_integration_test.rs` - integration tests
**Files Modified:**
- `databuild/http_server.rs` - add CORS, health, metrics, better errors
- `MODULE.bazel` - add `tracing` dependency
**Acceptance Criteria:**
- All endpoints have proper error handling
- CORS works for web app development
- Health check returns 200 OK
- Integration tests pass
---
## Future Enhancements (Not in Initial Plan)
### Workspace Auto-Discovery
- Walk up directory tree looking for `.databuild/` marker
- Store server metadata in `.databuild/server.json`:
```json
{
"pid": 12345,
"port": 54321,
"started_at": "2025-01-22T10:30:00Z",
"workspace_root": "/Users/stuart/Projects/databuild"
}
```
- CLI auto-starts server if not running
### Log Streaming (SSE)
- Implement `GET /api/job_runs/:id/logs/stdout?follow=true`
- Use Server-Sent Events for streaming
- Integrate with FileLogStore from logging.md plan
### State Caching
- Cache reconstructed BuildState for faster reads
- Invalidate cache when new events arrive
- Use `tokio::sync::RwLock<Option<(u64, BuildState)>>` where u64 is latest_event_id
### gRPC Support (If Needed)
- Add Tonic alongside Axum
- Share same orchestrator/command channel
- Useful for language-agnostic clients
---
## Dependencies Summary
New dependencies to add to `MODULE.bazel`:
```python
# Async runtime
crate.spec(package = "tokio", features = ["full"], version = "1.0")
# Web framework
crate.spec(package = "axum", version = "0.7")
crate.spec(package = "tower", version = "0.4")
crate.spec(package = "tower-http", features = ["trace", "cors"], version = "0.5")
# CLI
crate.spec(package = "clap", features = ["derive"], version = "4.0")
# HTTP client for CLI
crate.spec(package = "reqwest", features = ["blocking", "json"], version = "0.11")
# Logging
crate.spec(package = "tracing", version = "0.1")
crate.spec(package = "tracing-subscriber", version = "0.3")
```
---
## Estimated Timeline
- **Phase 1:** 2-3 hours (thread-safe BEL storage)
- **Phase 2:** 4-6 hours (HTTP server with Axum)
- **Phase 3:** 3-4 hours (basic CLI)
- **Phase 4:** 3-4 hours (orchestrator integration)
- **Phase 5:** 2-3 hours (idle shutdown)
- **Phase 6:** 4-6 hours (testing and polish)
**Total:** ~18-26 hours for complete implementation
---
## Design Rationale
### Why Event Log Separation?
**Alternatives Considered:**
1. **Shared State with RwLock**: Orchestrator holds write lock during `step()`, blocking all reads
2. **Actor Model**: Extra overhead from message passing for all operations
**Why Event Log Separation Wins:**
- Orchestrator stays completely synchronous (no refactoring)
- Reads don't block writes (eventual consistency acceptable for build system)
- Natural fit with event sourcing architecture
- Can cache reconstructed state for even better read performance
### Why Not gRPC?
- User requirement: "JSON is a must"
- REST is more debuggable (curl, browser dev tools)
- gRPC adds complexity without clear benefit
- Can add gRPC later if needed (both can coexist)
### Why Axum Over Actix?
- Better compile-time type safety (extractors)
- Cleaner middleware composition (Tower)
- Native async/await (Actix uses actor model internally)
- More ergonomic for this use case
### Why Per-Workspace Server?
- Isolation: builds in different projects don't interfere
- Simpler: no need to route requests by workspace
- Matches Bazel's model (users already understand it)
- Easier to reason about resource usage

View file

@ -0,0 +1,801 @@
# Partition Identity Refactor: Adding UUIDs for Temporal Consistency
## Problem Statement
### Current Architecture
Partitions are currently keyed only by their reference string (e.g., "data/beta"):
```rust
partitions: HashMap<String, Partition> // ref → partition
```
When a partition transitions through states (Missing → Building → Live → Tainted), **it's the same object mutating**. This creates several architectural problems:
### Core Issue: Lack of Temporal Identity
**The fundamental problem:** We cannot distinguish between "the partition being built now" and "the partition built yesterday" or "the partition that will be built tomorrow."
This manifests in several ways:
1. **Ambiguous Job-Partition Relationships**
- When job J completes, which partition instance did it build?
- If partition is rebuilt, we lose information about previous builds
- Can't answer: "What was the state of data/beta when job Y ran?"
2. **State Mutation Loss**
- Once a partition transitions Live → Tainted → Missing, the Live state information is lost
- Can't track "Partition P was built successfully by job J at time T"
- Lineage and provenance information disappears on each rebuild
3. **Redundant Data Structures** (Symptoms)
- `WantAttributedPartitions` in `JobRunDetail` exists to snapshot want-partition relationships
- Partitions carry `want_ids: Vec<String>` that get cleared/modified as partitions transition
- Jobs need to capture relationships at creation time because they can't be reliably reconstructed later
### Concrete Bug Example
The bug that led to this design discussion illustrates the problem:
```
1. Want 1 created for "data/beta" → partition becomes Building
2. Want 2 created for "data/beta" → but partition is ALREADY Building
3. Job has dep miss → creates derivative want
4. System expects all wants to be Building/UpstreamBuilding, but Want 2 is Idle → panic
```
**Root cause:** All wants reference the same mutable partition object. We can't distinguish:
- "The partition instance Want 1 triggered"
- "The partition instance Want 2 is waiting for"
- They're the same object, but semantically they represent different temporal relationships
## Proposed Solution: Partition UUIDs
### Architecture Changes
**Two-level indexing:**
```rust
// All partition instances, keyed by UUID
partitions_by_uuid: HashMap<Uuid, Partition>
// Current/canonical partition for each ref
canonical_partitions: HashMap<String, Uuid>
```
### Key Properties
1. **Immutable Identity**: Each partition build gets a unique UUID
- `Partition(uuid-1, ref="data/beta", state=Building)` is a distinct entity
- When rebuilt, create `Partition(uuid-2, ref="data/beta", state=Missing)`
- Both can coexist; uuid-1 represents historical fact, uuid-2 is current state
2. **Stable Job References**: Jobs reference the specific partition UUIDs they built
```rust
JobRunBufferEventV1 {
building_partition_uuids: [uuid-1, uuid-2] // Specific instances being built
}
```
3. **Wants Reference Refs**: Wants continue to reference partition refs, not UUIDs
```rust
WantCreateEventV1 {
partitions: ["data/beta"] // User-facing reference
}
// Want's state determined by canonical partition for "data/beta"
```
4. **Temporal Queries**: Can reconstruct state at any point
- "What was partition uuid-1's state when job J ran?" → Look up uuid-1, it's immutable
- "Which wants were waiting for data/beta at time T?" → Check canonical partition at T
- "What's the current state of data/beta?" → canonical_partitions["data/beta"] → uuid-2
## Benefits
### 1. Removes WantAttributedPartitions Redundancy
**Before:**
```rust
JobRunBufferEventV1 {
building_partitions: [PartitionRef("data/beta")],
// Redundant: snapshot want-partition relationship
servicing_wants: [WantAttributedPartitions {
want_id: "w1",
partitions: ["data/beta"]
}]
}
```
**After:**
```rust
JobRunBufferEventV1 {
building_partition_uuids: [uuid-1, uuid-2]
}
// To find serviced wants - use inverted index in BuildState
for uuid in job.building_partition_uuids {
let partition = partitions_by_uuid[uuid];
let partition_ref = &partition.partition_ref.r#ref;
// Look up wants via inverted index (not stored on partition)
if let Some(want_ids) = wants_for_partition.get(partition_ref) {
for want_id in want_ids {
// transition want
}
}
}
```
The relationship is **discoverable** via inverted index, not **baked-in** at event creation or **stored on partitions**.
**Key improvement:** Partitions don't store `want_ids`. This is cleaner separation of concerns:
- Want → Partition: Inherent (want defines partitions it wants)
- Partition → Want: Derived (maintained as inverted index in BuildState)
**Note on want state vs schedulability:**
- Want state (Building) reflects current reality: "my partitions are being built"
- Schedulability prevents duplicate jobs: "don't schedule another job if partitions already building"
- Both mechanisms needed: state for correctness, schedulability for efficiency
### 2. Proper State Semantics for Wants
**Current (problematic):**
```
Want 1 → triggers build → Building (owns the job somehow?)
Want 2 → sees partition Building → stays Idle (different from Want 1?)
Want 3 → same partition → also Idle
```
**With UUIDs and New state:**
```
Want 1 arrives → New → no canonical partition exists → Idle → schedulable
Orchestrator queues job → generates uuid-1 for "data/beta"
Job buffer event → creates Partition(uuid-1, "data/beta", Building)
→ updates canonical["data/beta"] = uuid-1
→ transitions Want 1: Idle → Building
Want 2 arrives → New → canonical["data/beta"] = uuid-1 (Building) → Building
Want 3 arrives → New → canonical["data/beta"] = uuid-1 (Building) → Building
Want 4 arrives → New → canonical["data/beta"] = uuid-1 (Building) → Building
```
All 4 wants have **identical relationship** to the canonical partition. The state reflects reality: "is the canonical partition for my ref being built?"
**Key insights:**
- Wants don't bind to UUIDs. They look up the canonical partition for their ref and base their state on that.
- New state makes state determination explicit: want creation → observe world → transition to appropriate state
### 3. Historical Lineage
```rust
// Track partition lineage over time
Partition {
uuid: uuid-3,
partition_ref: "data/beta",
previous_uuid: Some(uuid-2), // Link to previous instance
created_at: 1234567890,
state: Live,
produced_by_job: Some("job-xyz"),
}
```
Can answer:
- "What partitions existed for this ref over time?"
- "Which job produced this specific partition instance?"
- "What was the dependency chain when this partition was built?"
## Implementation Plan
### Phase 1: Add UUID Infrastructure (Non-Breaking)
**Goals:**
- Add UUID field to Partition
- Create dual indexing (by UUID and by ref)
- Maintain backward compatibility
**Changes:**
1. **Update Partition struct** (databuild/partition_state.rs)
Add UUID field to partition:
- `uuid: Uuid` - Unique identifier for this partition instance
- Remove `want_ids` field (now maintained as inverted index in BuildState)
**Update partition state machine:**
States:
- **Building**: Job actively building this partition
- **UpstreamBuilding**: Job had dep miss, partition waiting for upstream dependencies (stores MissingDeps)
- **UpForRetry**: Upstream dependencies satisfied, partition ready to retry building
- **Live**: Successfully built
- **Failed**: Hard failure (shouldn't retry)
- **UpstreamFailed**: Partition failed because upstream dependencies failed (terminal state)
- **Tainted**: Marked invalid by taint event
**Removed:** Missing state - partitions only exist when jobs start building them or are completed.
Key transitions:
- Building → UpstreamBuilding (job reports dep miss)
- UpstreamBuilding → UpForRetry (all upstream deps satisfied)
- UpstreamBuilding → UpstreamFailed (upstream dependency hard failure)
- Building → Live (job succeeds)
- Building → Failed (job hard failure)
- UpForRetry → Building (new job queued for retry, creates fresh UUID)
- Live → Tainted (partition tainted)
2. **Add dual indexing and inverted indexes** (databuild/build_state.rs)
```rust
pub struct BuildState {
partitions_by_uuid: BTreeMap<Uuid, Partition>, // NEW
canonical_partitions: BTreeMap<String, Uuid>, // NEW
wants_for_partition: BTreeMap<String, Vec<String>>, // NEW: partition ref → want IDs
downstream_waiting: BTreeMap<String, Vec<Uuid>>, // NEW: partition ref → UUIDs waiting for it
partitions: BTreeMap<String, Partition>, // DEPRECATED, keep for now
// ...
}
```
**Rationale for inverted indexes:**
**`wants_for_partition`:**
- Partitions shouldn't know about wants (layering violation)
- Want → Partition is inherent (want defines what it wants)
- Partition → Want is derived (computed from wants, maintained as index)
- BuildState owns this inverted relationship
**`downstream_waiting`:**
- Enables efficient dep miss resolution: when partition becomes Live, directly find which partitions are waiting for it
- Maps upstream partition ref → list of downstream partition UUIDs that have this ref in their MissingDeps
- Avoids scanning all UpstreamBuilding partitions when upstreams complete
- O(1) lookup to find affected partitions
3. **Partition creation happens at job buffer time**
Partitions are **only** created when a job starts building them:
- Orchestrator generates fresh UUIDs when queuing job
- `handle_job_run_buffer()` creates partitions directly in Building state with those UUIDs
- Store in both maps: `partitions_by_uuid[uuid]` and `canonical_partitions[ref] = uuid`
- Keep `partitions[ref]` updated for backward compatibility during migration
No partitions created during want creation - wants just register in inverted index.
4. **Add helper methods** for accessing partitions by UUID and ref
- `get_canonical_partition(ref)` - lookup canonical partition for a ref
- `get_canonical_partition_uuid(ref)` - get UUID of canonical partition
- `get_partition_by_uuid(uuid)` - direct UUID lookup
- `get_wants_for_partition(ref)` - query inverted index
5. **Update inverted index maintenance**
When wants are created, the `wants_for_partition` index must be updated:
- **Want creation:** Add want_id to index for each partition ref in the want
- **Want completion/cancellation:** For now, do NOT remove from index. Cleanup can be added later if needed.
**No partition creation needed** - just update the index. Partitions are created later when jobs are queued.
**Rationale for not cleaning up:**
- Index size should be manageable for now
- Cleanup logic is straightforward to add later when needed
- Avoids complexity around replay (removal operations not in event log)
Key consideration: The index maps partition refs (not UUIDs) to want IDs, since wants reference refs. When a partition is rebuilt with a new UUID, the same ref continues to map to the same wants until those wants complete.
### Phase 2: Add New State and Want State Sensing
**Goals:**
- Add explicit "New" state to Want state machine
- Wants sense canonical partition state and transition appropriately
- Clarify distinction between want state and schedulability
**Changes:**
1. **Add New state** to want_state.rs
Add a new state that represents a want that has just been created but hasn't yet observed the world:
- `NewState` - Want has been created from event, state not yet determined
- Transitions from New:
- New → Failed (any partition failed)
- New → Successful (all partitions live)
- New → Building (any partition building)
- New → Idle (partitions don't exist or other states)
This makes state determination explicit and observable in the event log.
2. **Update handle_want_create()** to sense and transition
During want creation event processing:
- Create want in New state from WantCreateEventV1
- Register want in inverted index (`wants_for_partition`)
- Check canonical partition states for all partition refs
- Transition based on observation (in priority order):
- If ANY canonical partition is Failed → New → Failed (job can't be safely retried)
- If ANY canonical partition is UpstreamFailed → New → UpstreamFailed (upstream deps failed)
- If ALL canonical partitions exist AND are Live → New → Successful (already built!)
- If ANY canonical partition is Building → New → Building (being built now)
- If ANY canonical partition is UpstreamBuilding → New → UpstreamBuilding (waiting for deps)
- If ANY canonical partition is UpForRetry → New → Idle (deps satisfied, ready to schedule)
- Otherwise (partitions don't exist or other states) → New → Idle (need to schedule)
- For derivative wants, additional logic may transition to UpstreamBuilding
Key insight: Most wants will go New → Idle because partitions won't exist yet (only created when jobs start). Subsequent wants for already-building partitions go New → Building. Wants arriving during dep miss go New → UpstreamBuilding. Wants for partitions ready to retry go New → Idle. Wants for already-Live partitions go New → Successful. Wants for Failed or UpstreamFailed partitions go New → Failed/UpstreamFailed.
3. **Keep WantSchedulability building check**
**Important distinction:** Want state vs. schedulability are different concerns:
- **Want state** (New → Building): "Are my partitions currently being built?" - Reflects reality
- **Schedulability**: "Should the orchestrator start a NEW job for this want?" - Prevents duplicate jobs
Example scenario:
```
Want 1: Idle → schedules job → partition becomes Building → want becomes Building
Want 2 arrives → sees partition Building → New → Building
Orchestrator polls: both wants are Building, but should NOT schedule another job
```
The `building` field in `WantUpstreamStatus` remains necessary to prevent duplicate job scheduling. A want can be in Building state but not schedulable if partitions are already being built by another job.
Keep the existing schedulability logic that checks `building.is_empty()`.
4. **Update derivative want handling**
Modify `handle_derivative_want_creation()` to handle wants in their appropriate states:
- **Building → UpstreamBuilding:** Want is Building when dep miss occurs (normal case)
- **UpstreamBuilding → UpstreamBuilding:** Want already waiting on upstreams, add another (additional dep miss)
Note: Idle wants should NOT be present during derivative want creation. If partitions are building (which they must be for a job to report dep miss), wants would have been created in Building state via New → Building transition.
5. **Add required state transitions** in want_state.rs
New transitions needed:
- **New → Failed:** Any partition failed
- **New → UpstreamFailed:** Any partition upstream failed
- **New → Successful:** All partitions live
- **New → Idle:** Normal case, partitions don't exist
- **New → Building:** Partitions already building when want created
- **Building → UpstreamBuilding:** Job reports dep miss (first time)
- **UpstreamBuilding → UpstreamBuilding:** Additional upstreams added
Note: New → UpstreamBuilding is not needed - wants go New → Building first, then Building → UpstreamBuilding when dep miss occurs.
### Phase 3: Update Job Events
**Goals:**
- Jobs reference partition UUIDs, not just refs
- Remove WantAttributedPartitions redundancy
**Changes:**
1. **Update JobRunBufferEventV1** in databuild.proto
Add new message and field:
```protobuf
message PartitionInstanceRef {
PartitionRef partition_ref = 1;
string uuid = 2; // UUID as string
}
message JobRunBufferEventV1 {
// ... existing fields ...
repeated PartitionInstanceRef building_partitions_v2 = 6; // NEW
repeated PartitionRef building_partitions = 4; // DEPRECATED
repeated WantAttributedPartitions servicing_wants = 5; // DEPRECATED
}
```
This pairs each partition ref with its UUID, solving the mapping problem.
2. **Update handle_job_run_buffer()** in build_state.rs
Change partition and want lookup logic:
- Parse UUIDs from event (need partition refs too - consider adding to event or deriving from wants)
- **Create partitions directly in Building state** with these UUIDs (no Missing state)
- Update `canonical_partitions` to point refs to these new UUIDs
- Use inverted index (`wants_for_partition`) to find wants for each partition ref
- Transition those wants: Idle → Building (or stay Building if already there)
- Create job run in Queued state
**Key changes:**
- Partitions created here, not during want creation
- No Missing → Building transition, created directly as Building
- Use inverted index for want discovery (not stored on partition or in event)
3. **Update Orchestrator's queue_job()** in orchestrator.rs
When creating JobRunBufferEventV1:
- Get partition refs from wants (existing logic)
- **Generate fresh UUIDs** for each unique partition ref (one UUID per ref)
- Include UUID list in event along with refs (may need to update event schema)
- Orchestrator no longer needs to track or snapshot want-partition relationships
**Key change:** Orchestrator generates UUIDs at job queue time, not looking up canonical partitions. Each job attempt gets fresh UUIDs. The event handler will create partitions in Building state with these UUIDs and update canonical pointers.
This eliminates WantAttributedPartitions entirely - relationships are discoverable via inverted index.
### Phase 4: Partition Lifecycle Management
**Goals:**
- Define when new partition UUIDs are created
- Handle canonical partition transitions
**Canonical Partition Transitions:**
New partition UUID created when:
1. **First build**: Orchestrator queues job → generates UUID → partition created directly as Building
2. **Taint**: Partition tainted → transition current to Tainted state (keeps UUID, stays canonical so readers can see it's tainted)
3. **Rebuild after taint**: Existing want (still within TTL) sees tainted partition → triggers new job → orchestrator generates fresh UUID → new partition replaces tainted one in canonical_partitions
**Note on TTL/SLA:** These are **want properties**, not partition properties. TTL defines how long after want creation the orchestrator should keep attempting to build partitions. When a partition is tainted, wants within TTL will keep retrying. SLA is an alarm threshold. Partitions don't expire - they stay Live until explicitly tainted or replaced by a new build.
**Key principles:**
- **Building state as lease:** The Building state serves as a lease mechanism. While a partition is in Building state, the orchestrator will not attempt to schedule additional jobs to build that partition. This prevents concurrent/duplicate builds. The lease is released when the partition transitions to Live, Failed, or when a new partition instance with a fresh UUID is created and becomes canonical (e.g., after the building job reports dep miss and a new job is queued).
- When canonical pointer is updated (e.g., new build replaces tainted partition), old partition UUID remains in `partitions_by_uuid` for historical queries
- Canonical pointer always points to current/active partition instance (Building, Live, Failed, or Tainted)
- Tainted partitions stay canonical until replaced - readers need to see they're tainted
- Old instances become immutable historical records
- **No Missing state** - partitions only exist when jobs are actively building them or completed
**Partition Creation:**
Partitions created during `handle_job_run_buffer()`:
- UUIDs come from the event (generated by orchestrator)
- Create partition directly in Building state with job_run_id
- Update `canonical_partitions` map to point ref → UUID
- Store in `partitions_by_uuid`
- If replacing a tainted/failed partition, old one remains in `partitions_by_uuid` by its UUID
**Dep Miss Handling:**
Complete flow when a job has dependency miss:
1. **Job reports dep miss:**
- Job building partition uuid-1 encounters missing upstream deps
- JobRunDepMissEventV1 emitted with MissingDeps (partition refs needed)
- Derivative wants created for missing upstream partitions
2. **Partition transitions to UpstreamBuilding:**
- Partition uuid-1: Building → UpstreamBuilding
- Store MissingDeps in partition state (which upstream refs it's waiting for)
- **Update inverted index:** For each missing dep ref, add uuid-1 to `downstream_waiting[missing_dep_ref]`
- Partition remains canonical (holds lease - prevents concurrent retry attempts)
- Job run transitions to DepMissed state
3. **Want transitions:**
- Wants for partition: Building → UpstreamBuilding
- Wants track the derivative want IDs in their UpstreamBuildingState
4. **Upstream builds complete or fail:**
- **Success case:** Derivative wants build upstream partitions → upstream partition becomes Live
- **Lookup downstream_waiting:** Get `downstream_waiting[upstream_partition_ref]` → list of UUIDs waiting for this upstream
- For each waiting partition UUID:
- Get partition from `partitions_by_uuid[uuid]`
- Check if ALL its MissingDeps are now satisfied (canonical partitions for all refs are Live)
- If satisfied: transition partition UpstreamBuilding → UpForRetry
- Remove uuid from `downstream_waiting` entries (cleanup)
- **Failure case:** Upstream partition transitions to Failed (hard failure)
- **Lookup downstream_waiting:** Get `downstream_waiting[failed_partition_ref]` → list of UUIDs waiting for this upstream
- For each waiting partition UUID in UpstreamBuilding state:
- Transition partition: UpstreamBuilding → UpstreamFailed
- Transition associated wants: UpstreamBuilding → UpstreamFailed
- Remove uuid from `downstream_waiting` entries (cleanup)
- This propagates failure information down the dependency chain
5. **Want becomes schedulable:**
- When partition transitions to UpForRetry, wants transition: UpstreamBuilding → Idle
- Orchestrator sees Idle wants with UpForRetry canonical partitions → schedulable
- New job queued → fresh UUID (uuid-2) generated
- Partition uuid-2 created as Building, replaces uuid-1 in canonical_partitions
- Partition uuid-1 (UpForRetry) remains in partitions_by_uuid as historical record
6. **New wants during dep miss:**
- Want arrives while partition is UpstreamBuilding → New → UpstreamBuilding (correctly waits)
- Want arrives while partition is UpForRetry → New → Idle (correctly schedulable)
**Key properties:**
- Building state acts as lease (no concurrent builds)
- UpstreamBuilding also acts as lease (upstreams not ready, can't retry yet)
- UpForRetry releases lease (upstreams ready, safe to schedule)
- Failed releases lease but blocks new wants (hard failure, shouldn't retry)
- UpstreamFailed releases lease and blocks new wants (upstream deps failed, can't succeed)
- `downstream_waiting` index enables O(1) lookup of affected partitions when upstreams complete or fail
**Taint Handling:**
When partition is tainted (via TaintCreateEvent):
- Find current canonical UUID for the ref
- Transition that partition instance to Tainted state (preserves history)
- **Keep in `canonical_partitions`** - readers need to see it's tainted
- Wants within TTL will see partition is tainted (not Live)
- Orchestrator will schedule new jobs for those wants
- New partition created with fresh UUID when next job starts
- New partition replaces tainted one in `canonical_partitions`
### Phase 5: Migration and Cleanup
**Goals:**
- Remove deprecated fields
- Update API responses
- Complete migration
**Changes:**
1. **Remove deprecated fields from protobuf**
- `building_partitions` from `JobRunBufferEventV1`
- `servicing_wants` from `JobRunBufferEventV1`
- `WantAttributedPartitions` message
2. **Remove backward compatibility code**
- `partitions: BTreeMap<String, Partition>` from `BuildState`
- Dual writes/reads
3. **Update API responses** to include UUIDs where relevant
- JobRunDetail can include partition UUIDs built
- PartitionDetail can include UUID for debugging
4. **Update tests** to use UUID-based assertions
## Design Decisions & Trade-offs
### 1. Wants Reference Refs, Not UUIDs
**Decision:** Wants always reference partition refs (e.g., "data/beta"), not UUIDs.
**Rationale:**
- User requests "data/beta" - the current/canonical partition for that ref
- Want state is based on canonical partition: "is the current partition for my ref being built?"
- If partition gets tainted/rebuilt, wants see the new canonical partition automatically
- Simpler mental model: want doesn't care about historical instances
**How it works:**
```rust
// Want creation
want.partitions = ["data/beta"] // ref, not UUID
// Want state determination
if let Some(canonical_uuid) = canonical_partitions.get("data/beta") {
let partition = partitions_by_uuid[canonical_uuid];
match partition.state {
Building => want.state = Building,
Live => want can complete,
...
}
} else {
// No canonical partition exists yet → Idle
}
```
### 2. Jobs Reference UUIDs, Not Refs
**Decision:** Jobs reference the specific partition UUIDs they built.
**Rationale:**
- Jobs build specific partition instances
- Historical record: "Job J built Partition(uuid-1)"
- Even if partition is later tainted/rebuilt, job's record is immutable
- Enables provenance: "Which job built this specific partition?"
**How it works:**
```rust
JobRunBufferEventV1 {
building_partition_uuids: [uuid-1, uuid-2] // Specific instances
}
```
### 3. UUID Generation: When?
**Decision:** Orchestrator generates UUIDs when queuing jobs, includes them in JobRunBufferEventV1.
**Rationale:**
- UUIDs represent specific build attempts, not partition refs
- Orchestrator is source of truth for "start building these partitions"
- Event contains UUIDs, making replay deterministic (same UUIDs in event)
- No UUID generation during event processing - UUIDs are in the event itself
**Key insight:** The orchestrator generates UUIDs (not BuildState during event handling). This makes UUIDs part of the immutable event log.
### 4. Canonical Partition: One at a Time
**Decision:** Only one canonical partition per ref at a time.
**Scenario handling:**
- Partition(uuid-1, "data/beta") is Building
- User requests rebuild → new want sees uuid-1 is Building → want becomes Building
- Want waits for uuid-1 to complete
- If uuid-1 completes successfully → want completes
- If uuid-1 fails or is tainted → new partition instance created (uuid-2), canonical updated
**Alternative considered:** Multiple concurrent builds with versioning
- Significantly more complex
- No existing need for this
### 5. Event Format: UUID as String
**Decision:** Store UUIDs as strings in protobuf events.
**Rationale:**
- Human-readable in logs/debugging
- Standard UUID string format (36 chars)
- Protobuf has no native UUID type
**Trade-off:** Larger event size (36 bytes vs 16 bytes) - acceptable for debuggability.
## Testing Strategy
### Unit Tests
1. **Partition UUID uniqueness**
- Creating partitions generates unique UUIDs
- Same ref at different times gets different UUIDs
2. **Canonical partition tracking**
- canonical_partitions always points to current instance
- Old instances remain in partitions_by_uuid
3. **Want state determination**
- Want checks canonical partition state
- Multiple wants see same canonical partition
### Integration Tests
1. **Multi-want scenario** (reproduces original bug)
- Want 1 created → New → no partition exists → Idle
- Job scheduled → orchestrator generates uuid-1 → partition created Building
- Want 1 transitions Idle → Building (via job buffer event)
- Wants 2-4 created → New → partition Building (uuid-1) → Building
- All 4 wants reference same canonical partition uuid-1
- Job dep miss → all transition to UpstreamBuilding correctly
- Verifies New state transitions and state sensing work correctly
2. **Rebuild scenario**
- Partition built → Live (uuid-1)
- Partition tainted → new instance created (uuid-2), canonical updated
- New wants reference uuid-2
- Old partition uuid-1 still queryable for history
### End-to-End Tests
1. **Full lifecycle**
- Want created → canonical partition determined
- Job runs → partition transitions through states
- Want completes → partition remains in history
- Partition expires → new UUID for rebuild, canonical updated
## Implementation FAQs
### Q: Do we need to maintain backwards compatibility with existing events?
**A:** No. We can assume no need to maintain backwards compatibility or retain data produced before this change. This simplifies the implementation significantly - no need to handle old event formats or generate UUIDs for replayed pre-UUID events.
### Q: How should we handle reference errors and index inconsistencies?
**A:** Panic on any reference issues with contextual information. This includes:
- Missing partition UUIDs in `partitions_by_uuid`
- Missing canonical pointers in `canonical_partitions`
- Inverted index inconsistencies (wants_for_partition, downstream_waiting)
- Invalid state transitions
Add assertions and validation throughout to catch these issues immediately rather than failing silently.
### Q: What about cleanup of the `wants_for_partition` inverted index?
**A:** Don't remove wants from the index when they complete. This is acceptable for the initial implementation. Building of years of partitions for a mature data platform would still represent less than a million entries, which is manageable. We can add cleanup later if needed.
### Q: What happens when an upstream partition is Tainted instead of becoming Live?
**A:** Tainting of an upstream means it is no longer live, and the downstream job should dep miss. The system will operate correctly:
1. Downstream job discovers upstream is Tainted (not Live) → dep miss
2. Derivative want created for tainted upstream
3. Tainted upstream triggers rebuild (new UUID, replaces canonical)
4. Derivative want succeeds → downstream can resume
### Q: How should UUIDs be generated? Should the Orchestrator calculate them?
**A:** Use deterministic derivation instead of orchestrator generation:
```rust
fn derive_partition_uuid(job_run_id: &str, partition_ref: &str) -> Uuid {
// Hash job_run_id + partition_ref bytes
let mut hasher = Sha256::new();
hasher.update(job_run_id.as_bytes());
hasher.update(partition_ref.as_bytes());
let hash = hasher.finalize();
// Convert first 16 bytes to UUID
Uuid::from_slice(&hash[0..16]).unwrap()
}
```
**Benefits:**
- No orchestrator UUID state/generation needed
- Deterministic replay (same job + ref = same UUID)
- Event schema stays simple (job_run_id + partition refs)
- Build state derives UUIDs in `handle_job_run_buffer()`
- No need for `PartitionInstanceRef` message in protobuf
### Q: How do we enforce safe canonical partition access?
**A:** Add and use helper methods in BuildState to enforce correct access patterns:
- `get_canonical_partition(ref)` - lookup canonical partition for a ref
- `get_canonical_partition_uuid(ref)` - get UUID of canonical partition
- `get_partition_by_uuid(uuid)` - direct UUID lookup
- `get_wants_for_partition(ref)` - query inverted index
Existing `get_partition()` function should be updated to use canonical lookup. Code should always access "current state" via canonical_partitions, not by ref lookup in the deprecated partitions map.
### Q: What is the want schedulability check logic?
**A:** A want is schedulable if:
- The canonical partition doesn't exist for any of its partition refs, OR
- The canonical partition exists and is in Tainted or UpForRetry state
In other words: `!exists || Tainted || UpForRetry`
Building and UpstreamBuilding partitions act as leases (not schedulable).
### Q: Should we implement phases strictly sequentially?
**A:** No. Proceed in the most efficient and productive manner possible. Phases can be combined or reordered as makes sense. For example, Phase 1 + Phase 2 can be done together since want state sensing depends on the new partition states.
### Q: Should we write tests incrementally or implement everything first?
**A:** Implement tests as we go. Write unit tests for each component as it's implemented, then integration tests for full scenarios.
### Q: Should wants reference partition UUIDs or partition refs?
**A:** Wants should NEVER reference partition instances (via UUID). Wants should ONLY reference canonical partitions via partition ref strings. This is already the case - wants include partition refs, which allows the orchestrator to resolve partition info for want state updates. The separation is:
- Wants → Partition Refs (canonical, user-facing)
- Jobs → Partition UUIDs (specific instances, historical)
### Q: Should we add UpstreamFailed state for partitions?
**A:** Yes. This provides symmetry with want semantics and clear terminal state propagation:
**Scenario:**
1. Partition A: Building → Failed (hard failure)
2. Partition B needs A, dep misses → UpstreamBuilding
3. Derivative want created for A, immediately fails (A is Failed)
4. Partition B: UpstreamBuilding → UpstreamFailed
**Benefits:**
- Clear signal that partition can never succeed (upstreams failed)
- Mirrors Want UpstreamFailed semantics (consistency)
- Useful for UIs and debugging
- Prevents indefinite waiting in UpstreamBuilding state
**Transition logic:**
- When partition transitions to Failed, lookup `downstream_waiting[failed_partition_ref]`
- For each downstream partition UUID in UpstreamBuilding state, transition to UpstreamFailed
- This propagates failure information down the dependency chain
**Add to Phase 1 partition states:**
- **UpstreamFailed**: Partition failed because upstream dependencies failed (terminal state)
**Add transition:**
- UpstreamBuilding → UpstreamFailed (upstream dependency hard failure)
### Q: Can a job build the same partition ref multiple times?
**A:** No, this is invalid. A job run cannot build the same partition multiple times. Each partition ref should appear at most once in a job's building_partitions list.
## Summary
Adding partition UUIDs solves fundamental architectural problems:
- **Temporal identity**: Distinguish partition instances over time
- **Stable job references**: Jobs reference immutable partition UUIDs they built
- **Wants reference refs**: Want state based on canonical partition for their ref
- **Discoverable relationships**: Remove redundant snapshot data (WantAttributedPartitions)
- **Proper semantics**: Want state reflects actual canonical partition state
**Key principle:** Wants care about "what's the current state of data/beta?" (refs), while jobs and historical queries care about "what happened to this specific partition instance?" (UUIDs).
This refactor enables cleaner code, better observability, and proper event sourcing semantics throughout the system.