Add partition refactor plan

This commit is contained in:
Stuart Axelbrooke 2025-11-24 10:19:24 +08:00
parent 7846cd6b86
commit 6508809745
2 changed files with 766 additions and 577 deletions

View file

@ -1,577 +0,0 @@
# Web Server Implementation Plan
## Architecture Summary
**Concurrency Model: Event Log Separation**
- Orchestrator runs synchronously in dedicated thread, owns BEL exclusively
- Web server reads from shared BEL storage, sends write commands via channel
- No locks on hot path, orchestrator stays single-threaded
- Eventual consistency for reads (acceptable since builds take time anyway)
**Daemon Model:**
- Server binary started manually from workspace root (for now)
- Server tracks last request time, shuts down after idle timeout (default: 3 hours)
- HTTP REST on localhost random port
- Future: CLI can auto-discover/start server
---
## Thread Model
```
Main Process
├─ HTTP Server (tokio multi-threaded runtime)
│ ├─ Request handlers (async, read from BEL storage)
│ └─ Command sender (send writes to orchestrator)
└─ Orchestrator Thread (std::thread, synchronous)
├─ Receives commands via mpsc channel
├─ Owns BEL (exclusive mutable access)
└─ Runs existing step() loop
```
**Read Path (Low Latency):**
1. HTTP request → Axum handler
2. Read events from shared BEL storage (no lock contention)
3. Reconstruct BuildState from events (can cache this)
4. Return response
**Write Path (Strong Consistency):**
1. HTTP request → Axum handler
2. Send command via channel to orchestrator
3. Orchestrator processes command in its thread
4. Reply sent back via oneshot channel
5. Return response
**Why This Works:**
- Orchestrator remains completely synchronous (no refactoring needed)
- Reads scale horizontally (multiple handlers, no locks)
- Writes are serialized through orchestrator (consistent with current model)
- Event sourcing means reads can be eventually consistent
---
## Phase 1: Foundation - Make BEL Storage Thread-Safe
**Goal:** Allow BEL storage to be safely shared between orchestrator and web server
**Tasks:**
1. Add `Send + Sync` bounds to `BELStorage` trait
2. Wrap `SqliteBELStorage::connection` in `Arc<Mutex<Connection>>` or use r2d2 pool
3. Add read-only methods to BELStorage:
- `list_events(offset: usize, limit: usize) -> Vec<DataBuildEvent>`
- `get_event(event_id: u64) -> Option<DataBuildEvent>`
- `latest_event_id() -> u64`
4. Add builder method to reconstruct BuildState from events:
- `BuildState::from_events(events: &[DataBuildEvent]) -> Self`
**Files Modified:**
- `databuild/build_event_log.rs` - update trait and storage impls
- `databuild/build_state.rs` - add `from_events()` builder
**Acceptance Criteria:**
- `BELStorage` trait has `Send + Sync` bounds
- Can clone `Arc<SqliteBELStorage>` and use from multiple threads
- Can reconstruct BuildState from events without mutating storage
---
## Phase 2: Web Server - HTTP API with Axum
**Goal:** HTTP server serving read/write APIs
**Tasks:**
1. Add dependencies to MODULE.bazel:
```python
crate.spec(package = "tokio", features = ["full"], version = "1.0")
crate.spec(package = "axum", version = "0.7")
crate.spec(package = "tower", version = "0.4")
crate.spec(package = "tower-http", features = ["trace", "cors"], version = "0.5")
```
2. Create `databuild/http_server.rs` module with:
- `AppState` struct holding:
- `bel_storage: Arc<dyn BELStorage>` - shared read access
- `command_tx: mpsc::Sender<Command>` - channel to orchestrator
- `last_request_time: Arc<AtomicU64>` - for idle tracking
- Axum router with all endpoints
- Handler functions delegating to existing `api_handle_*` methods
3. API Endpoints:
```
GET /health → health check
GET /api/wants → list_wants
POST /api/wants → create_want
GET /api/wants/:id → get_want
DELETE /api/wants/:id → cancel_want
GET /api/partitions → list_partitions
GET /api/job_runs → list_job_runs
GET /api/job_runs/:id/logs/stdout → stream_logs (stub)
```
4. Handler pattern (reads):
```rust
async fn list_wants(
State(state): State<AppState>,
Query(params): Query<ListWantsParams>,
) -> Json<ListWantsResponse> {
// Read events from storage
let events = state.bel_storage.list_events(0, 10000)?;
// Reconstruct state
let build_state = BuildState::from_events(&events);
// Use existing API method
Json(build_state.list_wants(&params.into()))
}
```
5. Handler pattern (writes):
```rust
async fn create_want(
State(state): State<AppState>,
Json(req): Json<CreateWantRequest>,
) -> Json<CreateWantResponse> {
// Send command to orchestrator
let (reply_tx, reply_rx) = oneshot::channel();
state.command_tx.send(Command::CreateWant(req, reply_tx)).await?;
// Wait for orchestrator reply
let response = reply_rx.await?;
Json(response)
}
```
**Files Created:**
- `databuild/http_server.rs` - new module
**Files Modified:**
- `databuild/lib.rs` - add `pub mod http_server;`
- `MODULE.bazel` - add dependencies
**Acceptance Criteria:**
- Server starts on localhost random port, prints "Listening on http://127.0.0.1:XXXXX"
- All read endpoints return correct JSON responses
- Write endpoints return stub responses (Phase 4 will connect to orchestrator)
---
## Phase 3: CLI - HTTP Client
**Goal:** CLI that sends HTTP requests to running server
**Tasks:**
1. Add dependencies to MODULE.bazel:
```python
crate.spec(package = "clap", features = ["derive"], version = "4.0")
crate.spec(package = "reqwest", features = ["blocking", "json"], version = "0.11")
```
2. Create `databuild/bin/databuild.rs` main binary:
```rust
#[derive(Parser)]
#[command(name = "databuild")]
enum Cli {
/// Start the databuild server
Serve(ServeArgs),
/// Create a want for partitions
Build(BuildArgs),
/// Want operations
Want(WantCommand),
/// Stream job run logs
Logs(LogsArgs),
}
#[derive(Args)]
struct ServeArgs {
#[arg(long, default_value = "8080")]
port: u16,
}
#[derive(Subcommand)]
enum WantCommand {
Create(CreateWantArgs),
List,
Get { want_id: String },
Cancel { want_id: String },
}
```
3. Server address discovery:
- For now: hardcode `http://localhost:8080` or accept `--server-url` flag
- Future: read from `.databuild/server.json` file
4. HTTP client implementation:
```rust
fn list_wants(server_url: &str) -> Result<Vec<WantDetail>> {
let client = reqwest::blocking::Client::new();
let resp = client.get(&format!("{}/api/wants", server_url))
.send()?
.json::<ListWantsResponse>()?;
Ok(resp.data)
}
```
5. Commands:
- `databuild serve --port 8080` - Start server (blocks)
- `databuild build part1 part2` - Create want for partitions
- `databuild want list` - List all wants
- `databuild want get <id>` - Get specific want
- `databuild want cancel <id>` - Cancel want
- `databuild logs <job_run_id>` - Stream logs (stub)
**Files Created:**
- `databuild/bin/databuild.rs` - new CLI binary
**Files Modified:**
- `databuild/BUILD.bazel` - add `rust_binary` target for databuild CLI
**Acceptance Criteria:**
- Can run `databuild serve` to start server
- Can run `databuild want list` in another terminal and see wants
- Commands print pretty JSON or formatted tables
---
## Phase 4: Orchestrator Integration - Command Channel
**Goal:** Connect orchestrator to web server via message passing
**Tasks:**
1. Create `databuild/commands.rs` with command enum:
```rust
pub enum Command {
CreateWant(CreateWantRequest, oneshot::Sender<CreateWantResponse>),
CancelWant(CancelWantRequest, oneshot::Sender<CancelWantResponse>),
// Only write operations need commands
}
```
2. Update `Orchestrator`:
- Add `command_rx: mpsc::Receiver<Command>` field
- In `step()` method, before polling:
```rust
// Process all pending commands
while let Ok(cmd) = self.command_rx.try_recv() {
match cmd {
Command::CreateWant(req, reply) => {
let resp = self.bel.api_handle_want_create(req);
let _ = reply.send(resp); // Ignore send errors
}
// ... other commands
}
}
```
3. Create server startup function in `http_server.rs`:
```rust
pub fn start_server(
bel_storage: Arc<dyn BELStorage>,
port: u16,
) -> (JoinHandle<()>, mpsc::Sender<Command>) {
let (cmd_tx, cmd_rx) = mpsc::channel(100);
// Spawn orchestrator in background thread
let orch_bel = bel_storage.clone();
let orch_handle = std::thread::spawn(move || {
let mut orch = Orchestrator::new_with_commands(orch_bel, cmd_rx);
orch.join().unwrap();
});
// Start HTTP server in tokio runtime
let runtime = tokio::runtime::Runtime::new().unwrap();
let http_handle = runtime.spawn(async move {
let app_state = AppState {
bel_storage,
command_tx: cmd_tx.clone(),
last_request_time: Arc::new(AtomicU64::new(0)),
};
let app = create_router(app_state);
let addr = SocketAddr::from(([127, 0, 0, 1], port));
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
});
(http_handle, cmd_tx)
}
```
4. Update `databuild serve` command to use `start_server()`
**Files Created:**
- `databuild/commands.rs` - new module
**Files Modified:**
- `databuild/orchestrator.rs` - accept command channel, process in `step()`
- `databuild/http_server.rs` - send commands for writes
- `databuild/bin/databuild.rs` - use `start_server()` in `serve` command
**Acceptance Criteria:**
- Creating a want via HTTP actually creates it in BuildState
- Orchestrator processes commands without blocking its main loop
- Can observe wants being scheduled into job runs
---
## Phase 5: Daemon Lifecycle - Auto-Shutdown
**Goal:** Server shuts down gracefully after idle timeout
**Tasks:**
1. Update AppState to track last request time:
```rust
pub struct AppState {
bel_storage: Arc<dyn BELStorage>,
command_tx: mpsc::Sender<Command>,
last_request_time: Arc<AtomicU64>, // epoch millis
shutdown_tx: broadcast::Sender<()>,
}
```
2. Add Tower middleware to update timestamp:
```rust
async fn update_last_request_time<B>(
State(state): State<AppState>,
req: Request<B>,
next: Next<B>,
) -> Response {
state.last_request_time.store(
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
Ordering::Relaxed,
);
next.run(req).await
}
```
3. Background idle checker task:
```rust
tokio::spawn(async move {
let idle_timeout = Duration::from_secs(3 * 60 * 60); // 3 hours
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
let last_request = state.last_request_time.load(Ordering::Relaxed);
let now = SystemTime::now()...;
if now - last_request > idle_timeout.as_millis() as u64 {
eprintln!("Server idle for {} hours, shutting down",
idle_timeout.as_secs() / 3600);
shutdown_tx.send(()).unwrap();
break;
}
}
});
```
4. Graceful shutdown handling:
```rust
let app = create_router(state);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.with_graceful_shutdown(async {
shutdown_rx.recv().await.ok();
})
.await?;
```
5. Cleanup on shutdown:
- Orchestrator: finish current step, don't start new one
- HTTP server: stop accepting new connections, finish in-flight requests
- Log: "Shutdown complete"
**Files Modified:**
- `databuild/http_server.rs` - add idle tracking, shutdown logic
- `databuild/orchestrator.rs` - accept shutdown signal, check before each step
**Acceptance Criteria:**
- Server shuts down after configured idle timeout
- In-flight requests complete successfully during shutdown
- Shutdown is logged clearly
---
## Phase 6: Testing & Polish
**Goal:** End-to-end testing and production readiness
**Tasks:**
1. Integration tests:
```rust
#[test]
fn test_server_lifecycle() {
// Start server
let (handle, port) = start_test_server();
// Make requests
let wants = reqwest::blocking::get(
&format!("http://localhost:{}/api/wants", port)
).unwrap().json::<ListWantsResponse>().unwrap();
// Stop server
handle.shutdown();
}
```
2. Error handling improvements:
- Proper HTTP status codes (400, 404, 500)
- Structured error responses:
```json
{"error": "Want not found", "want_id": "abc123"}
```
- Add `tracing` crate for structured logging
3. Add CORS middleware for web app:
```rust
let cors = CorsLayer::new()
.allow_origin("http://localhost:3000".parse::<HeaderValue>().unwrap())
.allow_methods([Method::GET, Method::POST, Method::DELETE]);
app.layer(cors)
```
4. Health check endpoint:
```rust
async fn health() -> &'static str {
"OK"
}
```
5. Optional: Metrics endpoint (prometheus format):
```rust
async fn metrics() -> String {
format!(
"# HELP databuild_wants_total Total number of wants\n\
databuild_wants_total {}\n\
# HELP databuild_job_runs_total Total number of job runs\n\
databuild_job_runs_total {}\n",
want_count, job_run_count
)
}
```
**Files Created:**
- `databuild/tests/http_integration_test.rs` - integration tests
**Files Modified:**
- `databuild/http_server.rs` - add CORS, health, metrics, better errors
- `MODULE.bazel` - add `tracing` dependency
**Acceptance Criteria:**
- All endpoints have proper error handling
- CORS works for web app development
- Health check returns 200 OK
- Integration tests pass
---
## Future Enhancements (Not in Initial Plan)
### Workspace Auto-Discovery
- Walk up directory tree looking for `.databuild/` marker
- Store server metadata in `.databuild/server.json`:
```json
{
"pid": 12345,
"port": 54321,
"started_at": "2025-01-22T10:30:00Z",
"workspace_root": "/Users/stuart/Projects/databuild"
}
```
- CLI auto-starts server if not running
### Log Streaming (SSE)
- Implement `GET /api/job_runs/:id/logs/stdout?follow=true`
- Use Server-Sent Events for streaming
- Integrate with FileLogStore from logging.md plan
### State Caching
- Cache reconstructed BuildState for faster reads
- Invalidate cache when new events arrive
- Use `tokio::sync::RwLock<Option<(u64, BuildState)>>` where u64 is latest_event_id
### gRPC Support (If Needed)
- Add Tonic alongside Axum
- Share same orchestrator/command channel
- Useful for language-agnostic clients
---
## Dependencies Summary
New dependencies to add to `MODULE.bazel`:
```python
# Async runtime
crate.spec(package = "tokio", features = ["full"], version = "1.0")
# Web framework
crate.spec(package = "axum", version = "0.7")
crate.spec(package = "tower", version = "0.4")
crate.spec(package = "tower-http", features = ["trace", "cors"], version = "0.5")
# CLI
crate.spec(package = "clap", features = ["derive"], version = "4.0")
# HTTP client for CLI
crate.spec(package = "reqwest", features = ["blocking", "json"], version = "0.11")
# Logging
crate.spec(package = "tracing", version = "0.1")
crate.spec(package = "tracing-subscriber", version = "0.3")
```
---
## Estimated Timeline
- **Phase 1:** 2-3 hours (thread-safe BEL storage)
- **Phase 2:** 4-6 hours (HTTP server with Axum)
- **Phase 3:** 3-4 hours (basic CLI)
- **Phase 4:** 3-4 hours (orchestrator integration)
- **Phase 5:** 2-3 hours (idle shutdown)
- **Phase 6:** 4-6 hours (testing and polish)
**Total:** ~18-26 hours for complete implementation
---
## Design Rationale
### Why Event Log Separation?
**Alternatives Considered:**
1. **Shared State with RwLock**: Orchestrator holds write lock during `step()`, blocking all reads
2. **Actor Model**: Extra overhead from message passing for all operations
**Why Event Log Separation Wins:**
- Orchestrator stays completely synchronous (no refactoring)
- Reads don't block writes (eventual consistency acceptable for build system)
- Natural fit with event sourcing architecture
- Can cache reconstructed state for even better read performance
### Why Not gRPC?
- User requirement: "JSON is a must"
- REST is more debuggable (curl, browser dev tools)
- gRPC adds complexity without clear benefit
- Can add gRPC later if needed (both can coexist)
### Why Axum Over Actix?
- Better compile-time type safety (extractors)
- Cleaner middleware composition (Tower)
- Native async/await (Actix uses actor model internally)
- More ergonomic for this use case
### Why Per-Workspace Server?
- Isolation: builds in different projects don't interfere
- Simpler: no need to route requests by workspace
- Matches Bazel's model (users already understand it)
- Easier to reason about resource usage

View file

@ -0,0 +1,766 @@
# Partition Identity Refactor: Adding UUIDs for Temporal Consistency
## Problem Statement
### Current Architecture
Partitions are currently keyed only by their reference string (e.g., "data/beta"):
```rust
partitions: HashMap<String, Partition> // ref → partition
```
When a partition transitions through states (Missing → Building → Live → Tainted), **it's the same object mutating**. This creates several architectural problems:
### Core Issue: Lack of Temporal Identity
**The fundamental problem:** We cannot distinguish between "the partition being built now" and "the partition built yesterday" or "the partition that will be built tomorrow."
This manifests in several ways:
1. **Ambiguous Job-Partition Relationships**
- When job J completes, which partition instance did it build?
- If partition is rebuilt, we lose information about previous builds
- Can't answer: "What was the state of data/beta when job Y ran?"
2. **State Mutation Loss**
- Once a partition transitions Live → Tainted → Missing, the Live state information is lost
- Can't track "Partition P was built successfully by job J at time T"
- Lineage and provenance information disappears on each rebuild
3. **Redundant Data Structures** (Symptoms)
- `WantAttributedPartitions` in `JobRunDetail` exists to snapshot want-partition relationships
- Partitions carry `want_ids: Vec<String>` that get cleared/modified as partitions transition
- Jobs need to capture relationships at creation time because they can't be reliably reconstructed later
### Concrete Bug Example
The bug that led to this design discussion illustrates the problem:
```
1. Want 1 created for "data/beta" → partition becomes Building
2. Want 2 created for "data/beta" → but partition is ALREADY Building
3. Job has dep miss → creates derivative want
4. System expects all wants to be Building/UpstreamBuilding, but Want 2 is Idle → panic
```
**Root cause:** All wants reference the same mutable partition object. We can't distinguish:
- "The partition instance Want 1 triggered"
- "The partition instance Want 2 is waiting for"
- They're the same object, but semantically they represent different temporal relationships
## Proposed Solution: Partition UUIDs
### Architecture Changes
**Two-level indexing:**
```rust
// All partition instances, keyed by UUID
partitions_by_uuid: HashMap<Uuid, Partition>
// Current/canonical partition for each ref
canonical_partitions: HashMap<String, Uuid>
```
### Key Properties
1. **Immutable Identity**: Each partition build gets a unique UUID
- `Partition(uuid-1, ref="data/beta", state=Building)` is a distinct entity
- When rebuilt, create `Partition(uuid-2, ref="data/beta", state=Missing)`
- Both can coexist; uuid-1 represents historical fact, uuid-2 is current state
2. **Stable Job References**: Jobs reference the specific partition UUIDs they built
```rust
JobRunBufferEventV1 {
building_partition_uuids: [uuid-1, uuid-2] // Specific instances being built
}
```
3. **Wants Reference Refs**: Wants continue to reference partition refs, not UUIDs
```rust
WantCreateEventV1 {
partitions: ["data/beta"] // User-facing reference
}
// Want's state determined by canonical partition for "data/beta"
```
4. **Temporal Queries**: Can reconstruct state at any point
- "What was partition uuid-1's state when job J ran?" → Look up uuid-1, it's immutable
- "Which wants were waiting for data/beta at time T?" → Check canonical partition at T
- "What's the current state of data/beta?" → canonical_partitions["data/beta"] → uuid-2
## Benefits
### 1. Removes WantAttributedPartitions Redundancy
**Before:**
```rust
JobRunBufferEventV1 {
building_partitions: [PartitionRef("data/beta")],
// Redundant: snapshot want-partition relationship
servicing_wants: [WantAttributedPartitions {
want_id: "w1",
partitions: ["data/beta"]
}]
}
```
**After:**
```rust
JobRunBufferEventV1 {
building_partition_uuids: [uuid-1, uuid-2]
}
// To find serviced wants:
for uuid in job.building_partition_uuids {
let partition = partitions_by_uuid[uuid];
for want_id in partition.want_ids {
// transition want
}
}
```
The relationship is **discoverable** via stable partition UUID, not **baked-in** at event creation.
### 2. Proper State Semantics for Wants
**Current (problematic):**
```
Want 1 → triggers build → Building (owns the job somehow?)
Want 2 → sees partition Building → stays Idle (different from Want 1?)
Want 3 → same partition → also Idle
```
**With UUIDs:**
```
Partition(uuid-1, "data/beta") created as Missing
Want 1 arrives → checks canonical["data/beta"] = uuid-1 (Missing) → Idle → schedules job
Job starts → uuid-1 becomes Building, canonical still points to uuid-1
Want 2 arrives → checks canonical["data/beta"] = uuid-1 (Building) → directly to Building
Want 3 arrives → checks canonical["data/beta"] = uuid-1 (Building) → directly to Building
Want 4 arrives → checks canonical["data/beta"] = uuid-1 (Building) → directly to Building
```
All 4 wants have **identical relationship** to the canonical partition. The state reflects reality: "is the canonical partition for my ref being built?"
**Key insight:** Wants don't bind to UUIDs. They look up the canonical partition for their ref and base their state on that.
### 3. Historical Lineage
```rust
// Track partition lineage over time
Partition {
uuid: uuid-3,
partition_ref: "data/beta",
previous_uuid: Some(uuid-2), // Link to previous instance
created_at: 1234567890,
state: Live,
produced_by_job: Some("job-xyz"),
}
```
Can answer:
- "What partitions existed for this ref over time?"
- "Which job produced this specific partition instance?"
- "What was the dependency chain when this partition was built?"
## Implementation Plan
### Phase 1: Add UUID Infrastructure (Non-Breaking)
**Goals:**
- Add UUID field to Partition
- Create dual indexing (by UUID and by ref)
- Maintain backward compatibility
**Changes:**
1. **Update Partition struct** (databuild/partition_state.rs)
```rust
pub struct PartitionWithState<S> {
pub uuid: Uuid, // NEW
pub partition_ref: PartitionRef,
pub want_ids: Vec<String>,
pub state: S,
}
```
2. **Add dual indexing** (databuild/build_state.rs)
```rust
pub struct BuildState {
partitions_by_uuid: BTreeMap<Uuid, Partition>, // NEW
canonical_partitions: BTreeMap<String, Uuid>, // NEW
partitions: BTreeMap<String, Partition>, // DEPRECATED, keep for now
// ...
}
```
3. **Update partition creation**
- When creating partition (Missing state), generate UUID
- Store in both maps: `partitions_by_uuid[uuid]` and `canonical_partitions[ref] = uuid`
- Keep `partitions[ref]` updated for backward compatibility
4. **Add helper methods**
```rust
impl BuildState {
fn get_canonical_partition(&self, ref: &str) -> Option<&Partition> {
self.canonical_partitions
.get(ref)
.and_then(|uuid| self.partitions_by_uuid.get(uuid))
}
fn get_canonical_partition_uuid(&self, ref: &str) -> Option<Uuid> {
self.canonical_partitions.get(ref).copied()
}
fn get_partition_by_uuid(&self, uuid: &Uuid) -> Option<&Partition> {
self.partitions_by_uuid.get(uuid)
}
}
```
### Phase 2: Update Want State Logic
**Goals:**
- Wants determine state based on canonical partition
- Remove schedulability check for building partitions (no longer needed)
**Changes:**
1. **Update handle_want_create()** (databuild/build_state.rs)
```rust
fn handle_want_create(&mut self, event: &WantCreateEventV1) -> Vec<Event> {
// Create want in Idle state initially
let want_idle: WantWithState<IdleState> = event.clone().into();
// Check canonical partition states to determine want's actual initial state
let has_building_partitions = event.partitions.iter().any(|pref| {
matches!(
self.get_canonical_partition(&pref.r#ref),
Some(Partition::Building(_))
)
});
let want = if has_building_partitions {
// Canonical partition is Building → Want starts in Building
tracing::info!(
want_id = %event.want_id,
"Want created in Building state (canonical partition is building)"
);
Want::Building(want_idle.start_building(current_timestamp()))
} else {
// Canonical partition not Building → Want starts in Idle
tracing::info!(
want_id = %event.want_id,
"Want created in Idle state"
);
Want::Idle(want_idle)
};
self.wants.insert(event.want_id.clone(), want);
// Register want with partitions
for pref in &event.partitions {
self.add_want_to_partition(pref, &event.want_id);
}
// Handle derivative wants if applicable
if let Some(source) = &event.source {
if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source {
self.handle_derivative_want_creation(
&event.want_id,
&event.partitions,
&job_triggered.job_run_id,
);
}
}
vec![]
}
```
2. **Simplify WantSchedulability** (databuild/build_state.rs)
```rust
// Remove `building` field from WantUpstreamStatus
pub struct WantUpstreamStatus {
pub live: Vec<LivePartitionRef>,
pub tainted: Vec<TaintedPartitionRef>,
pub missing: Vec<MissingPartitionRef>,
// REMOVED: pub building: Vec<BuildingPartitionRef>,
}
impl WantSchedulability {
pub fn is_schedulable(&self) -> bool {
// Simplified: only check upstreams
// Building partitions now handled at want creation
self.status.missing.is_empty() && self.status.tainted.is_empty()
}
}
```
3. **Update derivative want handling** (databuild/build_state.rs)
```rust
fn handle_derivative_want_creation(...) {
// ...existing logic...
for want_id in impacted_want_ids {
let want = self.wants.remove(&want_id).expect(...);
let transitioned = match want {
// Idle wants can exist if they arrived after job started but before dep miss
Want::Idle(idle) => {
tracing::info!(
want_id = %want_id,
derivative_want_id = %derivative_want_id,
"Want: Idle → UpstreamBuilding (partition dep miss detected)"
);
Want::UpstreamBuilding(
idle.detect_missing_deps(vec![derivative_want_id.to_string()])
)
}
Want::Building(building) => {
// Building → UpstreamBuilding
// ... existing logic ...
}
Want::UpstreamBuilding(upstream) => {
// UpstreamBuilding → UpstreamBuilding (add another upstream)
// ... existing logic ...
}
_ => {
panic!(
"BUG: Want {} in invalid state {:?}. Should be Idle, Building, or UpstreamBuilding.",
want_id, want
);
}
};
self.wants.insert(want_id, transitioned);
}
}
```
4. **Add Idle → UpstreamBuilding transition** (databuild/want_state.rs)
```rust
impl WantWithState<IdleState> {
// ... existing methods ...
/// Transition from Idle to UpstreamBuilding when dependencies are missing
/// This can happen if want arrives while partition is building, then job has dep miss
pub fn detect_missing_deps(
self,
upstream_want_ids: Vec<String>,
) -> WantWithState<UpstreamBuildingState> {
WantWithState {
want: self.want.updated_timestamp(),
state: UpstreamBuildingState { upstream_want_ids },
}
}
}
```
### Phase 3: Update Job Events
**Goals:**
- Jobs reference partition UUIDs, not just refs
- Remove WantAttributedPartitions redundancy
**Changes:**
1. **Update JobRunBufferEventV1** (databuild/databuild.proto)
```protobuf
message JobRunBufferEventV1 {
string job_run_id = 1;
string job_label = 2;
repeated string building_partition_uuids = 3; // NEW: UUIDs instead of refs
repeated PartitionRef building_partitions = 4; // DEPRECATED: keep for migration
repeated WantAttributedPartitions servicing_wants = 5; // DEPRECATED: remove later
}
```
2. **Update handle_job_run_buffer()** (databuild/build_state.rs)
```rust
fn handle_job_run_buffer(&mut self, event: &JobRunBufferEventV1) -> Vec<Event> {
// Parse UUIDs from event
let building_uuids: Vec<Uuid> = event.building_partition_uuids
.iter()
.map(|s| Uuid::parse_str(s).expect("Valid UUID"))
.collect();
// Find all wants for these partition UUIDs
let mut impacted_want_ids: HashSet<String> = HashSet::new();
for uuid in &building_uuids {
if let Some(partition) = self.partitions_by_uuid.get(uuid) {
for want_id in partition.want_ids() {
impacted_want_ids.insert(want_id.clone());
}
}
}
// Transition wants to Building
for want_id in impacted_want_ids {
let want = self.wants.remove(&want_id).expect("Want must exist");
let transitioned = match want {
Want::Idle(idle) => Want::Building(idle.start_building(current_timestamp())),
Want::Building(building) => Want::Building(building), // Already building
_ => panic!("Invalid state for job buffer: {:?}", want),
};
self.wants.insert(want_id, transitioned);
}
// Transition partitions to Building by UUID
for uuid in building_uuids {
if let Some(partition) = self.partitions_by_uuid.remove(&uuid) {
let building = match partition {
Partition::Missing(missing) => {
Partition::Building(missing.start_building(event.job_run_id.clone()))
}
_ => panic!("Partition {:?} not in Missing state", uuid),
};
self.partitions_by_uuid.insert(uuid, building);
}
}
// Create job run
let queued: JobRunWithState<JobQueuedState> = event.clone().into();
self.job_runs.insert(event.job_run_id.clone(), JobRun::Queued(queued));
vec![]
}
```
3. **Update Orchestrator** (databuild/orchestrator.rs)
```rust
fn queue_job(&mut self, wg: WantGroup) -> Result<(), DatabuildError> {
// Get partition refs from wants
let wanted_refs: Vec<PartitionRef> = wg.wants
.iter()
.flat_map(|want| want.partitions.clone())
.collect();
// Resolve refs to canonical UUIDs
let building_partition_uuids: Vec<String> = wanted_refs
.iter()
.filter_map(|pref| {
self.bel.state.get_canonical_partition_uuid(&pref.r#ref)
.map(|uuid| uuid.to_string())
})
.collect();
let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 {
job_run_id: job_run_id.to_string(),
job_label: wg.job.label,
building_partition_uuids, // Use canonical UUIDs
building_partitions: vec![], // Deprecated
servicing_wants: vec![], // Deprecated
});
self.append_and_broadcast(&job_buffer_event)?;
self.job_runs.push(job_run);
Ok(())
}
```
### Phase 4: Partition Lifecycle Management
**Goals:**
- Define when new partition UUIDs are created
- Handle canonical partition transitions
- Implement cleanup/GC
**Canonical Partition Transitions:**
New partition UUID created when:
1. **First build**: Partition doesn't exist → create Partition(uuid, Missing)
2. **Taint**: Partition tainted → create new Partition(uuid-new, Missing), update canonical
3. **Expiration**: TTL exceeded → create new Partition(uuid-new, Missing), update canonical
4. **Manual rebuild**: Explicit rebuild request → create new Partition(uuid-new, Missing), update canonical
**Implementation:**
```rust
impl BuildState {
/// Create a new partition instance for a ref, updating canonical pointer
fn create_new_partition_instance(&mut self, partition_ref: &PartitionRef) -> Uuid {
let new_uuid = Uuid::new_v4();
let new_partition = Partition::new_missing_with_uuid(
new_uuid,
partition_ref.clone()
);
// Update canonical pointer (old UUID becomes historical)
self.canonical_partitions.insert(
partition_ref.r#ref.clone(),
new_uuid
);
// Store new partition
self.partitions_by_uuid.insert(new_uuid, new_partition);
// Old partition remains in partitions_by_uuid for historical queries
new_uuid
}
/// Handle partition taint - creates new instance
fn taint_partition(&mut self, partition_ref: &str) -> Uuid {
// Mark current partition as Tainted
if let Some(current_uuid) = self.canonical_partitions.get(partition_ref) {
if let Some(partition) = self.partitions_by_uuid.get_mut(current_uuid) {
// Transition to Tainted state (keep UUID)
*partition = match partition {
Partition::Live(live) => {
Partition::Tainted(live.clone().mark_tainted())
}
_ => panic!("Can only taint Live partitions"),
};
}
}
// Create new partition instance for rebuilding
self.create_new_partition_instance(&PartitionRef {
r#ref: partition_ref.to_string()
})
}
}
```
**GC Strategy:**
Time-based retention (recommended):
- Keep partition UUIDs for N days (default 30)
- Enables historical queries within retention window
- Predictable storage growth
```rust
impl BuildState {
/// Remove partition UUIDs older than retention window
fn gc_old_partitions(&mut self, retention_days: u64) {
let cutoff = current_timestamp() - (retention_days * 86400 * 1_000_000_000);
// Find UUIDs to remove (not canonical + older than cutoff)
let canonical_uuids: HashSet<Uuid> = self.canonical_partitions
.values()
.copied()
.collect();
let to_remove: Vec<Uuid> = self.partitions_by_uuid
.iter()
.filter_map(|(uuid, partition)| {
if !canonical_uuids.contains(uuid) && partition.created_at() < cutoff {
Some(*uuid)
} else {
None
}
})
.collect();
for uuid in to_remove {
self.partitions_by_uuid.remove(&uuid);
}
}
}
```
### Phase 5: Migration and Cleanup
**Goals:**
- Remove deprecated fields
- Update API responses
- Complete migration
**Changes:**
1. **Remove deprecated fields from protobuf**
- `building_partitions` from `JobRunBufferEventV1`
- `servicing_wants` from `JobRunBufferEventV1`
- `WantAttributedPartitions` message
2. **Remove backward compatibility code**
- `partitions: BTreeMap<String, Partition>` from `BuildState`
- Dual writes/reads
3. **Update API responses** to include UUIDs where relevant
- JobRunDetail can include partition UUIDs built
- PartitionDetail can include UUID for debugging
4. **Update tests** to use UUID-based assertions
## Design Decisions & Trade-offs
### 1. Wants Reference Refs, Not UUIDs
**Decision:** Wants always reference partition refs (e.g., "data/beta"), not UUIDs.
**Rationale:**
- User requests "data/beta" - the current/canonical partition for that ref
- Want state is based on canonical partition: "is the current partition for my ref being built?"
- If partition gets tainted/rebuilt, wants see the new canonical partition automatically
- Simpler mental model: want doesn't care about historical instances
**How it works:**
```rust
// Want creation
want.partitions = ["data/beta"] // ref, not UUID
// Want state determination
let canonical_uuid = canonical_partitions["data/beta"];
let partition = partitions_by_uuid[canonical_uuid];
match partition.state {
Building => want.state = Building,
Live => want can complete,
...
}
```
### 2. Jobs Reference UUIDs, Not Refs
**Decision:** Jobs reference the specific partition UUIDs they built.
**Rationale:**
- Jobs build specific partition instances
- Historical record: "Job J built Partition(uuid-1)"
- Even if partition is later tainted/rebuilt, job's record is immutable
- Enables provenance: "Which job built this specific partition?"
**How it works:**
```rust
JobRunBufferEventV1 {
building_partition_uuids: [uuid-1, uuid-2] // Specific instances
}
```
### 3. UUID Generation: When?
**Decision:** Generate UUID during event processing (in handle_want_create, when partition created).
**Rationale:**
- Events remain deterministic
- UUID generation during replay works correctly
- Maintains event sourcing principles
**Not in the event itself:** Would require client-side UUID generation, breaks deterministic replay.
### 4. Canonical Partition: One at a Time
**Decision:** Only one canonical partition per ref at a time.
**Scenario handling:**
- Partition(uuid-1, "data/beta") is Building
- User requests rebuild → new want sees uuid-1 is Building → want becomes Building
- Want waits for uuid-1 to complete
- If uuid-1 completes successfully → want completes
- If uuid-1 fails or is tainted → new partition instance created (uuid-2), canonical updated
**Alternative considered:** Multiple concurrent builds with versioning
- Significantly more complex
- Defer to future work
### 5. Event Format: UUID as String
**Decision:** Store UUIDs as strings in protobuf events.
**Rationale:**
- Human-readable in logs/debugging
- Standard UUID string format (36 chars)
- Protobuf has no native UUID type
**Trade-off:** Larger event size (36 bytes vs 16 bytes) - acceptable for debuggability.
## Testing Strategy
### Unit Tests
1. **Partition UUID uniqueness**
- Creating partitions generates unique UUIDs
- Same ref at different times gets different UUIDs
2. **Canonical partition tracking**
- canonical_partitions always points to current instance
- Old instances remain in partitions_by_uuid
3. **Want state determination**
- Want checks canonical partition state
- Multiple wants see same canonical partition
### Integration Tests
1. **Multi-want scenario** (reproduces original bug)
- Want 1 created → partition Missing → Idle
- Job scheduled → partition Building (uuid-1)
- Wants 2-4 created → see partition Building → directly to Building
- All 4 wants reference same canonical partition uuid-1
- Job dep miss → all transition to UpstreamBuilding correctly
2. **Rebuild scenario**
- Partition built → Live (uuid-1)
- Partition tainted → new instance created (uuid-2), canonical updated
- New wants reference uuid-2
- Old partition uuid-1 still queryable for history
### End-to-End Tests
1. **Full lifecycle**
- Want created → canonical partition determined
- Job runs → partition transitions through states
- Want completes → partition remains in history
- Partition expires → new UUID for rebuild, canonical updated
## Future Work
### 1. Partition Lineage Graph
Build explicit lineage tracking:
```rust
Partition {
uuid: uuid-3,
partition_ref: "data/beta",
previous_uuid: Some(uuid-2),
derived_from: vec![uuid-4, uuid-5], // Upstream dependencies
}
```
Enables:
- "What was the full dependency graph when this partition was built?"
- "How did data propagate through the system over time?"
### 2. Partition Provenance
Track complete build history:
```rust
Partition {
uuid: uuid-1,
provenance: Provenance {
built_by_job: "job-123",
source_code_version: "abc123",
build_timestamp: 1234567890,
input_partitions: vec![uuid-2, uuid-3],
}
}
```
### 3. Multi-Generation Partitions
Support concurrent builds of different generations:
```rust
canonical_partitions: HashMap<String, Vec<(Generation, Uuid)>>
// "data/beta" → [(v1, uuid-1), (v2, uuid-2)]
```
Users can request specific generations or "latest."
## Summary
Adding partition UUIDs solves fundamental architectural problems:
- **Temporal identity**: Distinguish partition instances over time
- **Stable job references**: Jobs reference immutable partition UUIDs they built
- **Wants reference refs**: Want state based on canonical partition for their ref
- **Discoverable relationships**: Remove redundant snapshot data (WantAttributedPartitions)
- **Proper semantics**: Want state reflects actual canonical partition state
- **Historical queries**: Can query past partition states via UUID
**Key principle:** Wants care about "what's the current state of data/beta?" (refs), while jobs and historical queries care about "what happened to this specific partition instance?" (UUIDs).
This refactor enables cleaner code, better observability, and proper event sourcing semantics throughout the system.