api phase 2 complete

Stuart Axelbrooke 2025-11-22 21:24:12 +08:00
parent 2084fadbb6
commit 6f7c6b3318
9 changed files with 1980 additions and 49 deletions

MODULE.bazel

@@ -65,6 +65,32 @@ crate.spec(
     package = "regex",
     version = "1.10",
 )
+crate.spec(
+    features = ["full"],
+    package = "tokio",
+    version = "1.0",
+)
+crate.spec(
+    package = "axum",
+    version = "0.7",
+)
+crate.spec(
+    package = "tower",
+    version = "0.4",
+)
+crate.spec(
+    features = ["trace", "cors"],
+    package = "tower-http",
+    version = "0.5",
+)
+crate.spec(
+    package = "tracing",
+    version = "0.1",
+)
+crate.spec(
+    package = "tracing-subscriber",
+    version = "0.3",
+)
 crate.from_specs()
 use_repo(crate, "crates")

File diff suppressed because one or more lines are too long

databuild/BUILD.bazel

@@ -25,6 +25,7 @@ rust_library(
     edition = "2021",
     visibility = ["//visibility:public"],
     deps = [
+        "@crates//:axum",
         "@crates//:prost",
         "@crates//:prost-types",
         "@crates//:regex",
@@ -32,6 +33,9 @@ rust_library(
         "@crates//:schemars",
         "@crates//:serde",
         "@crates//:serde_json",
+        "@crates//:tokio",
+        "@crates//:tower",
+        "@crates//:tower-http",
         "@crates//:uuid",
     ],
 )
@@ -43,6 +47,21 @@ rust_test(
     env = {"RUST_BACKTRACE": "1"},
 )

+# DataBuild HTTP server binary
+rust_binary(
+    name = "databuild_server",
+    srcs = ["server_main.rs"],
+    edition = "2021",
+    visibility = ["//visibility:public"],
+    deps = [
+        ":databuild",
+        "@crates//:axum",
+        "@crates//:tokio",
+        "@crates//:tracing",
+        "@crates//:tracing-subscriber",
+    ],
+)
+
 # Legacy filegroup for backwards compatibility
 filegroup(
     name = "proto",

databuild/build_event_log.rs

@@ -11,15 +11,18 @@ use crate::{
 use prost::Message;
 use rusqlite::Connection;
 use std::fmt::Debug;
+use std::sync::{Arc, Mutex};
 use std::time::{SystemTime, UNIX_EPOCH};

-pub trait BELStorage {
+pub trait BELStorage: Send + Sync {
     fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError>;
     fn list_events(
         &self,
         since_idx: u64,
         limit: u64,
     ) -> Result<Vec<DataBuildEvent>, DatabuildError>;
+    fn get_event(&self, event_id: u64) -> Result<Option<DataBuildEvent>, DatabuildError>;
+    fn latest_event_id(&self) -> Result<u64, DatabuildError>;
 }

 #[derive(Debug, Clone)]
@@ -64,15 +67,27 @@ impl BELStorage for MemoryBELStorage {
             .take(limit as usize)
             .collect())
     }
+
+    fn get_event(&self, event_id: u64) -> Result<Option<DataBuildEvent>, DatabuildError> {
+        Ok(self
+            .events
+            .iter()
+            .find(|e| e.event_id == event_id)
+            .cloned())
+    }
+
+    fn latest_event_id(&self) -> Result<u64, DatabuildError> {
+        Ok(self.events.len().saturating_sub(1) as u64)
+    }
 }

-#[derive(Debug)]
-struct SqliteBELStorage {
-    connection: Connection,
+#[derive(Debug, Clone)]
+pub struct SqliteBELStorage {
+    connection: Arc<Mutex<Connection>>,
 }

 impl SqliteBELStorage {
-    fn create(database_url: &str) -> Result<SqliteBELStorage, DatabuildError> {
+    pub fn create(database_url: &str) -> Result<SqliteBELStorage, DatabuildError> {
         let connection = Connection::open(database_url)?;

         // Create the events table
@@ -85,7 +100,9 @@ impl SqliteBELStorage {
             (),
         )?;

-        Ok(SqliteBELStorage { connection })
+        Ok(SqliteBELStorage {
+            connection: Arc::new(Mutex::new(connection)),
+        })
     }
 }

@@ -105,12 +122,17 @@ impl BELStorage for SqliteBELStorage {
         let mut buf = Vec::new();
         prost::Message::encode(&dbe, &mut buf)?;

-        self.connection.execute(
+        let connection = self
+            .connection
+            .lock()
+            .map_err(|e| format!("Failed to acquire lock: {}", e))?;
+        connection.execute(
             "INSERT INTO events (timestamp, event_data) VALUES (?1, ?2)",
             (&timestamp, &buf),
         )?;

-        let event_id = self.connection.last_insert_rowid() as u64;
+        let event_id = connection.last_insert_rowid() as u64;
         Ok(event_id)
     }

@@ -119,7 +141,12 @@ impl BELStorage for SqliteBELStorage {
         since_idx: u64,
         limit: u64,
     ) -> Result<Vec<DataBuildEvent>, DatabuildError> {
-        let mut stmt = self.connection.prepare(
+        let connection = self
+            .connection
+            .lock()
+            .map_err(|e| format!("Failed to acquire lock: {}", e))?;
+        let mut stmt = connection.prepare(
             "SELECT event_id, timestamp, event_data FROM events
             WHERE timestamp > ?1
             ORDER BY event_id
@@ -156,6 +183,60 @@ impl BELStorage for SqliteBELStorage {
         Ok(events)
     }

+    fn get_event(&self, event_id: u64) -> Result<Option<DataBuildEvent>, DatabuildError> {
+        let connection = self
+            .connection
+            .lock()
+            .map_err(|e| format!("Failed to acquire lock: {}", e))?;
+        let mut stmt = connection.prepare(
+            "SELECT event_id, timestamp, event_data FROM events WHERE event_id = ?1",
+        )?;
+        let result = stmt.query_row([event_id], |row| {
+            let event_id: u64 = row.get(0)?;
+            let timestamp: u64 = row.get(1)?;
+            let event_data: Vec<u8> = row.get(2)?;
+
+            // Deserialize the event using prost
+            let mut dbe = DataBuildEvent::decode(event_data.as_slice()).map_err(|_e| {
+                rusqlite::Error::InvalidColumnType(
+                    0,
+                    "event_data".to_string(),
+                    rusqlite::types::Type::Blob,
+                )
+            })?;
+
+            // Update the event_id from the database
+            dbe.event_id = event_id;
+            dbe.timestamp = timestamp;
+            Ok(dbe)
+        });
+
+        match result {
+            Ok(event) => Ok(Some(event)),
+            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
+            Err(e) => Err(e.into()),
+        }
+    }
+
+    fn latest_event_id(&self) -> Result<u64, DatabuildError> {
+        let connection = self
+            .connection
+            .lock()
+            .map_err(|e| format!("Failed to acquire lock: {}", e))?;
+        let result: Result<u64, rusqlite::Error> =
+            connection.query_row("SELECT MAX(event_id) FROM events", [], |row| row.get(0));
+        match result {
+            Ok(id) => Ok(id),
+            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0),
+            Err(e) => Err(e.into()),
+        }
+    }
 }

 #[derive(Debug, Default)]

databuild/build_state.rs

@@ -65,6 +65,20 @@ impl Default for BuildState {
 }

 impl BuildState {
+    /// Reconstruct BuildState from a sequence of events (for read path in web server)
+    /// This allows the web server to rebuild state from BEL storage without holding a lock
+    pub fn from_events(events: &[crate::DataBuildEvent]) -> Self {
+        let mut state = BuildState::default();
+        for event in events {
+            if let Some(ref inner_event) = event.event {
+                // handle_event returns Vec<Event> for cascading events, but we ignore them
+                // since we're replaying from a complete event log
+                let _ = state.handle_event(inner_event);
+            }
+        }
+        state
+    }
+
     pub fn count_job_runs(&self) -> usize {
         self.job_runs.len()
     }

databuild/http_server.rs (new file, 200 lines)

@@ -0,0 +1,200 @@
use crate::build_event_log::BELStorage;
use crate::build_state::BuildState;
use crate::{
    CreateWantRequest, CreateWantResponse, GetWantRequest, GetWantResponse, ListJobRunsRequest,
    ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListWantsRequest,
    ListWantsResponse,
};
use axum::{
    extract::{Path, Query, State},
    http::StatusCode,
    response::IntoResponse,
    routing::{delete, get, post},
    Json, Router,
};
use std::sync::{atomic::AtomicU64, Arc};

/// Shared application state for HTTP handlers
#[derive(Clone)]
pub struct AppState {
    /// Shared read-only access to BEL storage (for reconstructing state)
    pub bel_storage: Arc<dyn BELStorage>,
    /// For idle timeout tracking (Phase 5)
    pub last_request_time: Arc<AtomicU64>,
}

impl AppState {
    pub fn new(bel_storage: Arc<dyn BELStorage>) -> Self {
        Self {
            bel_storage,
            last_request_time: Arc::new(AtomicU64::new(0)),
        }
    }
}

/// Create the Axum router with all endpoints
pub fn create_router(state: AppState) -> Router {
    Router::new()
        // Health check
        .route("/health", get(health))
        // Want endpoints
        .route("/api/wants", get(list_wants))
        .route("/api/wants", post(create_want))
        .route("/api/wants/:id", get(get_want))
        .route("/api/wants/:id", delete(cancel_want))
        // Partition endpoints
        .route("/api/partitions", get(list_partitions))
        // Job run endpoints
        .route("/api/job_runs", get(list_job_runs))
        .with_state(state)
}

// ============================================================================
// Handlers
// ============================================================================

/// Health check endpoint
async fn health() -> impl IntoResponse {
    (StatusCode::OK, "OK")
}

/// List all wants
async fn list_wants(
    State(state): State<AppState>,
    Query(params): Query<ListWantsRequest>,
) -> impl IntoResponse {
    // Read all events from storage
    let events = match state.bel_storage.list_events(0, 100000) {
        Ok(events) => events,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": format!("Failed to read events: {}", e)
                })),
            )
                .into_response();
        }
    };

    // Reconstruct state from events
    let build_state = BuildState::from_events(&events);

    // Use existing API method
    let response = build_state.list_wants(&params);
    (StatusCode::OK, Json(response)).into_response()
}

/// Get a specific want by ID
async fn get_want(
    State(state): State<AppState>,
    Path(want_id): Path<String>,
) -> impl IntoResponse {
    let events = match state.bel_storage.list_events(0, 100000) {
        Ok(events) => events,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": format!("Failed to read events: {}", e)
                })),
            )
                .into_response();
        }
    };

    let build_state = BuildState::from_events(&events);
    let req = GetWantRequest { want_id };
    let response = build_state.get_want(&req.want_id);

    match response {
        Some(want) => (StatusCode::OK, Json(GetWantResponse { data: Some(want) })).into_response(),
        None => (
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({"error": "Want not found"})),
        )
            .into_response(),
    }
}

/// Create a new want (stub for now - Phase 4 will implement actual command sending)
async fn create_want(
    State(_state): State<AppState>,
    Json(req): Json<CreateWantRequest>,
) -> impl IntoResponse {
    // TODO Phase 4: Send command to orchestrator via channel
    // For now, return a stub response
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "Create want not yet implemented (Phase 4)",
            "requested_partitions": req.partitions,
        })),
    )
}

/// Cancel a want (stub for now - Phase 4 will implement actual command sending)
async fn cancel_want(
    State(_state): State<AppState>,
    Path(want_id): Path<String>,
) -> impl IntoResponse {
    // TODO Phase 4: Send command to orchestrator via channel
    // For now, return a stub response
    (
        StatusCode::NOT_IMPLEMENTED,
        Json(serde_json::json!({
            "error": "Cancel want not yet implemented (Phase 4)",
            "want_id": want_id,
        })),
    )
}

/// List all partitions
async fn list_partitions(
    State(state): State<AppState>,
    Query(params): Query<ListPartitionsRequest>,
) -> impl IntoResponse {
    let events = match state.bel_storage.list_events(0, 100000) {
        Ok(events) => events,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": format!("Failed to read events: {}", e)
                })),
            )
                .into_response();
        }
    };

    let build_state = BuildState::from_events(&events);
    let response = build_state.list_partitions(&params);
    (StatusCode::OK, Json(response)).into_response()
}

/// List all job runs
async fn list_job_runs(
    State(state): State<AppState>,
    Query(params): Query<ListJobRunsRequest>,
) -> impl IntoResponse {
    let events = match state.bel_storage.list_events(0, 100000) {
        Ok(events) => events,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({
                    "error": format!("Failed to read events: {}", e)
                })),
            )
                .into_response();
        }
    };

    let build_state = BuildState::from_events(&events);
    let response = build_state.list_job_runs(&params);
    (StatusCode::OK, Json(response)).into_response()
}

databuild/lib.rs

@@ -1,7 +1,8 @@
-mod build_event_log;
+pub mod build_event_log;
 mod build_state;
 mod data_deps;
 mod event_transforms;
+pub mod http_server;
 mod job;
 mod job_run;
 mod job_run_state;

databuild/server_main.rs (new file, 37 lines)

@@ -0,0 +1,37 @@
use databuild::build_event_log::SqliteBELStorage;
use databuild::http_server::{create_router, AppState};
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // Initialize logging
    tracing_subscriber::fmt::init();

    // Create SQLite BEL storage (in-memory for now)
    let bel_storage =
        SqliteBELStorage::create(":memory:").expect("Failed to create BEL storage");

    // Create app state with shared storage
    let state = AppState::new(Arc::new(bel_storage));

    // Create router
    let app = create_router(state);

    // Bind to port 3000
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
        .await
        .expect("Failed to bind to port 3000");

    println!("DataBuild server listening on http://127.0.0.1:3000");
    println!("  GET    /health");
    println!("  GET    /api/wants");
    println!("  POST   /api/wants");
    println!("  GET    /api/wants/:id");
    println!("  GET    /api/partitions");
    println!("  GET    /api/job_runs");

    // Run the server
    axum::serve(listener, app).await.expect("Server error");
}

docs/plans/api.md (new file, 577 lines)

@@ -0,0 +1,577 @@
# Web Server Implementation Plan
## Architecture Summary
**Concurrency Model: Event Log Separation**
- Orchestrator runs synchronously in dedicated thread, owns BEL exclusively
- Web server reads from shared BEL storage, sends write commands via channel
- No locks on hot path, orchestrator stays single-threaded
- Eventual consistency for reads (acceptable since builds take time anyway)
**Daemon Model:**
- Server binary started manually from workspace root (for now)
- Server tracks last request time, shuts down after idle timeout (default: 3 hours)
- HTTP REST on localhost random port
- Future: CLI can auto-discover/start server
---
## Thread Model
```
Main Process
├─ HTTP Server (tokio multi-threaded runtime)
│  ├─ Request handlers (async, read from BEL storage)
│  └─ Command sender (send writes to orchestrator)
└─ Orchestrator Thread (std::thread, synchronous)
   ├─ Receives commands via mpsc channel
   ├─ Owns BEL (exclusive mutable access)
   └─ Runs existing step() loop
```
**Read Path (Low Latency):**
1. HTTP request → Axum handler
2. Read events from shared BEL storage (no lock contention)
3. Reconstruct BuildState from events (can cache this)
4. Return response
**Write Path (Strong Consistency):**
1. HTTP request → Axum handler
2. Send command via channel to orchestrator
3. Orchestrator processes command in its thread
4. Reply sent back via oneshot channel
5. Return response
**Why This Works:**
- Orchestrator remains completely synchronous (no refactoring needed)
- Reads scale horizontally (multiple handlers, no locks)
- Writes are serialized through orchestrator (consistent with current model)
- Event sourcing means reads can be eventually consistent
---
## Phase 1: Foundation - Make BEL Storage Thread-Safe
**Goal:** Allow BEL storage to be safely shared between orchestrator and web server
**Tasks:**
1. Add `Send + Sync` bounds to `BELStorage` trait (see the trait sketch after this list)
2. Wrap `SqliteBELStorage::connection` in `Arc<Mutex<Connection>>` or use r2d2 pool
3. Add read-only methods to BELStorage:
- `list_events(offset: usize, limit: usize) -> Vec<DataBuildEvent>`
- `get_event(event_id: u64) -> Option<DataBuildEvent>`
- `latest_event_id() -> u64`
4. Add builder method to reconstruct BuildState from events:
- `BuildState::from_events(events: &[DataBuildEvent]) -> Self`
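A minimal sketch of the updated trait, mirroring the committed changes in `databuild/build_event_log.rs` above (note the commit uses `since_idx: u64` rather than the `offset: usize` written in item 3):

```rust
// Thread-safety bounds plus read-only accessors, as committed in this change.
pub trait BELStorage: Send + Sync {
    fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError>;
    fn list_events(
        &self,
        since_idx: u64,
        limit: u64,
    ) -> Result<Vec<DataBuildEvent>, DatabuildError>;
    fn get_event(&self, event_id: u64) -> Result<Option<DataBuildEvent>, DatabuildError>;
    fn latest_event_id(&self) -> Result<u64, DatabuildError>;
}
```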
**Files Modified:**
- `databuild/build_event_log.rs` - update trait and storage impls
- `databuild/build_state.rs` - add `from_events()` builder
**Acceptance Criteria:**
- `BELStorage` trait has `Send + Sync` bounds
- Can clone `Arc<SqliteBELStorage>` and use from multiple threads
- Can reconstruct BuildState from events without mutating storage
---
## Phase 2: Web Server - HTTP API with Axum
**Goal:** HTTP server serving read/write APIs
**Tasks:**
1. Add dependencies to MODULE.bazel:
```python
crate.spec(package = "tokio", features = ["full"], version = "1.0")
crate.spec(package = "axum", version = "0.7")
crate.spec(package = "tower", version = "0.4")
crate.spec(package = "tower-http", features = ["trace", "cors"], version = "0.5")
```
2. Create `databuild/http_server.rs` module with:
- `AppState` struct holding:
- `bel_storage: Arc<dyn BELStorage>` - shared read access
- `command_tx: mpsc::Sender<Command>` - channel to orchestrator
- `last_request_time: Arc<AtomicU64>` - for idle tracking
- Axum router with all endpoints
- Handler functions delegating to existing `api_handle_*` methods
3. API Endpoints:
```
GET /health → health check
GET /api/wants → list_wants
POST /api/wants → create_want
GET /api/wants/:id → get_want
DELETE /api/wants/:id → cancel_want
GET /api/partitions → list_partitions
GET /api/job_runs → list_job_runs
GET /api/job_runs/:id/logs/stdout → stream_logs (stub)
```
4. Handler pattern (reads):
```rust
async fn list_wants(
    State(state): State<AppState>,
    Query(params): Query<ListWantsParams>,
) -> Json<ListWantsResponse> {
    // Read events from storage
    let events = state.bel_storage.list_events(0, 10000)?;
    // Reconstruct state
    let build_state = BuildState::from_events(&events);
    // Use existing API method
    Json(build_state.list_wants(&params.into()))
}
```
5. Handler pattern (writes):
```rust
async fn create_want(
    State(state): State<AppState>,
    Json(req): Json<CreateWantRequest>,
) -> Json<CreateWantResponse> {
    // Send command to orchestrator
    let (reply_tx, reply_rx) = oneshot::channel();
    state.command_tx.send(Command::CreateWant(req, reply_tx)).await?;
    // Wait for orchestrator reply
    let response = reply_rx.await?;
    Json(response)
}
```
**Files Created:**
- `databuild/http_server.rs` - new module
**Files Modified:**
- `databuild/lib.rs` - add `pub mod http_server;`
- `MODULE.bazel` - add dependencies
**Acceptance Criteria:**
- Server starts on localhost random port, prints "Listening on http://127.0.0.1:XXXXX"
- All read endpoints return correct JSON responses
- Write endpoints return stub responses (Phase 4 will connect to orchestrator)
---
## Phase 3: CLI - HTTP Client
**Goal:** CLI that sends HTTP requests to running server
**Tasks:**
1. Add dependencies to MODULE.bazel:
```python
crate.spec(package = "clap", features = ["derive"], version = "4.0")
crate.spec(package = "reqwest", features = ["blocking", "json"], version = "0.11")
```
2. Create `databuild/bin/databuild.rs` main binary:
```rust
#[derive(Parser)]
#[command(name = "databuild")]
enum Cli {
    /// Start the databuild server
    Serve(ServeArgs),
    /// Create a want for partitions
    Build(BuildArgs),
    /// Want operations
    #[command(subcommand)]
    Want(WantCommand),
    /// Stream job run logs
    Logs(LogsArgs),
}

#[derive(Args)]
struct ServeArgs {
    #[arg(long, default_value = "8080")]
    port: u16,
}

#[derive(Subcommand)]
enum WantCommand {
    Create(CreateWantArgs),
    List,
    Get { want_id: String },
    Cancel { want_id: String },
}
```
3. Server address discovery:
- For now: hardcode `http://localhost:8080` or accept `--server-url` flag
- Future: read from `.databuild/server.json` file (discovery sketch after this list)
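A hedged sketch of the discovery order; `discover_server_url` is a hypothetical helper, and the `server.json` schema is the one proposed under Future Enhancements:

```rust
use std::path::Path;

// Hypothetical helper: explicit flag wins, then server.json, then the default.
fn discover_server_url(explicit: Option<String>) -> String {
    if let Some(url) = explicit {
        return url; // --server-url flag
    }
    // Future: `.databuild/server.json` written by `databuild serve`
    if let Ok(raw) = std::fs::read_to_string(Path::new(".databuild/server.json")) {
        if let Ok(meta) = serde_json::from_str::<serde_json::Value>(&raw) {
            if let Some(port) = meta["port"].as_u64() {
                return format!("http://localhost:{}", port);
            }
        }
    }
    "http://localhost:8080".to_string() // hardcoded default for now
}
```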
4. HTTP client implementation:
```rust
fn list_wants(server_url: &str) -> Result<Vec<WantDetail>> {
    let client = reqwest::blocking::Client::new();
    let resp = client.get(format!("{}/api/wants", server_url))
        .send()?
        .json::<ListWantsResponse>()?;
    Ok(resp.data)
}
```
5. Commands:
- `databuild serve --port 8080` - Start server (blocks)
- `databuild build part1 part2` - Create want for partitions
- `databuild want list` - List all wants
- `databuild want get <id>` - Get specific want
- `databuild want cancel <id>` - Cancel want
- `databuild logs <job_run_id>` - Stream logs (stub)
**Files Created:**
- `databuild/bin/databuild.rs` - new CLI binary
**Files Modified:**
- `databuild/BUILD.bazel` - add `rust_binary` target for databuild CLI
**Acceptance Criteria:**
- Can run `databuild serve` to start server
- Can run `databuild want list` in another terminal and see wants
- Commands print pretty JSON or formatted tables
---
## Phase 4: Orchestrator Integration - Command Channel
**Goal:** Connect orchestrator to web server via message passing
**Tasks:**
1. Create `databuild/commands.rs` with command enum:
```rust
pub enum Command {
    CreateWant(CreateWantRequest, oneshot::Sender<CreateWantResponse>),
    CancelWant(CancelWantRequest, oneshot::Sender<CancelWantResponse>),
    // Only write operations need commands
}
```
2. Update `Orchestrator`:
- Add `command_rx: mpsc::Receiver<Command>` field
- In `step()` method, before polling:
```rust
// Process all pending commands
while let Ok(cmd) = self.command_rx.try_recv() {
    match cmd {
        Command::CreateWant(req, reply) => {
            let resp = self.bel.api_handle_want_create(req);
            let _ = reply.send(resp); // Ignore send errors
        }
        // ... other commands
    }
}
```
3. Create server startup function in `http_server.rs`:
```rust
pub fn start_server(
    bel_storage: Arc<dyn BELStorage>,
    port: u16,
) -> (JoinHandle<()>, mpsc::Sender<Command>) {
    let (cmd_tx, cmd_rx) = mpsc::channel(100);

    // Spawn orchestrator in background thread
    let orch_bel = bel_storage.clone();
    let orch_handle = std::thread::spawn(move || {
        let mut orch = Orchestrator::new_with_commands(orch_bel, cmd_rx);
        orch.join().unwrap();
    });

    // Start HTTP server in tokio runtime
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let handler_tx = cmd_tx.clone(); // clone before moving into the task
    let http_handle = runtime.spawn(async move {
        let app_state = AppState {
            bel_storage,
            command_tx: handler_tx,
            last_request_time: Arc::new(AtomicU64::new(0)),
        };
        let app = create_router(app_state);
        let addr = SocketAddr::from(([127, 0, 0, 1], port));
        // axum 0.7: bind a TcpListener and serve with axum::serve
        let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
        axum::serve(listener, app).await.unwrap();
    });

    (http_handle, cmd_tx)
}
```
4. Update `databuild serve` command to use `start_server()`
**Files Created:**
- `databuild/commands.rs` - new module
**Files Modified:**
- `databuild/orchestrator.rs` - accept command channel, process in `step()`
- `databuild/http_server.rs` - send commands for writes
- `databuild/bin/databuild.rs` - use `start_server()` in `serve` command
**Acceptance Criteria:**
- Creating a want via HTTP actually creates it in BuildState
- Orchestrator processes commands without blocking its main loop
- Can observe wants being scheduled into job runs
---
## Phase 5: Daemon Lifecycle - Auto-Shutdown
**Goal:** Server shuts down gracefully after idle timeout
**Tasks:**
1. Update AppState to track last request time:
```rust
pub struct AppState {
    bel_storage: Arc<dyn BELStorage>,
    command_tx: mpsc::Sender<Command>,
    last_request_time: Arc<AtomicU64>, // epoch millis
    shutdown_tx: broadcast::Sender<()>,
}
```
2. Add Tower middleware to update timestamp:
```rust
// axum 0.7: `Next` and `Request` are no longer generic over the body type
async fn update_last_request_time(
    State(state): State<AppState>,
    req: Request,
    next: Next,
) -> Response {
    state.last_request_time.store(
        SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
        Ordering::Relaxed,
    );
    next.run(req).await
}
```
3. Background idle checker task:
```rust
tokio::spawn(async move {
    let idle_timeout = Duration::from_secs(3 * 60 * 60); // 3 hours
    loop {
        tokio::time::sleep(Duration::from_secs(60)).await;
        let last_request = state.last_request_time.load(Ordering::Relaxed);
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;
        if now - last_request > idle_timeout.as_millis() as u64 {
            eprintln!(
                "Server idle for {} hours, shutting down",
                idle_timeout.as_secs() / 3600
            );
            shutdown_tx.send(()).unwrap();
            break;
        }
    }
});
```
4. Graceful shutdown handling:
```rust
let app = create_router(state);
// axum 0.7: serve from a TcpListener with graceful shutdown
let listener = tokio::net::TcpListener::bind(&addr).await?;
axum::serve(listener, app)
    .with_graceful_shutdown(async move {
        shutdown_rx.recv().await.ok();
    })
    .await?;
```
5. Cleanup on shutdown:
- Orchestrator: finish current step, don't start a new one (loop sketch after this list)
- HTTP server: stop accepting new connections, finish in-flight requests
- Log: "Shutdown complete"
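A sketch of what the orchestrator side could look like, assuming a shared `AtomicBool` flag and a no-argument `step()` (the actual signal mechanism is wired up in this phase):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical loop: check the flag between steps, so an in-progress step
// always runs to completion before the thread exits.
fn run_orchestrator(mut orch: Orchestrator, shutdown: Arc<AtomicBool>) {
    while !shutdown.load(Ordering::Relaxed) {
        orch.step();
    }
    eprintln!("Shutdown complete");
}
```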
**Files Modified:**
- `databuild/http_server.rs` - add idle tracking, shutdown logic
- `databuild/orchestrator.rs` - accept shutdown signal, check before each step
**Acceptance Criteria:**
- Server shuts down after configured idle timeout
- In-flight requests complete successfully during shutdown
- Shutdown is logged clearly
---
## Phase 6: Testing & Polish
**Goal:** End-to-end testing and production readiness
**Tasks:**
1. Integration tests:
```rust
#[test]
fn test_server_lifecycle() {
    // Start server
    let (handle, port) = start_test_server();

    // Make requests
    let wants = reqwest::blocking::get(format!("http://localhost:{}/api/wants", port))
        .unwrap()
        .json::<ListWantsResponse>()
        .unwrap();

    // Stop server
    handle.shutdown();
}
```
2. Error handling improvements:
- Proper HTTP status codes (400, 404, 500)
- Structured error responses (helper sketch after this list):
```json
{"error": "Want not found", "want_id": "abc123"}
```
- Add `tracing` crate for structured logging
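A small helper could keep these error bodies consistent; this sketch follows the `(StatusCode, Json)` pattern already used in `http_server.rs`:

```rust
use axum::{http::StatusCode, response::{IntoResponse, Response}, Json};

// Sketch: one place to build the {"error": ...} JSON body.
fn error_response(status: StatusCode, msg: &str) -> Response {
    (status, Json(serde_json::json!({ "error": msg }))).into_response()
}
```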
3. Add CORS middleware for web app:
```rust
let cors = CorsLayer::new()
    .allow_origin("http://localhost:3000".parse::<HeaderValue>().unwrap())
    .allow_methods([Method::GET, Method::POST, Method::DELETE]);

app.layer(cors)
```
4. Health check endpoint:
```rust
async fn health() -> &'static str {
    "OK"
}
```
5. Optional: Metrics endpoint (prometheus format):
```rust
async fn metrics() -> String {
    format!(
        "# HELP databuild_wants_total Total number of wants\n\
         databuild_wants_total {}\n\
         # HELP databuild_job_runs_total Total number of job runs\n\
         databuild_job_runs_total {}\n",
        want_count, job_run_count
    )
}
```
**Files Created:**
- `databuild/tests/http_integration_test.rs` - integration tests
**Files Modified:**
- `databuild/http_server.rs` - add CORS, health, metrics, better errors
- `MODULE.bazel` - add `tracing` dependency
**Acceptance Criteria:**
- All endpoints have proper error handling
- CORS works for web app development
- Health check returns 200 OK
- Integration tests pass
---
## Future Enhancements (Not in Initial Plan)
### Workspace Auto-Discovery
- Walk up directory tree looking for `.databuild/` marker (sketch below)
- Store server metadata in `.databuild/server.json`:
```json
{
  "pid": 12345,
  "port": 54321,
  "started_at": "2025-01-22T10:30:00Z",
  "workspace_root": "/Users/stuart/Projects/databuild"
}
```
- CLI auto-starts server if not running
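A sketch of the walk-up search, analogous to how Bazel locates a workspace root; `find_workspace_root` is a hypothetical helper:

```rust
use std::path::{Path, PathBuf};

// Walk up from `start` until a directory containing `.databuild/` is found.
fn find_workspace_root(start: &Path) -> Option<PathBuf> {
    let mut dir = start.to_path_buf();
    loop {
        if dir.join(".databuild").is_dir() {
            return Some(dir);
        }
        if !dir.pop() {
            return None; // hit the filesystem root without finding a marker
        }
    }
}
```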
### Log Streaming (SSE)
- Implement `GET /api/job_runs/:id/logs/stdout?follow=true`
- Use Server-Sent Events for streaming (handler sketch below)
- Integrate with FileLogStore from logging.md plan
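A hedged handler sketch using axum 0.7's SSE support; the fixed lines stand in for real FileLogStore integration, and the `futures` crate is an assumed extra dependency:

```rust
use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::{self, Stream};
use std::convert::Infallible;

// Stand-in handler: emits two fixed lines instead of tailing real logs.
async fn stream_stdout() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let lines = vec!["log line 1", "log line 2"];
    let stream = stream::iter(lines.into_iter().map(|l| Ok(Event::default().data(l))));
    Sse::new(stream).keep_alive(KeepAlive::default())
}
```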
### State Caching
- Cache reconstructed BuildState for faster reads
- Invalidate cache when new events arrive
- Use `tokio::sync::RwLock<Option<(u64, BuildState)>>` where u64 is latest_event_id (sketch below)
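A sketch of the cache check, assuming `BuildState: Clone`:

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

// Cache of (latest_event_id, reconstructed state); None until first read.
type StateCache = Arc<RwLock<Option<(u64, BuildState)>>>;

async fn cached_state(
    storage: &dyn BELStorage,
    cache: &StateCache,
) -> Result<BuildState, DatabuildError> {
    let latest = storage.latest_event_id()?;
    if let Some((cached_id, state)) = cache.read().await.as_ref() {
        if *cached_id == latest {
            return Ok(state.clone()); // cache hit: no replay needed
        }
    }
    // Cache miss or stale: replay the log and refresh the cache.
    let events = storage.list_events(0, u64::MAX)?;
    let state = BuildState::from_events(&events);
    *cache.write().await = Some((latest, state.clone()));
    Ok(state)
}
```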
### gRPC Support (If Needed)
- Add Tonic alongside Axum
- Share same orchestrator/command channel
- Useful for language-agnostic clients
---
## Dependencies Summary
New dependencies to add to `MODULE.bazel`:
```python
# Async runtime
crate.spec(package = "tokio", features = ["full"], version = "1.0")
# Web framework
crate.spec(package = "axum", version = "0.7")
crate.spec(package = "tower", version = "0.4")
crate.spec(package = "tower-http", features = ["trace", "cors"], version = "0.5")
# CLI
crate.spec(package = "clap", features = ["derive"], version = "4.0")
# HTTP client for CLI
crate.spec(package = "reqwest", features = ["blocking", "json"], version = "0.11")
# Logging
crate.spec(package = "tracing", version = "0.1")
crate.spec(package = "tracing-subscriber", version = "0.3")
```
---
## Estimated Timeline
- **Phase 1:** 2-3 hours (thread-safe BEL storage)
- **Phase 2:** 4-6 hours (HTTP server with Axum)
- **Phase 3:** 3-4 hours (basic CLI)
- **Phase 4:** 3-4 hours (orchestrator integration)
- **Phase 5:** 2-3 hours (idle shutdown)
- **Phase 6:** 4-6 hours (testing and polish)
**Total:** ~18-26 hours for complete implementation
---
## Design Rationale
### Why Event Log Separation?
**Alternatives Considered:**
1. **Shared State with RwLock**: Orchestrator holds write lock during `step()`, blocking all reads
2. **Actor Model**: Extra overhead from message passing for all operations
**Why Event Log Separation Wins:**
- Orchestrator stays completely synchronous (no refactoring)
- Reads don't block writes (eventual consistency acceptable for build system)
- Natural fit with event sourcing architecture
- Can cache reconstructed state for even better read performance
### Why Not gRPC?
- User requirement: "JSON is a must"
- REST is more debuggable (curl, browser dev tools)
- gRPC adds complexity without clear benefit
- Can add gRPC later if needed (both can coexist)
### Why Axum Over Actix?
- Better compile-time type safety (extractors)
- Cleaner middleware composition (Tower)
- Native async/await (Actix uses actor model internally)
- More ergonomic for this use case
### Why Per-Workspace Server?
- Isolation: builds in different projects don't interfere
- Simpler: no need to route requests by workspace
- Matches Bazel's model (users already understand it)
- Easier to reason about resource usage