Implement API phase 4

Stuart Axelbrooke committed 2025-11-22 21:42:27 +08:00
parent 6f7c6b3318
commit da23af3227
9 changed files with 2057 additions and 45 deletions

MODULE.bazel

@@ -91,6 +91,16 @@ crate.spec(
     package = "tracing-subscriber",
     version = "0.3",
 )
+crate.spec(
+    features = ["derive"],
+    package = "clap",
+    version = "4.0",
+)
+crate.spec(
+    features = ["blocking", "json"],
+    package = "reqwest",
+    version = "0.11",
+)
 crate.from_specs()
 use_repo(crate, "crates")
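With crate_universe, each spec above becomes a label under the @crates hub repo, which is how the BUILD change below pulls in the new dependencies, e.g.:

    deps = [
        "@crates//:clap",
        "@crates//:reqwest",
    ]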

File diff suppressed because one or more lines are too long

databuild/BUILD

@@ -62,6 +62,24 @@ rust_binary(
     ],
 )
 
+# DataBuild CLI binary
+rust_binary(
+    name = "databuild_cli",
+    srcs = ["cli_main.rs"],
+    edition = "2021",
+    visibility = ["//visibility:public"],
+    deps = [
+        ":databuild",
+        "@crates//:axum",
+        "@crates//:clap",
+        "@crates//:reqwest",
+        "@crates//:serde_json",
+        "@crates//:tokio",
+        "@crates//:tracing",
+        "@crates//:tracing-subscriber",
+    ],
+)
+
 # Legacy filegroup for backwards compatibility
 filegroup(
     name = "proto",
databuild/cli_main.rs (new file, 307 lines)

@@ -0,0 +1,307 @@
use clap::{Parser, Subcommand};
use databuild::build_event_log::SqliteBELStorage;
use databuild::http_server::{create_router, AppState};
use reqwest::blocking::Client;
use std::sync::Arc;
use tokio::sync::mpsc;

#[derive(Parser)]
#[command(name = "databuild")]
#[command(about = "DataBuild CLI - Build system for data pipelines", long_about = None)]
struct Cli {
    /// Server URL (default: http://localhost:3000)
    #[arg(long, default_value = "http://localhost:3000", global = true)]
    server: String,

    #[command(subcommand)]
    command: Commands,
}

#[derive(Subcommand)]
enum Commands {
    /// Start the DataBuild HTTP server
    Serve {
        /// Port to listen on
        #[arg(long, default_value = "3000")]
        port: u16,
        /// Database URL (default: ":memory:" for in-memory SQLite)
        #[arg(long, default_value = ":memory:")]
        database: String,
    },
    /// Create a new want (trigger partition builds)
    Want {
        /// Partition references to build (e.g., "data/users", "metrics/daily")
        partitions: Vec<String>,
    },
    /// List and manage wants
    Wants {
        #[command(subcommand)]
        command: WantsCommands,
    },
    /// List and inspect partitions
    Partitions {
        #[command(subcommand)]
        command: Option<PartitionsCommands>,
    },
    /// List and inspect job runs
    JobRuns {
        #[command(subcommand)]
        command: Option<JobRunsCommands>,
    },
}

#[derive(Subcommand)]
enum WantsCommands {
    /// List all wants
    List,
}

#[derive(Subcommand)]
enum PartitionsCommands {
    /// List all partitions
    List,
}

#[derive(Subcommand)]
enum JobRunsCommands {
    /// List all job runs
    List,
}

fn main() {
    let cli = Cli::parse();

    match cli.command {
        Commands::Serve { port, database } => {
            cmd_serve(port, &database);
        }
        Commands::Want { partitions } => {
            cmd_want(&cli.server, partitions);
        }
        Commands::Wants { command } => match command {
            WantsCommands::List => {
                cmd_wants_list(&cli.server);
            }
        },
        Commands::Partitions { command } => match command {
            Some(PartitionsCommands::List) | None => {
                cmd_partitions_list(&cli.server);
            }
        },
        Commands::JobRuns { command } => match command {
            Some(JobRunsCommands::List) | None => {
                cmd_job_runs_list(&cli.server);
            }
        },
    }
}

// ============================================================================
// Command Implementations
// ============================================================================

#[tokio::main]
async fn cmd_serve(port: u16, database: &str) {
    use databuild::build_event_log::BELStorage;

    // Initialize logging
    tracing_subscriber::fmt::init();

    // Create SQLite BEL storage for the HTTP server's read path
    let bel_storage = Arc::new(
        SqliteBELStorage::create(database).expect("Failed to create BEL storage"),
    );

    // Create command channel for orchestrator communication
    let (command_tx, command_rx) = mpsc::channel(100);

    // Spawn orchestrator in a background thread.
    // Note: the orchestrator needs its own BEL storage instance for writes. With the
    // default ":memory:" URL the two instances are likely independent databases
    // (SQLite in-memory databases are per-connection), so HTTP reads may not reflect
    // orchestrator writes; a file path shares state.
    let orch_bel_storage =
        SqliteBELStorage::create(database).expect("Failed to create BEL storage");
    let orch_handle = std::thread::spawn(move || {
        use databuild::orchestrator::{Orchestrator, OrchestratorConfig};

        // Create orchestrator with command channel
        let config = OrchestratorConfig::default();
        let mut orchestrator = Orchestrator::new_with_commands(orch_bel_storage, config, command_rx);

        // Run orchestrator loop
        loop {
            if let Err(e) = orchestrator.step() {
                eprintln!("Orchestrator error: {}", e);
            }
            // Small sleep to avoid busy-waiting
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
    });

    // Create app state with shared storage and command sender
    let state = AppState::new(bel_storage, command_tx);

    // Create router
    let app = create_router(state);

    // Bind to specified port
    let addr = format!("127.0.0.1:{}", port);
    let listener = tokio::net::TcpListener::bind(&addr)
        .await
        .unwrap_or_else(|_| panic!("Failed to bind to {}", addr));

    println!("DataBuild server listening on http://{}", addr);
    println!(" GET /health");
    println!(" GET /api/wants");
    println!(" POST /api/wants");
    println!(" GET /api/wants/:id");
    println!(" GET /api/partitions");
    println!(" GET /api/job_runs");

    // Run the server
    axum::serve(listener, app)
        .await
        .expect("Server error");

    // Join the orchestrator thread (unreachable in practice: the loop above never exits)
    let _ = orch_handle.join();
}

fn cmd_want(server_url: &str, partitions: Vec<String>) {
    let client = Client::new();

    // Convert partition strings to PartitionRef objects
    let partition_refs: Vec<serde_json::Value> = partitions
        .iter()
        .map(|p| serde_json::json!({"ref": p}))
        .collect();

    // Get current timestamp (milliseconds since epoch)
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64;

    let request = serde_json::json!({
        "partitions": partition_refs,
        "data_timestamp": now,
        "ttl_seconds": 3600, // 1 hour default
        "sla_seconds": 300   // 5 minutes default
    });

    let url = format!("{}/api/wants", server_url);
    match client.post(&url).json(&request).send() {
        Ok(response) => {
            if response.status().is_success() {
                println!("Want created successfully");
                if let Ok(body) = response.text() {
                    println!("{}", body);
                }
            } else {
                eprintln!("Failed to create want: {}", response.status());
                if let Ok(body) = response.text() {
                    eprintln!("{}", body);
                }
                std::process::exit(1);
            }
        }
        Err(e) => {
            eprintln!("Failed to connect to server: {}", e);
            eprintln!("Is the server running? Try: databuild serve");
            std::process::exit(1);
        }
    }
}

fn cmd_wants_list(server_url: &str) {
    let client = Client::new();
    let url = format!("{}/api/wants", server_url);

    match client.get(&url).send() {
        Ok(response) => {
            if response.status().is_success() {
                match response.json::<serde_json::Value>() {
                    Ok(json) => {
                        println!("{}", serde_json::to_string_pretty(&json).unwrap());
                    }
                    Err(e) => {
                        eprintln!("Failed to parse response: {}", e);
                        std::process::exit(1);
                    }
                }
            } else {
                eprintln!("Request failed: {}", response.status());
                std::process::exit(1);
            }
        }
        Err(e) => {
            eprintln!("Failed to connect to server: {}", e);
            eprintln!("Is the server running? Try: databuild serve");
            std::process::exit(1);
        }
    }
}

fn cmd_partitions_list(server_url: &str) {
    let client = Client::new();
    let url = format!("{}/api/partitions", server_url);

    match client.get(&url).send() {
        Ok(response) => {
            if response.status().is_success() {
                match response.json::<serde_json::Value>() {
                    Ok(json) => {
                        println!("{}", serde_json::to_string_pretty(&json).unwrap());
                    }
                    Err(e) => {
                        eprintln!("Failed to parse response: {}", e);
                        std::process::exit(1);
                    }
                }
            } else {
                eprintln!("Request failed: {}", response.status());
                std::process::exit(1);
            }
        }
        Err(e) => {
            eprintln!("Failed to connect to server: {}", e);
            eprintln!("Is the server running? Try: databuild serve");
            std::process::exit(1);
        }
    }
}

fn cmd_job_runs_list(server_url: &str) {
    let client = Client::new();
    let url = format!("{}/api/job_runs", server_url);

    match client.get(&url).send() {
        Ok(response) => {
            if response.status().is_success() {
                match response.json::<serde_json::Value>() {
                    Ok(json) => {
                        println!("{}", serde_json::to_string_pretty(&json).unwrap());
                    }
                    Err(e) => {
                        eprintln!("Failed to parse response: {}", e);
                        std::process::exit(1);
                    }
                }
            } else {
                eprintln!("Request failed: {}", response.status());
                std::process::exit(1);
            }
        }
        Err(e) => {
            eprintln!("Failed to connect to server: {}", e);
            eprintln!("Is the server running? Try: databuild serve");
            std::process::exit(1);
        }
    }
}
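Taken together, a plausible session against a local server looks like this (partition refs are illustrative; clap derives kebab-case names, so the JobRuns subcommand becomes job-runs):

    databuild serve --port 3000 &
    databuild want data/users metrics/daily
    databuild wants list
    databuild partitions list
    databuild job-runs list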

databuild/commands.rs (new file, 19 lines)

@@ -0,0 +1,19 @@
use crate::util::DatabuildError;
use crate::{CancelWantRequest, CancelWantResponse, CreateWantRequest, CreateWantResponse};
use tokio::sync::oneshot;

/// Commands that can be sent to the orchestrator via the command channel.
/// Only write operations need commands; reads go directly to BEL storage.
pub enum Command {
    /// Create a new want
    CreateWant {
        request: CreateWantRequest,
        reply: oneshot::Sender<Result<CreateWantResponse, DatabuildError>>,
    },
    /// Cancel an existing want
    CancelWant {
        request: CancelWantRequest,
        reply: oneshot::Sender<Result<CancelWantResponse, DatabuildError>>,
    },
}
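Each variant pairs a request with a tokio oneshot sender, which is what gives the mpsc command channel a request/reply shape. A minimal, self-contained sketch of that round trip, with simplified types and hypothetical names (not DataBuild's actual API):

    // Sketch of the mpsc + oneshot round trip (simplified types, hypothetical names).
    use tokio::sync::{mpsc, oneshot};

    enum Cmd {
        CreateWant {
            request: String,
            reply: oneshot::Sender<Result<String, String>>,
        },
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<Cmd>(100);

        // "Orchestrator" side: drain commands and answer each one on its oneshot.
        tokio::spawn(async move {
            while let Some(Cmd::CreateWant { request, reply }) = rx.recv().await {
                let _ = reply.send(Ok(format!("created want for {}", request)));
            }
        });

        // "Handler" side: pair the request with a fresh oneshot, send, await the reply.
        let (reply_tx, reply_rx) = oneshot::channel();
        tx.send(Cmd::CreateWant { request: "data/users".into(), reply: reply_tx })
            .await
            .expect("orchestrator gone");
        println!("{:?}", reply_rx.await.expect("no reply"));
    }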

databuild/http_server.rs

@@ -1,9 +1,10 @@
 use crate::build_event_log::BELStorage;
 use crate::build_state::BuildState;
+use crate::commands::Command;
 use crate::{
-    CreateWantRequest, CreateWantResponse, GetWantRequest, GetWantResponse, ListJobRunsRequest,
-    ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListWantsRequest,
-    ListWantsResponse,
+    CancelWantRequest, CreateWantRequest, CreateWantResponse, GetWantRequest, GetWantResponse,
+    ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse,
+    ListWantsRequest, ListWantsResponse,
 };
 use axum::{
     extract::{Path, Query, State},
@@ -13,20 +14,24 @@ use axum::{
     Json, Router,
 };
 use std::sync::{atomic::AtomicU64, Arc};
+use tokio::sync::{mpsc, oneshot};
 
 /// Shared application state for HTTP handlers
 #[derive(Clone)]
 pub struct AppState {
     /// Shared read-only access to BEL storage (for reconstructing state)
     pub bel_storage: Arc<dyn BELStorage>,
+    /// Command sender for write operations (sends to orchestrator)
+    pub command_tx: mpsc::Sender<Command>,
     /// For idle timeout tracking (Phase 5)
     pub last_request_time: Arc<AtomicU64>,
 }
 
 impl AppState {
-    pub fn new(bel_storage: Arc<dyn BELStorage>) -> Self {
+    pub fn new(bel_storage: Arc<dyn BELStorage>, command_tx: mpsc::Sender<Command>) -> Self {
         Self {
             bel_storage,
+            command_tx,
             last_request_time: Arc::new(AtomicU64::new(0)),
         }
     }
@@ -119,36 +124,96 @@
     }
 }
 
-/// Create a new want (stub for now - Phase 4 will implement actual command sending)
+/// Create a new want
 async fn create_want(
-    State(_state): State<AppState>,
+    State(state): State<AppState>,
     Json(req): Json<CreateWantRequest>,
 ) -> impl IntoResponse {
-    // TODO Phase 4: Send command to orchestrator via channel
-    // For now, return a stub response
-    (
-        StatusCode::NOT_IMPLEMENTED,
-        Json(serde_json::json!({
-            "error": "Create want not yet implemented (Phase 4)",
-            "requested_partitions": req.partitions,
-        })),
-    )
-    .into_response()
-}
+    // Create oneshot channel for reply
+    let (reply_tx, reply_rx) = oneshot::channel();
 
-/// Cancel a want (stub for now - Phase 4 will implement actual command sending)
-async fn cancel_want(
-    State(_state): State<AppState>,
-    Path(want_id): Path<String>,
-) -> impl IntoResponse {
-    // TODO Phase 4: Send command to orchestrator via channel
-    // For now, return a stub response
-    (
-        StatusCode::NOT_IMPLEMENTED,
-        Json(serde_json::json!({
-            "error": "Cancel want not yet implemented (Phase 4)",
-            "want_id": want_id,
-        })),
-    )
-    .into_response()
-}
+    // Send command to orchestrator
+    let command = Command::CreateWant {
+        request: req,
+        reply: reply_tx,
+    };
+    if state.command_tx.send(command).await.is_err() {
+        return (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            Json(serde_json::json!({
+                "error": "Failed to send command to orchestrator"
+            })),
+        )
+        .into_response();
+    }
+
+    // Wait for orchestrator reply
+    match reply_rx.await {
+        Ok(Ok(response)) => (StatusCode::OK, Json(response)).into_response(),
+        Ok(Err(e)) => (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            Json(serde_json::json!({
+                "error": format!("Failed to create want: {}", e)
+            })),
+        )
+        .into_response(),
+        Err(_) => (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            Json(serde_json::json!({
+                "error": "Orchestrator did not respond"
+            })),
+        )
+        .into_response(),
+    }
+}
+
+/// Cancel a want
+async fn cancel_want(
+    State(state): State<AppState>,
+    Path(want_id): Path<String>,
+) -> impl IntoResponse {
+    // Create oneshot channel for reply
+    let (reply_tx, reply_rx) = oneshot::channel();
+
+    // Send command to orchestrator
+    let command = Command::CancelWant {
+        request: CancelWantRequest {
+            want_id,
+            source: None, // HTTP requests don't have a source
+            comment: None,
+        },
+        reply: reply_tx,
+    };
+    if state.command_tx.send(command).await.is_err() {
+        return (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            Json(serde_json::json!({
+                "error": "Failed to send command to orchestrator"
+            })),
+        )
+        .into_response();
+    }
+
+    // Wait for orchestrator reply
+    match reply_rx.await {
+        Ok(Ok(response)) => (StatusCode::OK, Json(response)).into_response(),
+        Ok(Err(e)) => (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            Json(serde_json::json!({
+                "error": format!("Failed to cancel want: {}", e)
+            })),
+        )
+        .into_response(),
+        Err(_) => (
+            StatusCode::INTERNAL_SERVER_ERROR,
+            Json(serde_json::json!({
+                "error": "Orchestrator did not respond"
+            })),
+        )
+        .into_response(),
+    }
+}
 
 /// List all partitions
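For reference, the create-want endpoint can also be exercised directly; a hypothetical curl call mirroring the JSON the CLI sends (values illustrative):

    curl -s -X POST http://localhost:3000/api/wants \
      -H 'Content-Type: application/json' \
      -d '{"partitions": [{"ref": "data/users"}], "data_timestamp": 1732000000000, "ttl_seconds": 3600, "sla_seconds": 300}'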

databuild/lib.rs

@@ -1,5 +1,6 @@
 pub mod build_event_log;
 mod build_state;
+pub mod commands;
 mod data_deps;
 mod event_transforms;
 pub mod http_server;
@@ -7,7 +8,7 @@ mod job;
 mod job_run;
 mod job_run_state;
 mod mock_job_run;
-mod orchestrator;
+pub mod orchestrator;
 mod partition_state;
 mod util;
 mod want_state;

databuild/orchestrator.rs

@@ -1,4 +1,5 @@
 use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage};
+use crate::commands::Command;
 use crate::data_build_event::Event;
 use crate::job::JobConfiguration;
 use crate::job_run::SubProcessBackend;
@@ -6,6 +7,7 @@ use crate::util::DatabuildError;
 use crate::{JobRunBufferEventV1, PartitionRef, WantDetail};
 use std::collections::HashMap;
 use std::fmt::Debug;
+use tokio::sync::mpsc;
 
 /**
 Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lightweight threads +
@@ -17,10 +19,12 @@ JTBDs:
 - Orchestrator polls queued and active job runs, keeping track of their state, and scheduling queued
   jobs when possible
 */
-struct Orchestrator<S: BELStorage + Debug> {
+pub struct Orchestrator<S: BELStorage + Debug> {
     pub bel: BuildEventLog<S>,
     pub config: OrchestratorConfig,
     pub job_runs: Vec<crate::job_run::JobRunHandle<SubProcessBackend>>,
+    /// Optional command receiver for write operations from HTTP server
+    pub command_rx: Option<mpsc::Receiver<Command>>,
 }
impl Default for Orchestrator<MemoryBELStorage> {
@@ -29,6 +33,7 @@ impl Default for Orchestrator<MemoryBELStorage> {
             bel: Default::default(),
             config: Default::default(),
             job_runs: Default::default(),
+            command_rx: None,
         }
     }
 }
@@ -39,6 +44,7 @@ impl Orchestrator<MemoryBELStorage> {
             bel: self.bel.clone(),
             config: self.config.clone(),
             job_runs: Default::default(),
+            command_rx: None,
         }
     }
 }
@@ -61,8 +67,8 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
 }
 
 #[derive(Debug, Clone)]
-struct OrchestratorConfig {
-    jobs: Vec<JobConfiguration>,
+pub struct OrchestratorConfig {
+    pub jobs: Vec<JobConfiguration>,
 }
 
 impl Default for OrchestratorConfig {
@@ -121,6 +127,20 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
             bel: BuildEventLog::new(storage, Default::default()),
             config,
             job_runs: Vec::new(),
+            command_rx: None,
         }
     }
 
+    pub fn new_with_commands(
+        storage: S,
+        config: OrchestratorConfig,
+        command_rx: mpsc::Receiver<Command>,
+    ) -> Self {
+        Self {
+            bel: BuildEventLog::new(storage, Default::default()),
+            config,
+            job_runs: Vec::new(),
+            command_rx: Some(command_rx),
+        }
+    }
@@ -272,7 +292,30 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
         Ok(())
     }
 
-    fn step(&mut self) -> Result<(), DatabuildError> {
+    /// Process all pending commands from the HTTP server
+    fn process_commands(&mut self) -> Result<(), DatabuildError> {
+        if let Some(ref mut rx) = self.command_rx {
+            // Process all pending commands (non-blocking)
+            while let Ok(cmd) = rx.try_recv() {
+                match cmd {
+                    Command::CreateWant { request, reply } => {
+                        let response = self.bel.api_handle_want_create(request);
+                        let _ = reply.send(response); // Ignore send errors; the requester may have gone away
+                    }
+                    Command::CancelWant { request, reply } => {
+                        let response = self.bel.api_handle_want_cancel(request);
+                        let _ = reply.send(response); // Ignore send errors; the requester may have gone away
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+
+    pub fn step(&mut self) -> Result<(), DatabuildError> {
+        // Process commands first (write operations)
+        self.process_commands()?;
+        // Then poll job runs and wants
         self.poll_job_runs()?;
         self.poll_wants()?;
         Ok(())

databuild server main

@@ -1,18 +1,44 @@
 use databuild::build_event_log::SqliteBELStorage;
 use databuild::http_server::{create_router, AppState};
 use std::sync::Arc;
+use tokio::sync::mpsc;
 
 #[tokio::main]
 async fn main() {
+    use databuild::orchestrator::{Orchestrator, OrchestratorConfig};
+
     // Initialize logging
     tracing_subscriber::fmt::init();
 
-    // Create SQLite BEL storage (in-memory for now)
-    let bel_storage = SqliteBELStorage::create(":memory:")
-        .expect("Failed to create BEL storage");
+    let database = ":memory:";
 
-    // Create app state with shared storage
-    let state = AppState::new(Arc::new(bel_storage));
+    // Create SQLite BEL storage for the HTTP server's read path
+    let bel_storage = Arc::new(
+        SqliteBELStorage::create(database).expect("Failed to create BEL storage"),
+    );
+
+    // Create command channel for orchestrator communication
+    let (command_tx, command_rx) = mpsc::channel(100);
+
+    // Spawn orchestrator in a background thread.
+    // Caveat: with ":memory:", this second create() likely opens an independent
+    // in-memory database, so HTTP reads may not reflect orchestrator writes;
+    // use a file path for shared state.
+    let orch_bel_storage =
+        SqliteBELStorage::create(database).expect("Failed to create BEL storage");
+    let orch_handle = std::thread::spawn(move || {
+        // Create orchestrator with command channel
+        let config = OrchestratorConfig::default();
+        let mut orchestrator = Orchestrator::new_with_commands(orch_bel_storage, config, command_rx);
+
+        // Run orchestrator loop
+        loop {
+            if let Err(e) = orchestrator.step() {
+                eprintln!("Orchestrator error: {}", e);
+            }
+            // Small sleep to avoid busy-waiting
+            std::thread::sleep(std::time::Duration::from_millis(10));
+        }
+    });
+
+    // Create app state with shared storage and command sender
+    let state = AppState::new(bel_storage, command_tx);
 
     // Create router
     let app = create_router(state);
@@ -34,4 +60,7 @@ async fn main() {
     axum::serve(listener, app)
         .await
         .expect("Server error");
+
+    // Join the orchestrator thread (unreachable in practice: the loop never exits)
+    let _ = orch_handle.join();
 }
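Once the server is up, a quick smoke test from another shell (routes per create_router; port per the default bind above):

    curl -s http://localhost:3000/health
    curl -s http://localhost:3000/api/wants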