From 650880974521ea5df153d1b951825bb3ff2b8221 Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Mon, 24 Nov 2025 10:19:24 +0800 Subject: [PATCH] Add partition refactor plan --- docs/plans/api.md | 577 ---------------------- docs/plans/partitions-refactor.md | 766 ++++++++++++++++++++++++++++++ 2 files changed, 766 insertions(+), 577 deletions(-) delete mode 100644 docs/plans/api.md create mode 100644 docs/plans/partitions-refactor.md diff --git a/docs/plans/api.md b/docs/plans/api.md deleted file mode 100644 index 0059233..0000000 --- a/docs/plans/api.md +++ /dev/null @@ -1,577 +0,0 @@ -# Web Server Implementation Plan - -## Architecture Summary - -**Concurrency Model: Event Log Separation** -- Orchestrator runs synchronously in dedicated thread, owns BEL exclusively -- Web server reads from shared BEL storage, sends write commands via channel -- No locks on hot path, orchestrator stays single-threaded -- Eventual consistency for reads (acceptable since builds take time anyway) - -**Daemon Model:** -- Server binary started manually from workspace root (for now) -- Server tracks last request time, shuts down after idle timeout (default: 3 hours) -- HTTP REST on localhost random port -- Future: CLI can auto-discover/start server - ---- - -## Thread Model - -``` -Main Process -├─ HTTP Server (tokio multi-threaded runtime) -│ ├─ Request handlers (async, read from BEL storage) -│ └─ Command sender (send writes to orchestrator) -└─ Orchestrator Thread (std::thread, synchronous) - ├─ Receives commands via mpsc channel - ├─ Owns BEL (exclusive mutable access) - └─ Runs existing step() loop -``` - -**Read Path (Low Latency):** -1. HTTP request → Axum handler -2. Read events from shared BEL storage (no lock contention) -3. Reconstruct BuildState from events (can cache this) -4. Return response - -**Write Path (Strong Consistency):** -1. HTTP request → Axum handler -2. Send command via channel to orchestrator -3. Orchestrator processes command in its thread -4. Reply sent back via oneshot channel -5. Return response - -**Why This Works:** -- Orchestrator remains completely synchronous (no refactoring needed) -- Reads scale horizontally (multiple handlers, no locks) -- Writes are serialized through orchestrator (consistent with current model) -- Event sourcing means reads can be eventually consistent - ---- - -## Phase 1: Foundation - Make BEL Storage Thread-Safe - -**Goal:** Allow BEL storage to be safely shared between orchestrator and web server - -**Tasks:** -1. Add `Send + Sync` bounds to `BELStorage` trait -2. Wrap `SqliteBELStorage::connection` in `Arc>` or use r2d2 pool -3. Add read-only methods to BELStorage: - - `list_events(offset: usize, limit: usize) -> Vec` - - `get_event(event_id: u64) -> Option` - - `latest_event_id() -> u64` -4. Add builder method to reconstruct BuildState from events: - - `BuildState::from_events(events: &[DataBuildEvent]) -> Self` - -**Files Modified:** -- `databuild/build_event_log.rs` - update trait and storage impls -- `databuild/build_state.rs` - add `from_events()` builder - -**Acceptance Criteria:** -- `BELStorage` trait has `Send + Sync` bounds -- Can clone `Arc` and use from multiple threads -- Can reconstruct BuildState from events without mutating storage - ---- - -## Phase 2: Web Server - HTTP API with Axum - -**Goal:** HTTP server serving read/write APIs - -**Tasks:** -1. 
Add dependencies to MODULE.bazel: - ```python - crate.spec(package = "tokio", features = ["full"], version = "1.0") - crate.spec(package = "axum", version = "0.7") - crate.spec(package = "tower", version = "0.4") - crate.spec(package = "tower-http", features = ["trace", "cors"], version = "0.5") - ``` - -2. Create `databuild/http_server.rs` module with: - - `AppState` struct holding: - - `bel_storage: Arc` - shared read access - - `command_tx: mpsc::Sender` - channel to orchestrator - - `last_request_time: Arc` - for idle tracking - - Axum router with all endpoints - - Handler functions delegating to existing `api_handle_*` methods - -3. API Endpoints: - ``` - GET /health → health check - GET /api/wants → list_wants - POST /api/wants → create_want - GET /api/wants/:id → get_want - DELETE /api/wants/:id → cancel_want - GET /api/partitions → list_partitions - GET /api/job_runs → list_job_runs - GET /api/job_runs/:id/logs/stdout → stream_logs (stub) - ``` - -4. Handler pattern (reads): - ```rust - async fn list_wants( - State(state): State, - Query(params): Query, - ) -> Json { - // Read events from storage - let events = state.bel_storage.list_events(0, 10000)?; - - // Reconstruct state - let build_state = BuildState::from_events(&events); - - // Use existing API method - Json(build_state.list_wants(¶ms.into())) - } - ``` - -5. Handler pattern (writes): - ```rust - async fn create_want( - State(state): State, - Json(req): Json, - ) -> Json { - // Send command to orchestrator - let (reply_tx, reply_rx) = oneshot::channel(); - state.command_tx.send(Command::CreateWant(req, reply_tx)).await?; - - // Wait for orchestrator reply - let response = reply_rx.await?; - Json(response) - } - ``` - -**Files Created:** -- `databuild/http_server.rs` - new module - -**Files Modified:** -- `databuild/lib.rs` - add `pub mod http_server;` -- `MODULE.bazel` - add dependencies - -**Acceptance Criteria:** -- Server starts on localhost random port, prints "Listening on http://127.0.0.1:XXXXX" -- All read endpoints return correct JSON responses -- Write endpoints return stub responses (Phase 4 will connect to orchestrator) - ---- - -## Phase 3: CLI - HTTP Client - -**Goal:** CLI that sends HTTP requests to running server - -**Tasks:** -1. Add dependencies to MODULE.bazel: - ```python - crate.spec(package = "clap", features = ["derive"], version = "4.0") - crate.spec(package = "reqwest", features = ["blocking", "json"], version = "0.11") - ``` - -2. Create `databuild/bin/databuild.rs` main binary: - ```rust - #[derive(Parser)] - #[command(name = "databuild")] - enum Cli { - /// Start the databuild server - Serve(ServeArgs), - - /// Create a want for partitions - Build(BuildArgs), - - /// Want operations - Want(WantCommand), - - /// Stream job run logs - Logs(LogsArgs), - } - - #[derive(Args)] - struct ServeArgs { - #[arg(long, default_value = "8080")] - port: u16, - } - - #[derive(Subcommand)] - enum WantCommand { - Create(CreateWantArgs), - List, - Get { want_id: String }, - Cancel { want_id: String }, - } - ``` - -3. Server address discovery: - - For now: hardcode `http://localhost:8080` or accept `--server-url` flag - - Future: read from `.databuild/server.json` file - -4. HTTP client implementation: - ```rust - fn list_wants(server_url: &str) -> Result> { - let client = reqwest::blocking::Client::new(); - let resp = client.get(&format!("{}/api/wants", server_url)) - .send()? - .json::()?; - Ok(resp.data) - } - ``` - -5. 
Commands: - - `databuild serve --port 8080` - Start server (blocks) - - `databuild build part1 part2` - Create want for partitions - - `databuild want list` - List all wants - - `databuild want get ` - Get specific want - - `databuild want cancel ` - Cancel want - - `databuild logs ` - Stream logs (stub) - -**Files Created:** -- `databuild/bin/databuild.rs` - new CLI binary - -**Files Modified:** -- `databuild/BUILD.bazel` - add `rust_binary` target for databuild CLI - -**Acceptance Criteria:** -- Can run `databuild serve` to start server -- Can run `databuild want list` in another terminal and see wants -- Commands print pretty JSON or formatted tables - ---- - -## Phase 4: Orchestrator Integration - Command Channel - -**Goal:** Connect orchestrator to web server via message passing - -**Tasks:** -1. Create `databuild/commands.rs` with command enum: - ```rust - pub enum Command { - CreateWant(CreateWantRequest, oneshot::Sender), - CancelWant(CancelWantRequest, oneshot::Sender), - // Only write operations need commands - } - ``` - -2. Update `Orchestrator`: - - Add `command_rx: mpsc::Receiver` field - - In `step()` method, before polling: - ```rust - // Process all pending commands - while let Ok(cmd) = self.command_rx.try_recv() { - match cmd { - Command::CreateWant(req, reply) => { - let resp = self.bel.api_handle_want_create(req); - let _ = reply.send(resp); // Ignore send errors - } - // ... other commands - } - } - ``` - -3. Create server startup function in `http_server.rs`: - ```rust - pub fn start_server( - bel_storage: Arc, - port: u16, - ) -> (JoinHandle<()>, mpsc::Sender) { - let (cmd_tx, cmd_rx) = mpsc::channel(100); - - // Spawn orchestrator in background thread - let orch_bel = bel_storage.clone(); - let orch_handle = std::thread::spawn(move || { - let mut orch = Orchestrator::new_with_commands(orch_bel, cmd_rx); - orch.join().unwrap(); - }); - - // Start HTTP server in tokio runtime - let runtime = tokio::runtime::Runtime::new().unwrap(); - let http_handle = runtime.spawn(async move { - let app_state = AppState { - bel_storage, - command_tx: cmd_tx.clone(), - last_request_time: Arc::new(AtomicU64::new(0)), - }; - - let app = create_router(app_state); - let addr = SocketAddr::from(([127, 0, 0, 1], port)); - axum::Server::bind(&addr) - .serve(app.into_make_service()) - .await - .unwrap(); - }); - - (http_handle, cmd_tx) - } - ``` - -4. Update `databuild serve` command to use `start_server()` - -**Files Created:** -- `databuild/commands.rs` - new module - -**Files Modified:** -- `databuild/orchestrator.rs` - accept command channel, process in `step()` -- `databuild/http_server.rs` - send commands for writes -- `databuild/bin/databuild.rs` - use `start_server()` in `serve` command - -**Acceptance Criteria:** -- Creating a want via HTTP actually creates it in BuildState -- Orchestrator processes commands without blocking its main loop -- Can observe wants being scheduled into job runs - ---- - -## Phase 5: Daemon Lifecycle - Auto-Shutdown - -**Goal:** Server shuts down gracefully after idle timeout - -**Tasks:** -1. Update AppState to track last request time: - ```rust - pub struct AppState { - bel_storage: Arc, - command_tx: mpsc::Sender, - last_request_time: Arc, // epoch millis - shutdown_tx: broadcast::Sender<()>, - } - ``` - -2. 
Add Tower middleware to update timestamp: - ```rust - async fn update_last_request_time( - State(state): State, - req: Request, - next: Next, - ) -> Response { - state.last_request_time.store( - SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64, - Ordering::Relaxed, - ); - next.run(req).await - } - ``` - -3. Background idle checker task: - ```rust - tokio::spawn(async move { - let idle_timeout = Duration::from_secs(3 * 60 * 60); // 3 hours - - loop { - tokio::time::sleep(Duration::from_secs(60)).await; - - let last_request = state.last_request_time.load(Ordering::Relaxed); - let now = SystemTime::now()...; - - if now - last_request > idle_timeout.as_millis() as u64 { - eprintln!("Server idle for {} hours, shutting down", - idle_timeout.as_secs() / 3600); - shutdown_tx.send(()).unwrap(); - break; - } - } - }); - ``` - -4. Graceful shutdown handling: - ```rust - let app = create_router(state); - axum::Server::bind(&addr) - .serve(app.into_make_service()) - .with_graceful_shutdown(async { - shutdown_rx.recv().await.ok(); - }) - .await?; - ``` - -5. Cleanup on shutdown: - - Orchestrator: finish current step, don't start new one - - HTTP server: stop accepting new connections, finish in-flight requests - - Log: "Shutdown complete" - -**Files Modified:** -- `databuild/http_server.rs` - add idle tracking, shutdown logic -- `databuild/orchestrator.rs` - accept shutdown signal, check before each step - -**Acceptance Criteria:** -- Server shuts down after configured idle timeout -- In-flight requests complete successfully during shutdown -- Shutdown is logged clearly - ---- - -## Phase 6: Testing & Polish - -**Goal:** End-to-end testing and production readiness - -**Tasks:** -1. Integration tests: - ```rust - #[test] - fn test_server_lifecycle() { - // Start server - let (handle, port) = start_test_server(); - - // Make requests - let wants = reqwest::blocking::get( - &format!("http://localhost:{}/api/wants", port) - ).unwrap().json::().unwrap(); - - // Stop server - handle.shutdown(); - } - ``` - -2. Error handling improvements: - - Proper HTTP status codes (400, 404, 500) - - Structured error responses: - ```json - {"error": "Want not found", "want_id": "abc123"} - ``` - - Add `tracing` crate for structured logging - -3. Add CORS middleware for web app: - ```rust - let cors = CorsLayer::new() - .allow_origin("http://localhost:3000".parse::().unwrap()) - .allow_methods([Method::GET, Method::POST, Method::DELETE]); - - app.layer(cors) - ``` - -4. Health check endpoint: - ```rust - async fn health() -> &'static str { - "OK" - } - ``` - -5. 
Optional: Metrics endpoint (prometheus format): - ```rust - async fn metrics() -> String { - format!( - "# HELP databuild_wants_total Total number of wants\n\ - databuild_wants_total {}\n\ - # HELP databuild_job_runs_total Total number of job runs\n\ - databuild_job_runs_total {}\n", - want_count, job_run_count - ) - } - ``` - -**Files Created:** -- `databuild/tests/http_integration_test.rs` - integration tests - -**Files Modified:** -- `databuild/http_server.rs` - add CORS, health, metrics, better errors -- `MODULE.bazel` - add `tracing` dependency - -**Acceptance Criteria:** -- All endpoints have proper error handling -- CORS works for web app development -- Health check returns 200 OK -- Integration tests pass - ---- - -## Future Enhancements (Not in Initial Plan) - -### Workspace Auto-Discovery -- Walk up directory tree looking for `.databuild/` marker -- Store server metadata in `.databuild/server.json`: - ```json - { - "pid": 12345, - "port": 54321, - "started_at": "2025-01-22T10:30:00Z", - "workspace_root": "/Users/stuart/Projects/databuild" - } - ``` -- CLI auto-starts server if not running - -### Log Streaming (SSE) -- Implement `GET /api/job_runs/:id/logs/stdout?follow=true` -- Use Server-Sent Events for streaming -- Integrate with FileLogStore from logging.md plan - -### State Caching -- Cache reconstructed BuildState for faster reads -- Invalidate cache when new events arrive -- Use `tokio::sync::RwLock>` where u64 is latest_event_id - -### gRPC Support (If Needed) -- Add Tonic alongside Axum -- Share same orchestrator/command channel -- Useful for language-agnostic clients - ---- - -## Dependencies Summary - -New dependencies to add to `MODULE.bazel`: - -```python -# Async runtime -crate.spec(package = "tokio", features = ["full"], version = "1.0") - -# Web framework -crate.spec(package = "axum", version = "0.7") -crate.spec(package = "tower", version = "0.4") -crate.spec(package = "tower-http", features = ["trace", "cors"], version = "0.5") - -# CLI -crate.spec(package = "clap", features = ["derive"], version = "4.0") - -# HTTP client for CLI -crate.spec(package = "reqwest", features = ["blocking", "json"], version = "0.11") - -# Logging -crate.spec(package = "tracing", version = "0.1") -crate.spec(package = "tracing-subscriber", version = "0.3") -``` - ---- - -## Estimated Timeline - -- **Phase 1:** 2-3 hours (thread-safe BEL storage) -- **Phase 2:** 4-6 hours (HTTP server with Axum) -- **Phase 3:** 3-4 hours (basic CLI) -- **Phase 4:** 3-4 hours (orchestrator integration) -- **Phase 5:** 2-3 hours (idle shutdown) -- **Phase 6:** 4-6 hours (testing and polish) - -**Total:** ~18-26 hours for complete implementation - ---- - -## Design Rationale - -### Why Event Log Separation? - -**Alternatives Considered:** -1. **Shared State with RwLock**: Orchestrator holds write lock during `step()`, blocking all reads -2. **Actor Model**: Extra overhead from message passing for all operations - -**Why Event Log Separation Wins:** -- Orchestrator stays completely synchronous (no refactoring) -- Reads don't block writes (eventual consistency acceptable for build system) -- Natural fit with event sourcing architecture -- Can cache reconstructed state for even better read performance - -### Why Not gRPC? - -- User requirement: "JSON is a must" -- REST is more debuggable (curl, browser dev tools) -- gRPC adds complexity without clear benefit -- Can add gRPC later if needed (both can coexist) - -### Why Axum Over Actix? 
-
-- Better compile-time type safety (extractors)
-- Cleaner middleware composition (Tower)
-- Native async/await (Actix uses actor model internally)
-- More ergonomic for this use case
-
-### Why Per-Workspace Server?
-
-- Isolation: builds in different projects don't interfere
-- Simpler: no need to route requests by workspace
-- Matches Bazel's model (users already understand it)
-- Easier to reason about resource usage
diff --git a/docs/plans/partitions-refactor.md b/docs/plans/partitions-refactor.md
new file mode 100644
index 0000000..90bafa3
--- /dev/null
+++ b/docs/plans/partitions-refactor.md
@@ -0,0 +1,766 @@
+# Partition Identity Refactor: Adding UUIDs for Temporal Consistency
+
+## Problem Statement
+
+### Current Architecture
+
+Partitions are currently keyed only by their reference string (e.g., "data/beta"):
+
+```rust
+partitions: HashMap<String, Partition> // ref → partition
+```
+
+When a partition transitions through states (Missing → Building → Live → Tainted), **it's the same object mutating**. This creates several architectural problems:
+
+### Core Issue: Lack of Temporal Identity
+
+**The fundamental problem:** We cannot distinguish between "the partition being built now" and "the partition built yesterday" or "the partition that will be built tomorrow."
+
+This manifests in several ways:
+
+1. **Ambiguous Job-Partition Relationships**
+   - When job J completes, which partition instance did it build?
+   - If a partition is rebuilt, we lose information about previous builds
+   - Can't answer: "What was the state of data/beta when job Y ran?"
+
+2. **State Mutation Loss**
+   - Once a partition transitions Live → Tainted → Missing, the Live state information is lost
+   - Can't track "Partition P was built successfully by job J at time T"
+   - Lineage and provenance information disappears on each rebuild
+
+3. **Redundant Data Structures** (Symptoms)
+   - `WantAttributedPartitions` in `JobRunDetail` exists to snapshot want-partition relationships
+   - Partitions carry `want_ids: Vec<String>` that get cleared/modified as partitions transition
+   - Jobs need to capture relationships at creation time because they can't be reliably reconstructed later
+
+### Concrete Bug Example
+
+The bug that led to this design discussion illustrates the problem:
+
+```
+1. Want 1 created for "data/beta" → partition becomes Building
+2. Want 2 created for "data/beta" → but partition is ALREADY Building
+3. Job has dep miss → creates derivative want
+4. System expects all wants to be Building/UpstreamBuilding, but Want 2 is Idle → panic
+```
+
+**Root cause:** All wants reference the same mutable partition object. We can't distinguish:
+- "The partition instance Want 1 triggered"
+- "The partition instance Want 2 is waiting for"
+- They're the same object, but semantically they represent different temporal relationships
+
+## Proposed Solution: Partition UUIDs
+
+### Architecture Changes
+
+**Two-level indexing:**
+
+```rust
+// All partition instances, keyed by UUID
+partitions_by_uuid: HashMap<Uuid, Partition>
+
+// Current/canonical partition for each ref
+canonical_partitions: HashMap<String, Uuid>
+```
+
+### Key Properties
+
+1. **Immutable Identity**: Each partition build gets a unique UUID
+   - `Partition(uuid-1, ref="data/beta", state=Building)` is a distinct entity
+   - When rebuilt, create `Partition(uuid-2, ref="data/beta", state=Missing)`
+   - Both can coexist; uuid-1 represents historical fact, uuid-2 is current state
+
+2.
**Stable Job References**: Jobs reference the specific partition UUIDs they built + ```rust + JobRunBufferEventV1 { + building_partition_uuids: [uuid-1, uuid-2] // Specific instances being built + } + ``` + +3. **Wants Reference Refs**: Wants continue to reference partition refs, not UUIDs + ```rust + WantCreateEventV1 { + partitions: ["data/beta"] // User-facing reference + } + // Want's state determined by canonical partition for "data/beta" + ``` + +4. **Temporal Queries**: Can reconstruct state at any point + - "What was partition uuid-1's state when job J ran?" → Look up uuid-1, it's immutable + - "Which wants were waiting for data/beta at time T?" → Check canonical partition at T + - "What's the current state of data/beta?" → canonical_partitions["data/beta"] → uuid-2 + +## Benefits + +### 1. Removes WantAttributedPartitions Redundancy + +**Before:** +```rust +JobRunBufferEventV1 { + building_partitions: [PartitionRef("data/beta")], + // Redundant: snapshot want-partition relationship + servicing_wants: [WantAttributedPartitions { + want_id: "w1", + partitions: ["data/beta"] + }] +} +``` + +**After:** +```rust +JobRunBufferEventV1 { + building_partition_uuids: [uuid-1, uuid-2] +} + +// To find serviced wants: +for uuid in job.building_partition_uuids { + let partition = partitions_by_uuid[uuid]; + for want_id in partition.want_ids { + // transition want + } +} +``` + +The relationship is **discoverable** via stable partition UUID, not **baked-in** at event creation. + +### 2. Proper State Semantics for Wants + +**Current (problematic):** +``` +Want 1 → triggers build → Building (owns the job somehow?) +Want 2 → sees partition Building → stays Idle (different from Want 1?) +Want 3 → same partition → also Idle +``` + +**With UUIDs:** +``` +Partition(uuid-1, "data/beta") created as Missing +Want 1 arrives → checks canonical["data/beta"] = uuid-1 (Missing) → Idle → schedules job +Job starts → uuid-1 becomes Building, canonical still points to uuid-1 +Want 2 arrives → checks canonical["data/beta"] = uuid-1 (Building) → directly to Building +Want 3 arrives → checks canonical["data/beta"] = uuid-1 (Building) → directly to Building +Want 4 arrives → checks canonical["data/beta"] = uuid-1 (Building) → directly to Building +``` + +All 4 wants have **identical relationship** to the canonical partition. The state reflects reality: "is the canonical partition for my ref being built?" + +**Key insight:** Wants don't bind to UUIDs. They look up the canonical partition for their ref and base their state on that. + +### 3. Historical Lineage + +```rust +// Track partition lineage over time +Partition { + uuid: uuid-3, + partition_ref: "data/beta", + previous_uuid: Some(uuid-2), // Link to previous instance + created_at: 1234567890, + state: Live, + produced_by_job: Some("job-xyz"), +} +``` + +Can answer: +- "What partitions existed for this ref over time?" +- "Which job produced this specific partition instance?" +- "What was the dependency chain when this partition was built?" + +## Implementation Plan + +### Phase 1: Add UUID Infrastructure (Non-Breaking) + +**Goals:** +- Add UUID field to Partition +- Create dual indexing (by UUID and by ref) +- Maintain backward compatibility + +**Changes:** + +1. **Update Partition struct** (databuild/partition_state.rs) + ```rust + pub struct PartitionWithState { + pub uuid: Uuid, // NEW + pub partition_ref: PartitionRef, + pub want_ids: Vec, + pub state: S, + } + ``` + +2. 
**Add dual indexing** (databuild/build_state.rs) + ```rust + pub struct BuildState { + partitions_by_uuid: BTreeMap, // NEW + canonical_partitions: BTreeMap, // NEW + partitions: BTreeMap, // DEPRECATED, keep for now + // ... + } + ``` + +3. **Update partition creation** + - When creating partition (Missing state), generate UUID + - Store in both maps: `partitions_by_uuid[uuid]` and `canonical_partitions[ref] = uuid` + - Keep `partitions[ref]` updated for backward compatibility + +4. **Add helper methods** + ```rust + impl BuildState { + fn get_canonical_partition(&self, ref: &str) -> Option<&Partition> { + self.canonical_partitions + .get(ref) + .and_then(|uuid| self.partitions_by_uuid.get(uuid)) + } + + fn get_canonical_partition_uuid(&self, ref: &str) -> Option { + self.canonical_partitions.get(ref).copied() + } + + fn get_partition_by_uuid(&self, uuid: &Uuid) -> Option<&Partition> { + self.partitions_by_uuid.get(uuid) + } + } + ``` + +### Phase 2: Update Want State Logic + +**Goals:** +- Wants determine state based on canonical partition +- Remove schedulability check for building partitions (no longer needed) + +**Changes:** + +1. **Update handle_want_create()** (databuild/build_state.rs) + ```rust + fn handle_want_create(&mut self, event: &WantCreateEventV1) -> Vec { + // Create want in Idle state initially + let want_idle: WantWithState = event.clone().into(); + + // Check canonical partition states to determine want's actual initial state + let has_building_partitions = event.partitions.iter().any(|pref| { + matches!( + self.get_canonical_partition(&pref.r#ref), + Some(Partition::Building(_)) + ) + }); + + let want = if has_building_partitions { + // Canonical partition is Building → Want starts in Building + tracing::info!( + want_id = %event.want_id, + "Want created in Building state (canonical partition is building)" + ); + Want::Building(want_idle.start_building(current_timestamp())) + } else { + // Canonical partition not Building → Want starts in Idle + tracing::info!( + want_id = %event.want_id, + "Want created in Idle state" + ); + Want::Idle(want_idle) + }; + + self.wants.insert(event.want_id.clone(), want); + + // Register want with partitions + for pref in &event.partitions { + self.add_want_to_partition(pref, &event.want_id); + } + + // Handle derivative wants if applicable + if let Some(source) = &event.source { + if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source { + self.handle_derivative_want_creation( + &event.want_id, + &event.partitions, + &job_triggered.job_run_id, + ); + } + } + + vec![] + } + ``` + +2. **Simplify WantSchedulability** (databuild/build_state.rs) + ```rust + // Remove `building` field from WantUpstreamStatus + pub struct WantUpstreamStatus { + pub live: Vec, + pub tainted: Vec, + pub missing: Vec, + // REMOVED: pub building: Vec, + } + + impl WantSchedulability { + pub fn is_schedulable(&self) -> bool { + // Simplified: only check upstreams + // Building partitions now handled at want creation + self.status.missing.is_empty() && self.status.tainted.is_empty() + } + } + ``` + +3. **Update derivative want handling** (databuild/build_state.rs) + ```rust + fn handle_derivative_want_creation(...) { + // ...existing logic... 
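+        // (Assumed from context: the elided logic above resolves `impacted_want_ids` —
+        // the wants attached to the partitions whose build hit this dep miss.)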
+ + for want_id in impacted_want_ids { + let want = self.wants.remove(&want_id).expect(...); + let transitioned = match want { + // Idle wants can exist if they arrived after job started but before dep miss + Want::Idle(idle) => { + tracing::info!( + want_id = %want_id, + derivative_want_id = %derivative_want_id, + "Want: Idle → UpstreamBuilding (partition dep miss detected)" + ); + Want::UpstreamBuilding( + idle.detect_missing_deps(vec![derivative_want_id.to_string()]) + ) + } + Want::Building(building) => { + // Building → UpstreamBuilding + // ... existing logic ... + } + Want::UpstreamBuilding(upstream) => { + // UpstreamBuilding → UpstreamBuilding (add another upstream) + // ... existing logic ... + } + _ => { + panic!( + "BUG: Want {} in invalid state {:?}. Should be Idle, Building, or UpstreamBuilding.", + want_id, want + ); + } + }; + self.wants.insert(want_id, transitioned); + } + } + ``` + +4. **Add Idle → UpstreamBuilding transition** (databuild/want_state.rs) + ```rust + impl WantWithState { + // ... existing methods ... + + /// Transition from Idle to UpstreamBuilding when dependencies are missing + /// This can happen if want arrives while partition is building, then job has dep miss + pub fn detect_missing_deps( + self, + upstream_want_ids: Vec, + ) -> WantWithState { + WantWithState { + want: self.want.updated_timestamp(), + state: UpstreamBuildingState { upstream_want_ids }, + } + } + } + ``` + +### Phase 3: Update Job Events + +**Goals:** +- Jobs reference partition UUIDs, not just refs +- Remove WantAttributedPartitions redundancy + +**Changes:** + +1. **Update JobRunBufferEventV1** (databuild/databuild.proto) + ```protobuf + message JobRunBufferEventV1 { + string job_run_id = 1; + string job_label = 2; + repeated string building_partition_uuids = 3; // NEW: UUIDs instead of refs + repeated PartitionRef building_partitions = 4; // DEPRECATED: keep for migration + repeated WantAttributedPartitions servicing_wants = 5; // DEPRECATED: remove later + } + ``` + +2. 
**Update handle_job_run_buffer()** (databuild/build_state.rs) + ```rust + fn handle_job_run_buffer(&mut self, event: &JobRunBufferEventV1) -> Vec { + // Parse UUIDs from event + let building_uuids: Vec = event.building_partition_uuids + .iter() + .map(|s| Uuid::parse_str(s).expect("Valid UUID")) + .collect(); + + // Find all wants for these partition UUIDs + let mut impacted_want_ids: HashSet = HashSet::new(); + for uuid in &building_uuids { + if let Some(partition) = self.partitions_by_uuid.get(uuid) { + for want_id in partition.want_ids() { + impacted_want_ids.insert(want_id.clone()); + } + } + } + + // Transition wants to Building + for want_id in impacted_want_ids { + let want = self.wants.remove(&want_id).expect("Want must exist"); + let transitioned = match want { + Want::Idle(idle) => Want::Building(idle.start_building(current_timestamp())), + Want::Building(building) => Want::Building(building), // Already building + _ => panic!("Invalid state for job buffer: {:?}", want), + }; + self.wants.insert(want_id, transitioned); + } + + // Transition partitions to Building by UUID + for uuid in building_uuids { + if let Some(partition) = self.partitions_by_uuid.remove(&uuid) { + let building = match partition { + Partition::Missing(missing) => { + Partition::Building(missing.start_building(event.job_run_id.clone())) + } + _ => panic!("Partition {:?} not in Missing state", uuid), + }; + self.partitions_by_uuid.insert(uuid, building); + } + } + + // Create job run + let queued: JobRunWithState = event.clone().into(); + self.job_runs.insert(event.job_run_id.clone(), JobRun::Queued(queued)); + + vec![] + } + ``` + +3. **Update Orchestrator** (databuild/orchestrator.rs) + ```rust + fn queue_job(&mut self, wg: WantGroup) -> Result<(), DatabuildError> { + // Get partition refs from wants + let wanted_refs: Vec = wg.wants + .iter() + .flat_map(|want| want.partitions.clone()) + .collect(); + + // Resolve refs to canonical UUIDs + let building_partition_uuids: Vec = wanted_refs + .iter() + .filter_map(|pref| { + self.bel.state.get_canonical_partition_uuid(&pref.r#ref) + .map(|uuid| uuid.to_string()) + }) + .collect(); + + let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 { + job_run_id: job_run_id.to_string(), + job_label: wg.job.label, + building_partition_uuids, // Use canonical UUIDs + building_partitions: vec![], // Deprecated + servicing_wants: vec![], // Deprecated + }); + + self.append_and_broadcast(&job_buffer_event)?; + self.job_runs.push(job_run); + Ok(()) + } + ``` + +### Phase 4: Partition Lifecycle Management + +**Goals:** +- Define when new partition UUIDs are created +- Handle canonical partition transitions +- Implement cleanup/GC + +**Canonical Partition Transitions:** + +New partition UUID created when: +1. **First build**: Partition doesn't exist → create Partition(uuid, Missing) +2. **Taint**: Partition tainted → create new Partition(uuid-new, Missing), update canonical +3. **Expiration**: TTL exceeded → create new Partition(uuid-new, Missing), update canonical +4. 
**Manual rebuild**: Explicit rebuild request → create new Partition(uuid-new, Missing), update canonical + +**Implementation:** + +```rust +impl BuildState { + /// Create a new partition instance for a ref, updating canonical pointer + fn create_new_partition_instance(&mut self, partition_ref: &PartitionRef) -> Uuid { + let new_uuid = Uuid::new_v4(); + let new_partition = Partition::new_missing_with_uuid( + new_uuid, + partition_ref.clone() + ); + + // Update canonical pointer (old UUID becomes historical) + self.canonical_partitions.insert( + partition_ref.r#ref.clone(), + new_uuid + ); + + // Store new partition + self.partitions_by_uuid.insert(new_uuid, new_partition); + + // Old partition remains in partitions_by_uuid for historical queries + + new_uuid + } + + /// Handle partition taint - creates new instance + fn taint_partition(&mut self, partition_ref: &str) -> Uuid { + // Mark current partition as Tainted + if let Some(current_uuid) = self.canonical_partitions.get(partition_ref) { + if let Some(partition) = self.partitions_by_uuid.get_mut(current_uuid) { + // Transition to Tainted state (keep UUID) + *partition = match partition { + Partition::Live(live) => { + Partition::Tainted(live.clone().mark_tainted()) + } + _ => panic!("Can only taint Live partitions"), + }; + } + } + + // Create new partition instance for rebuilding + self.create_new_partition_instance(&PartitionRef { + r#ref: partition_ref.to_string() + }) + } +} +``` + +**GC Strategy:** + +Time-based retention (recommended): +- Keep partition UUIDs for N days (default 30) +- Enables historical queries within retention window +- Predictable storage growth + +```rust +impl BuildState { + /// Remove partition UUIDs older than retention window + fn gc_old_partitions(&mut self, retention_days: u64) { + let cutoff = current_timestamp() - (retention_days * 86400 * 1_000_000_000); + + // Find UUIDs to remove (not canonical + older than cutoff) + let canonical_uuids: HashSet = self.canonical_partitions + .values() + .copied() + .collect(); + + let to_remove: Vec = self.partitions_by_uuid + .iter() + .filter_map(|(uuid, partition)| { + if !canonical_uuids.contains(uuid) && partition.created_at() < cutoff { + Some(*uuid) + } else { + None + } + }) + .collect(); + + for uuid in to_remove { + self.partitions_by_uuid.remove(&uuid); + } + } +} +``` + +### Phase 5: Migration and Cleanup + +**Goals:** +- Remove deprecated fields +- Update API responses +- Complete migration + +**Changes:** + +1. **Remove deprecated fields from protobuf** + - `building_partitions` from `JobRunBufferEventV1` + - `servicing_wants` from `JobRunBufferEventV1` + - `WantAttributedPartitions` message + +2. **Remove backward compatibility code** + - `partitions: BTreeMap` from `BuildState` + - Dual writes/reads + +3. **Update API responses** to include UUIDs where relevant + - JobRunDetail can include partition UUIDs built + - PartitionDetail can include UUID for debugging + +4. **Update tests** to use UUID-based assertions + +## Design Decisions & Trade-offs + +### 1. Wants Reference Refs, Not UUIDs + +**Decision:** Wants always reference partition refs (e.g., "data/beta"), not UUIDs. + +**Rationale:** +- User requests "data/beta" - the current/canonical partition for that ref +- Want state is based on canonical partition: "is the current partition for my ref being built?" 
+- If partition gets tainted/rebuilt, wants see the new canonical partition automatically +- Simpler mental model: want doesn't care about historical instances + +**How it works:** +```rust +// Want creation +want.partitions = ["data/beta"] // ref, not UUID + +// Want state determination +let canonical_uuid = canonical_partitions["data/beta"]; +let partition = partitions_by_uuid[canonical_uuid]; +match partition.state { + Building => want.state = Building, + Live => want can complete, + ... +} +``` + +### 2. Jobs Reference UUIDs, Not Refs + +**Decision:** Jobs reference the specific partition UUIDs they built. + +**Rationale:** +- Jobs build specific partition instances +- Historical record: "Job J built Partition(uuid-1)" +- Even if partition is later tainted/rebuilt, job's record is immutable +- Enables provenance: "Which job built this specific partition?" + +**How it works:** +```rust +JobRunBufferEventV1 { + building_partition_uuids: [uuid-1, uuid-2] // Specific instances +} +``` + +### 3. UUID Generation: When? + +**Decision:** Generate UUID during event processing (in handle_want_create, when partition created). + +**Rationale:** +- Events remain deterministic +- UUID generation during replay works correctly +- Maintains event sourcing principles + +**Not in the event itself:** Would require client-side UUID generation, breaks deterministic replay. + +### 4. Canonical Partition: One at a Time + +**Decision:** Only one canonical partition per ref at a time. + +**Scenario handling:** +- Partition(uuid-1, "data/beta") is Building +- User requests rebuild → new want sees uuid-1 is Building → want becomes Building +- Want waits for uuid-1 to complete +- If uuid-1 completes successfully → want completes +- If uuid-1 fails or is tainted → new partition instance created (uuid-2), canonical updated + +**Alternative considered:** Multiple concurrent builds with versioning +- Significantly more complex +- Defer to future work + +### 5. Event Format: UUID as String + +**Decision:** Store UUIDs as strings in protobuf events. + +**Rationale:** +- Human-readable in logs/debugging +- Standard UUID string format (36 chars) +- Protobuf has no native UUID type + +**Trade-off:** Larger event size (36 bytes vs 16 bytes) - acceptable for debuggability. + +## Testing Strategy + +### Unit Tests + +1. **Partition UUID uniqueness** + - Creating partitions generates unique UUIDs + - Same ref at different times gets different UUIDs + +2. **Canonical partition tracking** + - canonical_partitions always points to current instance + - Old instances remain in partitions_by_uuid + +3. **Want state determination** + - Want checks canonical partition state + - Multiple wants see same canonical partition + +### Integration Tests + +1. **Multi-want scenario** (reproduces original bug) + - Want 1 created → partition Missing → Idle + - Job scheduled → partition Building (uuid-1) + - Wants 2-4 created → see partition Building → directly to Building + - All 4 wants reference same canonical partition uuid-1 + - Job dep miss → all transition to UpstreamBuilding correctly + +2. **Rebuild scenario** + - Partition built → Live (uuid-1) + - Partition tainted → new instance created (uuid-2), canonical updated + - New wants reference uuid-2 + - Old partition uuid-1 still queryable for history + +### End-to-End Tests + +1. 
**Full lifecycle**
+   - Want created → canonical partition determined
+   - Job runs → partition transitions through states
+   - Want completes → partition remains in history
+   - Partition expires → new UUID for rebuild, canonical updated
+
+## Future Work
+
+### 1. Partition Lineage Graph
+
+Build explicit lineage tracking:
+
+```rust
+Partition {
+    uuid: uuid-3,
+    partition_ref: "data/beta",
+    previous_uuid: Some(uuid-2),
+    derived_from: vec![uuid-4, uuid-5], // Upstream dependencies
+}
+```
+
+Enables:
+- "What was the full dependency graph when this partition was built?"
+- "How did data propagate through the system over time?"
+
+### 2. Partition Provenance
+
+Track complete build history:
+
+```rust
+Partition {
+    uuid: uuid-1,
+    provenance: Provenance {
+        built_by_job: "job-123",
+        source_code_version: "abc123",
+        build_timestamp: 1234567890,
+        input_partitions: vec![uuid-2, uuid-3],
+    }
+}
+```
+
+### 3. Multi-Generation Partitions
+
+Support concurrent builds of different generations:
+
+```rust
+canonical_partitions: HashMap<String, Vec<(Version, Uuid)>>
+// "data/beta" → [(v1, uuid-1), (v2, uuid-2)]
+```
+
+Users can request specific generations or "latest."
+
+## Summary
+
+Adding partition UUIDs solves fundamental architectural problems:
+
+- **Temporal identity**: Distinguish partition instances over time
+- **Stable job references**: Jobs reference the immutable partition UUIDs they built
+- **Wants reference refs**: Want state is based on the canonical partition for their ref
+- **Discoverable relationships**: Remove redundant snapshot data (WantAttributedPartitions)
+- **Proper semantics**: Want state reflects actual canonical partition state
+- **Historical queries**: Can query past partition states via UUID
+
+**Key principle:** Wants care about "what's the current state of data/beta?" (refs), while jobs and historical queries care about "what happened to this specific partition instance?" (UUIDs).
+
+This refactor enables cleaner code, better observability, and proper event sourcing semantics throughout the system.
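+
+As a closing illustration of the two-level index, here is a minimal, self-contained sketch. It is not the actual `BuildState` implementation: the `PartitionIndex` and `PartitionInstance` names are invented for this example, and it reuses `Uuid::new_v4()` from the `uuid` crate as in the Phase 4 sketch above.
+
+```rust
+use std::collections::HashMap;
+use uuid::Uuid;
+
+#[derive(Debug, Clone, PartialEq)]
+enum PartitionState { Missing, Building, Live, Tainted }
+
+#[derive(Debug, Clone)]
+struct PartitionInstance {
+    uuid: Uuid,
+    partition_ref: String,
+    state: PartitionState,
+    want_ids: Vec<String>,
+}
+
+#[derive(Default)]
+struct PartitionIndex {
+    /// Every instance ever created, keyed by UUID (immutable history).
+    partitions_by_uuid: HashMap<Uuid, PartitionInstance>,
+    /// Current/canonical instance per ref.
+    canonical_partitions: HashMap<String, Uuid>,
+}
+
+impl PartitionIndex {
+    /// Resolve the canonical instance for a ref, creating a Missing one if none exists.
+    fn canonical_or_create(&mut self, partition_ref: &str) -> Uuid {
+        if let Some(uuid) = self.canonical_partitions.get(partition_ref) {
+            return *uuid;
+        }
+        self.new_instance(partition_ref)
+    }
+
+    /// Mark the current instance Tainted (simplified: real code would require it to be Live)
+    /// and start a fresh Missing instance. The old UUID remains queryable for history.
+    fn rebuild(&mut self, partition_ref: &str) -> Uuid {
+        if let Some(old) = self.canonical_partitions.get(partition_ref).copied() {
+            if let Some(p) = self.partitions_by_uuid.get_mut(&old) {
+                p.state = PartitionState::Tainted;
+            }
+        }
+        self.new_instance(partition_ref)
+    }
+
+    /// Create a new Missing instance for the ref and point the canonical index at it.
+    fn new_instance(&mut self, partition_ref: &str) -> Uuid {
+        let uuid = Uuid::new_v4();
+        self.partitions_by_uuid.insert(uuid, PartitionInstance {
+            uuid,
+            partition_ref: partition_ref.to_string(),
+            state: PartitionState::Missing,
+            want_ids: vec![],
+        });
+        self.canonical_partitions.insert(partition_ref.to_string(), uuid);
+        uuid
+    }
+}
+```
+
+A want handler would call `canonical_or_create("data/beta")` and derive the want's state from the returned instance, matching the "wants reference refs" rule, while `rebuild` shows how the old UUID stays available for historical queries as the canonical pointer moves on.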