update partitions-refactor.md
This commit is contained in:
parent
c556fec218
commit
61c13cdcc0
1 changed files with 285 additions and 379 deletions
|
|
@ -112,16 +112,30 @@ JobRunBufferEventV1 {
|
|||
building_partition_uuids: [uuid-1, uuid-2]
|
||||
}
|
||||
|
||||
// To find serviced wants:
|
||||
// To find serviced wants - use inverted index in BuildState
|
||||
for uuid in job.building_partition_uuids {
|
||||
let partition = partitions_by_uuid[uuid];
|
||||
for want_id in partition.want_ids {
|
||||
// transition want
|
||||
let partition_ref = &partition.partition_ref.r#ref;
|
||||
|
||||
// Look up wants via inverted index (not stored on partition)
|
||||
if let Some(want_ids) = wants_for_partition.get(partition_ref) {
|
||||
for want_id in want_ids {
|
||||
// transition want
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The relationship is **discoverable** via stable partition UUID, not **baked-in** at event creation.
|
||||
The relationship is **discoverable** via inverted index, not **baked-in** at event creation or **stored on partitions**.
|
||||
|
||||
**Key improvement:** Partitions don't store `want_ids`. This is cleaner separation of concerns:
|
||||
- Want → Partition: Inherent (want defines partitions it wants)
|
||||
- Partition → Want: Derived (maintained as inverted index in BuildState)
|
||||
|
||||
**Note on want state vs schedulability:**
|
||||
- Want state (Building) reflects current reality: "my partitions are being built"
|
||||
- Schedulability prevents duplicate jobs: "don't schedule another job if partitions already building"
|
||||
- Both mechanisms needed: state for correctness, schedulability for efficiency
|
||||
|
||||
### 2. Proper State Semantics for Wants
|
||||
|
||||
|
|
@ -132,19 +146,23 @@ Want 2 → sees partition Building → stays Idle (different from Want 1?)
|
|||
Want 3 → same partition → also Idle
|
||||
```
|
||||
|
||||
**With UUIDs:**
|
||||
**With UUIDs and New state:**
|
||||
```
|
||||
Partition(uuid-1, "data/beta") created as Missing
|
||||
Want 1 arrives → checks canonical["data/beta"] = uuid-1 (Missing) → Idle → schedules job
|
||||
Job starts → uuid-1 becomes Building, canonical still points to uuid-1
|
||||
Want 2 arrives → checks canonical["data/beta"] = uuid-1 (Building) → directly to Building
|
||||
Want 3 arrives → checks canonical["data/beta"] = uuid-1 (Building) → directly to Building
|
||||
Want 4 arrives → checks canonical["data/beta"] = uuid-1 (Building) → directly to Building
|
||||
Want 1 arrives → New → no canonical partition exists → Idle → schedulable
|
||||
Orchestrator queues job → generates uuid-1 for "data/beta"
|
||||
Job buffer event → creates Partition(uuid-1, "data/beta", Building)
|
||||
→ updates canonical["data/beta"] = uuid-1
|
||||
→ transitions Want 1: Idle → Building
|
||||
Want 2 arrives → New → canonical["data/beta"] = uuid-1 (Building) → Building
|
||||
Want 3 arrives → New → canonical["data/beta"] = uuid-1 (Building) → Building
|
||||
Want 4 arrives → New → canonical["data/beta"] = uuid-1 (Building) → Building
|
||||
```
|
||||
|
||||
All 4 wants have **identical relationship** to the canonical partition. The state reflects reality: "is the canonical partition for my ref being built?"
|
||||
|
||||
**Key insight:** Wants don't bind to UUIDs. They look up the canonical partition for their ref and base their state on that.
|
||||
**Key insights:**
|
||||
- Wants don't bind to UUIDs. They look up the canonical partition for their ref and base their state on that.
|
||||
- New state makes state determination explicit: want creation → observe world → transition to appropriate state
|
||||
|
||||
### 3. Historical Lineage
|
||||
|
||||
|
|
@ -177,185 +195,168 @@ Can answer:
|
|||
**Changes:**
|
||||
|
||||
1. **Update Partition struct** (databuild/partition_state.rs)
|
||||
```rust
|
||||
pub struct PartitionWithState<S> {
|
||||
pub uuid: Uuid, // NEW
|
||||
pub partition_ref: PartitionRef,
|
||||
pub want_ids: Vec<String>,
|
||||
pub state: S,
|
||||
}
|
||||
```
|
||||
|
||||
2. **Add dual indexing** (databuild/build_state.rs)
|
||||
Add UUID field to partition:
|
||||
- `uuid: Uuid` - Unique identifier for this partition instance
|
||||
- Remove `want_ids` field (now maintained as inverted index in BuildState)
|
||||
|
||||
**Update partition state machine:**
|
||||
|
||||
States:
|
||||
- **Building**: Job actively building this partition
|
||||
- **UpstreamBuilding**: Job had dep miss, partition waiting for upstream dependencies (stores MissingDeps)
|
||||
- **UpForRetry**: Upstream dependencies satisfied, partition ready to retry building
|
||||
- **Live**: Successfully built
|
||||
- **Failed**: Hard failure (shouldn't retry)
|
||||
- **Tainted**: Marked invalid by taint event
|
||||
|
||||
**Removed:** Missing state - partitions only exist when jobs start building them or are completed.
|
||||
|
||||
Key transitions:
|
||||
- Building → UpstreamBuilding (job reports dep miss)
|
||||
- UpstreamBuilding → UpForRetry (all upstream deps satisfied)
|
||||
- Building → Live (job succeeds)
|
||||
- Building → Failed (job hard failure)
|
||||
- UpForRetry → Building (new job queued for retry, creates fresh UUID)
|
||||
- Live → Tainted (partition tainted)
|
||||
|
||||
2. **Add dual indexing and inverted indexes** (databuild/build_state.rs)
|
||||
```rust
|
||||
pub struct BuildState {
|
||||
partitions_by_uuid: BTreeMap<Uuid, Partition>, // NEW
|
||||
canonical_partitions: BTreeMap<String, Uuid>, // NEW
|
||||
partitions: BTreeMap<String, Partition>, // DEPRECATED, keep for now
|
||||
partitions_by_uuid: BTreeMap<Uuid, Partition>, // NEW
|
||||
canonical_partitions: BTreeMap<String, Uuid>, // NEW
|
||||
wants_for_partition: BTreeMap<String, Vec<String>>, // NEW: partition ref → want IDs
|
||||
downstream_waiting: BTreeMap<String, Vec<Uuid>>, // NEW: partition ref → UUIDs waiting for it
|
||||
partitions: BTreeMap<String, Partition>, // DEPRECATED, keep for now
|
||||
// ...
|
||||
}
|
||||
```
|
||||
|
||||
3. **Update partition creation**
|
||||
- When creating partition (Missing state), generate UUID
|
||||
**Rationale for inverted indexes:**
|
||||
|
||||
**`wants_for_partition`:**
|
||||
- Partitions shouldn't know about wants (layering violation)
|
||||
- Want → Partition is inherent (want defines what it wants)
|
||||
- Partition → Want is derived (computed from wants, maintained as index)
|
||||
- BuildState owns this inverted relationship
|
||||
|
||||
**`downstream_waiting`:**
|
||||
- Enables efficient dep miss resolution: when partition becomes Live, directly find which partitions are waiting for it
|
||||
- Maps upstream partition ref → list of downstream partition UUIDs that have this ref in their MissingDeps
|
||||
- Avoids scanning all UpstreamBuilding partitions when upstreams complete
|
||||
- O(1) lookup to find affected partitions
|
||||
|
||||
3. **Partition creation happens at job buffer time**
|
||||
|
||||
Partitions are **only** created when a job starts building them:
|
||||
- Orchestrator generates fresh UUIDs when queuing job
|
||||
- `handle_job_run_buffer()` creates partitions directly in Building state with those UUIDs
|
||||
- Store in both maps: `partitions_by_uuid[uuid]` and `canonical_partitions[ref] = uuid`
|
||||
- Keep `partitions[ref]` updated for backward compatibility
|
||||
- Keep `partitions[ref]` updated for backward compatibility during migration
|
||||
|
||||
4. **Add helper methods**
|
||||
```rust
|
||||
impl BuildState {
|
||||
fn get_canonical_partition(&self, ref: &str) -> Option<&Partition> {
|
||||
self.canonical_partitions
|
||||
.get(ref)
|
||||
.and_then(|uuid| self.partitions_by_uuid.get(uuid))
|
||||
}
|
||||
No partitions created during want creation - wants just register in inverted index.
|
||||
|
||||
fn get_canonical_partition_uuid(&self, ref: &str) -> Option<Uuid> {
|
||||
self.canonical_partitions.get(ref).copied()
|
||||
}
|
||||
4. **Add helper methods** for accessing partitions by UUID and ref
|
||||
- `get_canonical_partition(ref)` - lookup canonical partition for a ref
|
||||
- `get_canonical_partition_uuid(ref)` - get UUID of canonical partition
|
||||
- `get_partition_by_uuid(uuid)` - direct UUID lookup
|
||||
- `get_wants_for_partition(ref)` - query inverted index
|
||||
|
||||
fn get_partition_by_uuid(&self, uuid: &Uuid) -> Option<&Partition> {
|
||||
self.partitions_by_uuid.get(uuid)
|
||||
}
|
||||
}
|
||||
```
|
||||
5. **Update inverted index maintenance**
|
||||
|
||||
### Phase 2: Update Want State Logic
|
||||
When wants are created, the `wants_for_partition` index must be updated:
|
||||
|
||||
- **Want creation:** Add want_id to index for each partition ref in the want
|
||||
- **Want completion/cancellation:** For now, do NOT remove from index. Cleanup can be added later if needed.
|
||||
|
||||
**No partition creation needed** - just update the index. Partitions are created later when jobs are queued.
|
||||
|
||||
**Rationale for not cleaning up:**
|
||||
- Index size should be manageable for now
|
||||
- Cleanup logic is straightforward to add later when needed
|
||||
- Avoids complexity around replay (removal operations not in event log)
|
||||
|
||||
Key consideration: The index maps partition refs (not UUIDs) to want IDs, since wants reference refs. When a partition is rebuilt with a new UUID, the same ref continues to map to the same wants until those wants complete.
|
||||
|
||||
### Phase 2: Add New State and Want State Sensing
|
||||
|
||||
**Goals:**
|
||||
- Wants determine state based on canonical partition
|
||||
- Remove schedulability check for building partitions (no longer needed)
|
||||
- Add explicit "New" state to Want state machine
|
||||
- Wants sense canonical partition state and transition appropriately
|
||||
- Clarify distinction between want state and schedulability
|
||||
|
||||
**Changes:**
|
||||
|
||||
1. **Update handle_want_create()** (databuild/build_state.rs)
|
||||
```rust
|
||||
fn handle_want_create(&mut self, event: &WantCreateEventV1) -> Vec<Event> {
|
||||
// Create want in Idle state initially
|
||||
let want_idle: WantWithState<IdleState> = event.clone().into();
|
||||
1. **Add New state** to want_state.rs
|
||||
|
||||
// Check canonical partition states to determine want's actual initial state
|
||||
let has_building_partitions = event.partitions.iter().any(|pref| {
|
||||
matches!(
|
||||
self.get_canonical_partition(&pref.r#ref),
|
||||
Some(Partition::Building(_))
|
||||
)
|
||||
});
|
||||
Add a new state that represents a want that has just been created but hasn't yet observed the world:
|
||||
|
||||
let want = if has_building_partitions {
|
||||
// Canonical partition is Building → Want starts in Building
|
||||
tracing::info!(
|
||||
want_id = %event.want_id,
|
||||
"Want created in Building state (canonical partition is building)"
|
||||
);
|
||||
Want::Building(want_idle.start_building(current_timestamp()))
|
||||
} else {
|
||||
// Canonical partition not Building → Want starts in Idle
|
||||
tracing::info!(
|
||||
want_id = %event.want_id,
|
||||
"Want created in Idle state"
|
||||
);
|
||||
Want::Idle(want_idle)
|
||||
};
|
||||
- `NewState` - Want has been created from event, state not yet determined
|
||||
- Transitions from New:
|
||||
- New → Failed (any partition failed)
|
||||
- New → Successful (all partitions live)
|
||||
- New → Building (any partition building)
|
||||
- New → Idle (partitions don't exist or other states)
|
||||
|
||||
self.wants.insert(event.want_id.clone(), want);
|
||||
This makes state determination explicit and observable in the event log.
|
||||
|
||||
// Register want with partitions
|
||||
for pref in &event.partitions {
|
||||
self.add_want_to_partition(pref, &event.want_id);
|
||||
}
|
||||
2. **Update handle_want_create()** to sense and transition
|
||||
|
||||
// Handle derivative wants if applicable
|
||||
if let Some(source) = &event.source {
|
||||
if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source {
|
||||
self.handle_derivative_want_creation(
|
||||
&event.want_id,
|
||||
&event.partitions,
|
||||
&job_triggered.job_run_id,
|
||||
);
|
||||
}
|
||||
}
|
||||
During want creation event processing:
|
||||
|
||||
vec![]
|
||||
}
|
||||
- Create want in New state from WantCreateEventV1
|
||||
- Register want in inverted index (`wants_for_partition`)
|
||||
- Check canonical partition states for all partition refs
|
||||
- Transition based on observation (in priority order):
|
||||
- If ANY canonical partition is Failed → New → Failed (job can't be safely retried)
|
||||
- If ALL canonical partitions exist AND are Live → New → Successful (already built!)
|
||||
- If ANY canonical partition is Building → New → Building (being built now)
|
||||
- If ANY canonical partition is UpstreamBuilding → New → UpstreamBuilding (waiting for deps)
|
||||
- If ANY canonical partition is UpForRetry → New → Idle (deps satisfied, ready to schedule)
|
||||
- Otherwise (partitions don't exist or other states) → New → Idle (need to schedule)
|
||||
- For derivative wants, additional logic may transition to UpstreamBuilding
|
||||
|
||||
Key insight: Most wants will go New → Idle because partitions won't exist yet (only created when jobs start). Subsequent wants for already-building partitions go New → Building. Wants arriving during dep miss go New → UpstreamBuilding. Wants for partitions ready to retry go New → Idle. Wants for already-Live partitions go New → Successful. Wants for Failed partitions go New → Failed.
|
||||
|
||||
3. **Keep WantSchedulability building check**
|
||||
|
||||
**Important distinction:** Want state vs. schedulability are different concerns:
|
||||
|
||||
- **Want state** (New → Building): "Are my partitions currently being built?" - Reflects reality
|
||||
- **Schedulability**: "Should the orchestrator start a NEW job for this want?" - Prevents duplicate jobs
|
||||
|
||||
Example scenario:
|
||||
```
|
||||
Want 1: Idle → schedules job → partition becomes Building → want becomes Building
|
||||
Want 2 arrives → sees partition Building → New → Building
|
||||
Orchestrator polls: both wants are Building, but should NOT schedule another job
|
||||
```
|
||||
|
||||
2. **Simplify WantSchedulability** (databuild/build_state.rs)
|
||||
```rust
|
||||
// Remove `building` field from WantUpstreamStatus
|
||||
pub struct WantUpstreamStatus {
|
||||
pub live: Vec<LivePartitionRef>,
|
||||
pub tainted: Vec<TaintedPartitionRef>,
|
||||
pub missing: Vec<MissingPartitionRef>,
|
||||
// REMOVED: pub building: Vec<BuildingPartitionRef>,
|
||||
}
|
||||
The `building` field in `WantUpstreamStatus` remains necessary to prevent duplicate job scheduling. A want can be in Building state but not schedulable if partitions are already being built by another job.
|
||||
|
||||
impl WantSchedulability {
|
||||
pub fn is_schedulable(&self) -> bool {
|
||||
// Simplified: only check upstreams
|
||||
// Building partitions now handled at want creation
|
||||
self.status.missing.is_empty() && self.status.tainted.is_empty()
|
||||
}
|
||||
}
|
||||
```
|
||||
Keep the existing schedulability logic that checks `building.is_empty()`.
|
||||
|
||||
3. **Update derivative want handling** (databuild/build_state.rs)
|
||||
```rust
|
||||
fn handle_derivative_want_creation(...) {
|
||||
// ...existing logic...
|
||||
4. **Update derivative want handling**
|
||||
|
||||
for want_id in impacted_want_ids {
|
||||
let want = self.wants.remove(&want_id).expect(...);
|
||||
let transitioned = match want {
|
||||
// Idle wants can exist if they arrived after job started but before dep miss
|
||||
Want::Idle(idle) => {
|
||||
tracing::info!(
|
||||
want_id = %want_id,
|
||||
derivative_want_id = %derivative_want_id,
|
||||
"Want: Idle → UpstreamBuilding (partition dep miss detected)"
|
||||
);
|
||||
Want::UpstreamBuilding(
|
||||
idle.detect_missing_deps(vec![derivative_want_id.to_string()])
|
||||
)
|
||||
}
|
||||
Want::Building(building) => {
|
||||
// Building → UpstreamBuilding
|
||||
// ... existing logic ...
|
||||
}
|
||||
Want::UpstreamBuilding(upstream) => {
|
||||
// UpstreamBuilding → UpstreamBuilding (add another upstream)
|
||||
// ... existing logic ...
|
||||
}
|
||||
_ => {
|
||||
panic!(
|
||||
"BUG: Want {} in invalid state {:?}. Should be Idle, Building, or UpstreamBuilding.",
|
||||
want_id, want
|
||||
);
|
||||
}
|
||||
};
|
||||
self.wants.insert(want_id, transitioned);
|
||||
}
|
||||
}
|
||||
```
|
||||
Modify `handle_derivative_want_creation()` to handle wants in their appropriate states:
|
||||
|
||||
4. **Add Idle → UpstreamBuilding transition** (databuild/want_state.rs)
|
||||
```rust
|
||||
impl WantWithState<IdleState> {
|
||||
// ... existing methods ...
|
||||
- **Building → UpstreamBuilding:** Want is Building when dep miss occurs (normal case)
|
||||
- **UpstreamBuilding → UpstreamBuilding:** Want already waiting on upstreams, add another (additional dep miss)
|
||||
|
||||
/// Transition from Idle to UpstreamBuilding when dependencies are missing
|
||||
/// This can happen if want arrives while partition is building, then job has dep miss
|
||||
pub fn detect_missing_deps(
|
||||
self,
|
||||
upstream_want_ids: Vec<String>,
|
||||
) -> WantWithState<UpstreamBuildingState> {
|
||||
WantWithState {
|
||||
want: self.want.updated_timestamp(),
|
||||
state: UpstreamBuildingState { upstream_want_ids },
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
Note: Idle wants should NOT be present during derivative want creation. If partitions are building (which they must be for a job to report dep miss), wants would have been created in Building state via New → Building transition.
|
||||
|
||||
5. **Add required state transitions** in want_state.rs
|
||||
|
||||
New transitions needed:
|
||||
|
||||
- **New → Failed:** Any partition failed
|
||||
- **New → Successful:** All partitions live
|
||||
- **New → Idle:** Normal case, partitions don't exist
|
||||
- **New → Building:** Partitions already building when want created
|
||||
- **Building → UpstreamBuilding:** Job reports dep miss (first time)
|
||||
- **UpstreamBuilding → UpstreamBuilding:** Additional upstreams added
|
||||
|
||||
Note: New → UpstreamBuilding is not needed - wants go New → Building first, then Building → UpstreamBuilding when dep miss occurs.
|
||||
|
||||
### Phase 3: Update Job Events
|
||||
|
||||
|
|
@ -365,200 +366,146 @@ Can answer:
|
|||
|
||||
**Changes:**
|
||||
|
||||
1. **Update JobRunBufferEventV1** (databuild/databuild.proto)
|
||||
1. **Update JobRunBufferEventV1** in databuild.proto
|
||||
|
||||
Add new message and field:
|
||||
```protobuf
|
||||
message PartitionInstanceRef {
|
||||
PartitionRef partition_ref = 1;
|
||||
string uuid = 2; // UUID as string
|
||||
}
|
||||
|
||||
message JobRunBufferEventV1 {
|
||||
string job_run_id = 1;
|
||||
string job_label = 2;
|
||||
repeated string building_partition_uuids = 3; // NEW: UUIDs instead of refs
|
||||
repeated PartitionRef building_partitions = 4; // DEPRECATED: keep for migration
|
||||
repeated WantAttributedPartitions servicing_wants = 5; // DEPRECATED: remove later
|
||||
// ... existing fields ...
|
||||
repeated PartitionInstanceRef building_partitions_v2 = 6; // NEW
|
||||
repeated PartitionRef building_partitions = 4; // DEPRECATED
|
||||
repeated WantAttributedPartitions servicing_wants = 5; // DEPRECATED
|
||||
}
|
||||
```
|
||||
|
||||
2. **Update handle_job_run_buffer()** (databuild/build_state.rs)
|
||||
```rust
|
||||
fn handle_job_run_buffer(&mut self, event: &JobRunBufferEventV1) -> Vec<Event> {
|
||||
// Parse UUIDs from event
|
||||
let building_uuids: Vec<Uuid> = event.building_partition_uuids
|
||||
.iter()
|
||||
.map(|s| Uuid::parse_str(s).expect("Valid UUID"))
|
||||
.collect();
|
||||
This pairs each partition ref with its UUID, solving the mapping problem.
|
||||
|
||||
// Find all wants for these partition UUIDs
|
||||
let mut impacted_want_ids: HashSet<String> = HashSet::new();
|
||||
for uuid in &building_uuids {
|
||||
if let Some(partition) = self.partitions_by_uuid.get(uuid) {
|
||||
for want_id in partition.want_ids() {
|
||||
impacted_want_ids.insert(want_id.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
2. **Update handle_job_run_buffer()** in build_state.rs
|
||||
|
||||
// Transition wants to Building
|
||||
for want_id in impacted_want_ids {
|
||||
let want = self.wants.remove(&want_id).expect("Want must exist");
|
||||
let transitioned = match want {
|
||||
Want::Idle(idle) => Want::Building(idle.start_building(current_timestamp())),
|
||||
Want::Building(building) => Want::Building(building), // Already building
|
||||
_ => panic!("Invalid state for job buffer: {:?}", want),
|
||||
};
|
||||
self.wants.insert(want_id, transitioned);
|
||||
}
|
||||
Change partition and want lookup logic:
|
||||
|
||||
// Transition partitions to Building by UUID
|
||||
for uuid in building_uuids {
|
||||
if let Some(partition) = self.partitions_by_uuid.remove(&uuid) {
|
||||
let building = match partition {
|
||||
Partition::Missing(missing) => {
|
||||
Partition::Building(missing.start_building(event.job_run_id.clone()))
|
||||
}
|
||||
_ => panic!("Partition {:?} not in Missing state", uuid),
|
||||
};
|
||||
self.partitions_by_uuid.insert(uuid, building);
|
||||
}
|
||||
}
|
||||
- Parse UUIDs from event (need partition refs too - consider adding to event or deriving from wants)
|
||||
- **Create partitions directly in Building state** with these UUIDs (no Missing state)
|
||||
- Update `canonical_partitions` to point refs to these new UUIDs
|
||||
- Use inverted index (`wants_for_partition`) to find wants for each partition ref
|
||||
- Transition those wants: Idle → Building (or stay Building if already there)
|
||||
- Create job run in Queued state
|
||||
|
||||
// Create job run
|
||||
let queued: JobRunWithState<JobQueuedState> = event.clone().into();
|
||||
self.job_runs.insert(event.job_run_id.clone(), JobRun::Queued(queued));
|
||||
**Key changes:**
|
||||
- Partitions created here, not during want creation
|
||||
- No Missing → Building transition, created directly as Building
|
||||
- Use inverted index for want discovery (not stored on partition or in event)
|
||||
|
||||
vec![]
|
||||
}
|
||||
```
|
||||
3. **Update Orchestrator's queue_job()** in orchestrator.rs
|
||||
|
||||
3. **Update Orchestrator** (databuild/orchestrator.rs)
|
||||
```rust
|
||||
fn queue_job(&mut self, wg: WantGroup) -> Result<(), DatabuildError> {
|
||||
// Get partition refs from wants
|
||||
let wanted_refs: Vec<PartitionRef> = wg.wants
|
||||
.iter()
|
||||
.flat_map(|want| want.partitions.clone())
|
||||
.collect();
|
||||
When creating JobRunBufferEventV1:
|
||||
|
||||
// Resolve refs to canonical UUIDs
|
||||
let building_partition_uuids: Vec<String> = wanted_refs
|
||||
.iter()
|
||||
.filter_map(|pref| {
|
||||
self.bel.state.get_canonical_partition_uuid(&pref.r#ref)
|
||||
.map(|uuid| uuid.to_string())
|
||||
})
|
||||
.collect();
|
||||
- Get partition refs from wants (existing logic)
|
||||
- **Generate fresh UUIDs** for each unique partition ref (one UUID per ref)
|
||||
- Include UUID list in event along with refs (may need to update event schema)
|
||||
- Orchestrator no longer needs to track or snapshot want-partition relationships
|
||||
|
||||
let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 {
|
||||
job_run_id: job_run_id.to_string(),
|
||||
job_label: wg.job.label,
|
||||
building_partition_uuids, // Use canonical UUIDs
|
||||
building_partitions: vec![], // Deprecated
|
||||
servicing_wants: vec![], // Deprecated
|
||||
});
|
||||
**Key change:** Orchestrator generates UUIDs at job queue time, not looking up canonical partitions. Each job attempt gets fresh UUIDs. The event handler will create partitions in Building state with these UUIDs and update canonical pointers.
|
||||
|
||||
self.append_and_broadcast(&job_buffer_event)?;
|
||||
self.job_runs.push(job_run);
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
This eliminates WantAttributedPartitions entirely - relationships are discoverable via inverted index.
|
||||
|
||||
### Phase 4: Partition Lifecycle Management
|
||||
|
||||
**Goals:**
|
||||
- Define when new partition UUIDs are created
|
||||
- Handle canonical partition transitions
|
||||
- Implement cleanup/GC
|
||||
|
||||
**Canonical Partition Transitions:**
|
||||
|
||||
New partition UUID created when:
|
||||
1. **First build**: Partition doesn't exist → create Partition(uuid, Missing)
|
||||
2. **Taint**: Partition tainted → create new Partition(uuid-new, Missing), update canonical
|
||||
3. **Expiration**: TTL exceeded → create new Partition(uuid-new, Missing), update canonical
|
||||
4. **Manual rebuild**: Explicit rebuild request → create new Partition(uuid-new, Missing), update canonical
|
||||
1. **First build**: Orchestrator queues job → generates UUID → partition created directly as Building
|
||||
2. **Taint**: Partition tainted → transition current to Tainted state (keeps UUID, stays canonical so readers can see it's tainted)
|
||||
3. **Rebuild after taint**: Existing want (still within TTL) sees tainted partition → triggers new job → orchestrator generates fresh UUID → new partition replaces tainted one in canonical_partitions
|
||||
|
||||
**Implementation:**
|
||||
**Note on TTL/SLA:** These are **want properties**, not partition properties. TTL defines how long after want creation the orchestrator should keep attempting to build partitions. When a partition is tainted, wants within TTL will keep retrying. SLA is an alarm threshold. Partitions don't expire - they stay Live until explicitly tainted or replaced by a new build.
|
||||
|
||||
```rust
|
||||
impl BuildState {
|
||||
/// Create a new partition instance for a ref, updating canonical pointer
|
||||
fn create_new_partition_instance(&mut self, partition_ref: &PartitionRef) -> Uuid {
|
||||
let new_uuid = Uuid::new_v4();
|
||||
let new_partition = Partition::new_missing_with_uuid(
|
||||
new_uuid,
|
||||
partition_ref.clone()
|
||||
);
|
||||
**Key principles:**
|
||||
|
||||
// Update canonical pointer (old UUID becomes historical)
|
||||
self.canonical_partitions.insert(
|
||||
partition_ref.r#ref.clone(),
|
||||
new_uuid
|
||||
);
|
||||
- **Building state as lease:** The Building state serves as a lease mechanism. While a partition is in Building state, the orchestrator will not attempt to schedule additional jobs to build that partition. This prevents concurrent/duplicate builds. The lease is released when the partition transitions to Live, Failed, or when a new partition instance with a fresh UUID is created and becomes canonical (e.g., after the building job reports dep miss and a new job is queued).
|
||||
- When canonical pointer is updated (e.g., new build replaces tainted partition), old partition UUID remains in `partitions_by_uuid` for historical queries
|
||||
- Canonical pointer always points to current/active partition instance (Building, Live, Failed, or Tainted)
|
||||
- Tainted partitions stay canonical until replaced - readers need to see they're tainted
|
||||
- Old instances become immutable historical records
|
||||
- **No Missing state** - partitions only exist when jobs are actively building them or completed
|
||||
|
||||
// Store new partition
|
||||
self.partitions_by_uuid.insert(new_uuid, new_partition);
|
||||
**Partition Creation:**
|
||||
|
||||
// Old partition remains in partitions_by_uuid for historical queries
|
||||
Partitions created during `handle_job_run_buffer()`:
|
||||
|
||||
new_uuid
|
||||
}
|
||||
- UUIDs come from the event (generated by orchestrator)
|
||||
- Create partition directly in Building state with job_run_id
|
||||
- Update `canonical_partitions` map to point ref → UUID
|
||||
- Store in `partitions_by_uuid`
|
||||
- If replacing a tainted/failed partition, old one remains in `partitions_by_uuid` by its UUID
|
||||
|
||||
/// Handle partition taint - creates new instance
|
||||
fn taint_partition(&mut self, partition_ref: &str) -> Uuid {
|
||||
// Mark current partition as Tainted
|
||||
if let Some(current_uuid) = self.canonical_partitions.get(partition_ref) {
|
||||
if let Some(partition) = self.partitions_by_uuid.get_mut(current_uuid) {
|
||||
// Transition to Tainted state (keep UUID)
|
||||
*partition = match partition {
|
||||
Partition::Live(live) => {
|
||||
Partition::Tainted(live.clone().mark_tainted())
|
||||
}
|
||||
_ => panic!("Can only taint Live partitions"),
|
||||
};
|
||||
}
|
||||
}
|
||||
**Dep Miss Handling:**
|
||||
|
||||
// Create new partition instance for rebuilding
|
||||
self.create_new_partition_instance(&PartitionRef {
|
||||
r#ref: partition_ref.to_string()
|
||||
})
|
||||
}
|
||||
}
|
||||
```
|
||||
Complete flow when a job has dependency miss:
|
||||
|
||||
**GC Strategy:**
|
||||
1. **Job reports dep miss:**
|
||||
- Job building partition uuid-1 encounters missing upstream deps
|
||||
- JobRunDepMissEventV1 emitted with MissingDeps (partition refs needed)
|
||||
- Derivative wants created for missing upstream partitions
|
||||
|
||||
Time-based retention (recommended):
|
||||
- Keep partition UUIDs for N days (default 30)
|
||||
- Enables historical queries within retention window
|
||||
- Predictable storage growth
|
||||
2. **Partition transitions to UpstreamBuilding:**
|
||||
- Partition uuid-1: Building → UpstreamBuilding
|
||||
- Store MissingDeps in partition state (which upstream refs it's waiting for)
|
||||
- **Update inverted index:** For each missing dep ref, add uuid-1 to `downstream_waiting[missing_dep_ref]`
|
||||
- Partition remains canonical (holds lease - prevents concurrent retry attempts)
|
||||
- Job run transitions to DepMissed state
|
||||
|
||||
```rust
|
||||
impl BuildState {
|
||||
/// Remove partition UUIDs older than retention window
|
||||
fn gc_old_partitions(&mut self, retention_days: u64) {
|
||||
let cutoff = current_timestamp() - (retention_days * 86400 * 1_000_000_000);
|
||||
3. **Want transitions:**
|
||||
- Wants for partition: Building → UpstreamBuilding
|
||||
- Wants track the derivative want IDs in their UpstreamBuildingState
|
||||
|
||||
// Find UUIDs to remove (not canonical + older than cutoff)
|
||||
let canonical_uuids: HashSet<Uuid> = self.canonical_partitions
|
||||
.values()
|
||||
.copied()
|
||||
.collect();
|
||||
4. **Upstream builds complete:**
|
||||
- Derivative wants build upstream partitions → upstream partition becomes Live
|
||||
- **Lookup downstream_waiting:** Get `downstream_waiting[upstream_partition_ref]` → list of UUIDs waiting for this upstream
|
||||
- For each waiting partition UUID:
|
||||
- Get partition from `partitions_by_uuid[uuid]`
|
||||
- Check if ALL its MissingDeps are now satisfied (canonical partitions for all refs are Live)
|
||||
- If satisfied: transition partition UpstreamBuilding → UpForRetry
|
||||
- Remove uuid from `downstream_waiting` entries (cleanup)
|
||||
|
||||
let to_remove: Vec<Uuid> = self.partitions_by_uuid
|
||||
.iter()
|
||||
.filter_map(|(uuid, partition)| {
|
||||
if !canonical_uuids.contains(uuid) && partition.created_at() < cutoff {
|
||||
Some(*uuid)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
5. **Want becomes schedulable:**
|
||||
- When partition transitions to UpForRetry, wants transition: UpstreamBuilding → Idle
|
||||
- Orchestrator sees Idle wants with UpForRetry canonical partitions → schedulable
|
||||
- New job queued → fresh UUID (uuid-2) generated
|
||||
- Partition uuid-2 created as Building, replaces uuid-1 in canonical_partitions
|
||||
- Partition uuid-1 (UpForRetry) remains in partitions_by_uuid as historical record
|
||||
|
||||
for uuid in to_remove {
|
||||
self.partitions_by_uuid.remove(&uuid);
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
6. **New wants during dep miss:**
|
||||
- Want arrives while partition is UpstreamBuilding → New → UpstreamBuilding (correctly waits)
|
||||
- Want arrives while partition is UpForRetry → New → Idle (correctly schedulable)
|
||||
|
||||
**Key properties:**
|
||||
- Building state acts as lease (no concurrent builds)
|
||||
- UpstreamBuilding also acts as lease (upstreams not ready, can't retry yet)
|
||||
- UpForRetry releases lease (upstreams ready, safe to schedule)
|
||||
- Failed releases lease but blocks new wants (hard failure, shouldn't retry)
|
||||
- `downstream_waiting` index enables O(1) lookup of affected partitions when upstreams complete
|
||||
|
||||
**Taint Handling:**
|
||||
|
||||
When partition is tainted (via TaintCreateEvent):
|
||||
|
||||
- Find current canonical UUID for the ref
|
||||
- Transition that partition instance to Tainted state (preserves history)
|
||||
- **Keep in `canonical_partitions`** - readers need to see it's tainted
|
||||
- Wants within TTL will see partition is tainted (not Live)
|
||||
- Orchestrator will schedule new jobs for those wants
|
||||
- New partition created with fresh UUID when next job starts
|
||||
- New partition replaces tainted one in `canonical_partitions`
|
||||
|
||||
### Phase 5: Migration and Cleanup
|
||||
|
||||
|
|
@ -602,12 +549,15 @@ impl BuildState {
|
|||
want.partitions = ["data/beta"] // ref, not UUID
|
||||
|
||||
// Want state determination
|
||||
let canonical_uuid = canonical_partitions["data/beta"];
|
||||
let partition = partitions_by_uuid[canonical_uuid];
|
||||
match partition.state {
|
||||
Building => want.state = Building,
|
||||
Live => want can complete,
|
||||
...
|
||||
if let Some(canonical_uuid) = canonical_partitions.get("data/beta") {
|
||||
let partition = partitions_by_uuid[canonical_uuid];
|
||||
match partition.state {
|
||||
Building => want.state = Building,
|
||||
Live => want can complete,
|
||||
...
|
||||
}
|
||||
} else {
|
||||
// No canonical partition exists yet → Idle
|
||||
}
|
||||
```
|
||||
|
||||
|
|
@ -630,14 +580,15 @@ JobRunBufferEventV1 {
|
|||
|
||||
### 3. UUID Generation: When?
|
||||
|
||||
**Decision:** Generate UUID during event processing (in handle_want_create, when partition created).
|
||||
**Decision:** Orchestrator generates UUIDs when queuing jobs, includes them in JobRunBufferEventV1.
|
||||
|
||||
**Rationale:**
|
||||
- Events remain deterministic
|
||||
- UUID generation during replay works correctly
|
||||
- Maintains event sourcing principles
|
||||
- UUIDs represent specific build attempts, not partition refs
|
||||
- Orchestrator is source of truth for "start building these partitions"
|
||||
- Event contains UUIDs, making replay deterministic (same UUIDs in event)
|
||||
- No UUID generation during event processing - UUIDs are in the event itself
|
||||
|
||||
**Not in the event itself:** Would require client-side UUID generation, breaks deterministic replay.
|
||||
**Key insight:** The orchestrator generates UUIDs (not BuildState during event handling). This makes UUIDs part of the immutable event log.
|
||||
|
||||
### 4. Canonical Partition: One at a Time
|
||||
|
||||
|
|
@ -652,7 +603,7 @@ JobRunBufferEventV1 {
|
|||
|
||||
**Alternative considered:** Multiple concurrent builds with versioning
|
||||
- Significantly more complex
|
||||
- Defer to future work
|
||||
- No existing need for this
|
||||
|
||||
### 5. Event Format: UUID as String
|
||||
|
||||
|
|
@ -684,11 +635,13 @@ JobRunBufferEventV1 {
|
|||
### Integration Tests
|
||||
|
||||
1. **Multi-want scenario** (reproduces original bug)
|
||||
- Want 1 created → partition Missing → Idle
|
||||
- Job scheduled → partition Building (uuid-1)
|
||||
- Wants 2-4 created → see partition Building → directly to Building
|
||||
- Want 1 created → New → no partition exists → Idle
|
||||
- Job scheduled → orchestrator generates uuid-1 → partition created Building
|
||||
- Want 1 transitions Idle → Building (via job buffer event)
|
||||
- Wants 2-4 created → New → partition Building (uuid-1) → Building
|
||||
- All 4 wants reference same canonical partition uuid-1
|
||||
- Job dep miss → all transition to UpstreamBuilding correctly
|
||||
- Verifies New state transitions and state sensing work correctly
|
||||
|
||||
2. **Rebuild scenario**
|
||||
- Partition built → Live (uuid-1)
|
||||
|
|
@ -704,52 +657,6 @@ JobRunBufferEventV1 {
|
|||
- Want completes → partition remains in history
|
||||
- Partition expires → new UUID for rebuild, canonical updated
|
||||
|
||||
## Future Work
|
||||
|
||||
### 1. Partition Lineage Graph
|
||||
|
||||
Build explicit lineage tracking:
|
||||
|
||||
```rust
|
||||
Partition {
|
||||
uuid: uuid-3,
|
||||
partition_ref: "data/beta",
|
||||
previous_uuid: Some(uuid-2),
|
||||
derived_from: vec![uuid-4, uuid-5], // Upstream dependencies
|
||||
}
|
||||
```
|
||||
|
||||
Enables:
|
||||
- "What was the full dependency graph when this partition was built?"
|
||||
- "How did data propagate through the system over time?"
|
||||
|
||||
### 2. Partition Provenance
|
||||
|
||||
Track complete build history:
|
||||
|
||||
```rust
|
||||
Partition {
|
||||
uuid: uuid-1,
|
||||
provenance: Provenance {
|
||||
built_by_job: "job-123",
|
||||
source_code_version: "abc123",
|
||||
build_timestamp: 1234567890,
|
||||
input_partitions: vec![uuid-2, uuid-3],
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 3. Multi-Generation Partitions
|
||||
|
||||
Support concurrent builds of different generations:
|
||||
|
||||
```rust
|
||||
canonical_partitions: HashMap<String, Vec<(Generation, Uuid)>>
|
||||
// "data/beta" → [(v1, uuid-1), (v2, uuid-2)]
|
||||
```
|
||||
|
||||
Users can request specific generations or "latest."
|
||||
|
||||
## Summary
|
||||
|
||||
Adding partition UUIDs solves fundamental architectural problems:
|
||||
|
|
@ -759,7 +666,6 @@ Adding partition UUIDs solves fundamental architectural problems:
|
|||
- **Wants reference refs**: Want state based on canonical partition for their ref
|
||||
- **Discoverable relationships**: Remove redundant snapshot data (WantAttributedPartitions)
|
||||
- **Proper semantics**: Want state reflects actual canonical partition state
|
||||
- **Historical queries**: Can query past partition states via UUID
|
||||
|
||||
**Key principle:** Wants care about "what's the current state of data/beta?" (refs), while jobs and historical queries care about "what happened to this specific partition instance?" (UUIDs).
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue