databuild/databuild/cli_main.rs
2025-11-23 11:19:02 +08:00

402 lines
12 KiB
Rust

use clap::{Parser, Subcommand};
use reqwest::blocking::Client;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::sync::{broadcast, mpsc};
use lib::build_event_log::SqliteBELStorage;
use lib::build_state::BuildState;
use lib::config::DatabuildConfig;
use lib::http_server::{create_router, AppState};
use lib::orchestrator::{Orchestrator, OrchestratorConfig};
// Top-level argument parser for the `databuild` binary.
// New notes use plain `//` comments so clap's derive does not pick them up
// as user-visible help text.
#[derive(Parser)]
#[command(name = "databuild")]
#[command(about = "DataBuild CLI - Build system for data pipelines", long_about = None)]
struct Cli {
    /// Server URL (default: http://localhost:3000)
    #[arg(long, default_value = "http://localhost:3000", global = true)]
    server: String,
    // The selected subcommand; `global = true` above lets `--server` appear
    // before or after it.
    #[command(subcommand)]
    command: Commands,
}
// Top-level subcommands of the `databuild` CLI. The `///` variant comments
// below are clap help text; new annotations use `//` so help output is
// unchanged.
#[derive(Subcommand)]
enum Commands {
    /// Start the DataBuild HTTP server
    Serve {
        /// Port to listen on
        #[arg(long, default_value = "3000")]
        port: u16,
        /// Database URL (default: :memory: for in-memory SQLite)
        #[arg(long, default_value = ":memory:")]
        database: String,
        /// Path to configuration file (JSON or TOML)
        #[arg(long)]
        config: Option<String>,
    },
    /// Create a new want (trigger partition builds)
    Want {
        /// Partition references to build (e.g., "data/users", "metrics/daily")
        partitions: Vec<String>,
    },
    /// List and manage wants
    Wants {
        // Required subcommand: `databuild wants` alone is a usage error.
        #[command(subcommand)]
        command: WantsCommands,
    },
    /// List and inspect partitions
    Partitions {
        // Optional subcommand: `databuild partitions` with no subcommand
        // defaults to listing (see the dispatch in `main`).
        #[command(subcommand)]
        command: Option<PartitionsCommands>,
    },
    /// List and inspect job runs
    JobRuns {
        // Optional subcommand: defaults to listing when omitted.
        #[command(subcommand)]
        command: Option<JobRunsCommands>,
    },
}
// Subcommands under `databuild wants`.
#[derive(Subcommand)]
enum WantsCommands {
    /// List all wants
    List,
}
// Subcommands under `databuild partitions`.
#[derive(Subcommand)]
enum PartitionsCommands {
    /// List all partitions
    List,
}
// Subcommands under `databuild job-runs`.
#[derive(Subcommand)]
enum JobRunsCommands {
    /// List all job runs
    List,
}
fn main() {
let cli = Cli::parse();
match cli.command {
Commands::Serve { port, database, config } => {
cmd_serve(port, &database, config.as_deref());
}
Commands::Want { partitions } => {
cmd_want(&cli.server, partitions);
}
Commands::Wants { command } => match command {
WantsCommands::List => {
cmd_wants_list(&cli.server);
}
},
Commands::Partitions { command } => match command {
Some(PartitionsCommands::List) | None => {
cmd_partitions_list(&cli.server);
}
},
Commands::JobRuns { command } => match command {
Some(JobRunsCommands::List) | None => {
cmd_job_runs_list(&cli.server);
}
},
}
}
// ============================================================================
// Command Implementations
// ============================================================================
#[tokio::main]
async fn cmd_serve(port: u16, database: &str, config_path: Option<&str>) {
// Initialize logging
tracing_subscriber::fmt::init();
// Load configuration if provided
let jobs = if let Some(path) = config_path {
match DatabuildConfig::from_file(path) {
Ok(config) => {
println!("Loaded configuration from: {}", path);
println!(" Jobs: {}", config.jobs.len());
config.into_job_configurations()
}
Err(e) => {
eprintln!("Failed to load configuration from {}: {}", path, e);
std::process::exit(1);
}
}
} else {
Vec::new()
};
// Create SQLite BEL storage (shared between orchestrator and HTTP server)
let bel_storage = Arc::new(
SqliteBELStorage::create(database).expect("Failed to create BEL storage"),
);
// Create command channel for orchestrator communication
let (command_tx, command_rx) = mpsc::channel(100);
// Create event broadcast channel (orchestrator -> HTTP server)
let (event_tx, _event_rx) = broadcast::channel(1000);
// Create shutdown broadcast channel
let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);
// Create shared mirrored build state for HTTP server
let mirrored_state = Arc::new(RwLock::new(BuildState::default()));
// Spawn state-mirror task to keep HTTP server's build state in sync
let mirror_clone = mirrored_state.clone();
let mut mirror_rx = event_tx.subscribe();
tokio::spawn(async move {
while let Ok(event) = mirror_rx.recv().await {
match mirror_clone.write() {
Ok(mut state) => {
state.handle_event(&event);
}
Err(e) => {
eprintln!("State mirror task: RwLock poisoned, cannot update state: {}", e);
break;
}
}
}
});
// Spawn orchestrator in background thread
// Note: Orchestrator needs its own BEL storage instance for writes
let orch_bel_storage = SqliteBELStorage::create(database).expect("Failed to create BEL storage");
let orch_shutdown_rx = shutdown_tx.subscribe();
let orch_handle = std::thread::spawn(move || {
// Create orchestrator with both channels and jobs from config
let config = OrchestratorConfig { jobs };
let mut orchestrator = Orchestrator::new_with_channels(
orch_bel_storage,
config,
command_rx,
event_tx,
);
let mut shutdown_rx = orch_shutdown_rx;
// Run orchestrator loop
loop {
// Check for shutdown signal
if shutdown_rx.try_recv().is_ok() {
println!("Orchestrator received shutdown signal");
break;
}
if let Err(e) = orchestrator.step() {
eprintln!("Orchestrator error: {}", e);
}
// Small sleep to avoid busy-waiting
std::thread::sleep(std::time::Duration::from_millis(10));
}
});
// Create app state with mirrored state, shared storage, command sender, and shutdown channel
let state = AppState::new(mirrored_state, bel_storage, command_tx, shutdown_tx.clone());
// Spawn idle timeout checker task
let idle_state = state.clone();
let idle_shutdown_tx = shutdown_tx.clone();
tokio::spawn(async move {
let idle_timeout = Duration::from_secs(3 * 60 * 60); // 3 hours
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
let last_request = idle_state.last_request_time.load(std::sync::atomic::Ordering::Relaxed);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
if now - last_request > idle_timeout.as_millis() as u64 {
eprintln!(
"Server idle for {} hours, shutting down",
idle_timeout.as_secs() / 3600
);
let _ = idle_shutdown_tx.send(());
break;
}
}
});
// Create router
let app = create_router(state);
// Bind to specified port
let addr = format!("127.0.0.1:{}", port);
let listener = tokio::net::TcpListener::bind(&addr)
.await
.unwrap_or_else(|_| panic!("Failed to bind to {}", addr));
println!("DataBuild server listening on http://{}", addr);
println!(" GET /health");
println!(" GET /api/wants");
println!(" POST /api/wants");
println!(" GET /api/wants/:id");
println!(" GET /api/partitions");
println!(" GET /api/job_runs");
// Subscribe to shutdown signal for graceful shutdown
let mut server_shutdown_rx = shutdown_tx.subscribe();
// Run the server with graceful shutdown
axum::serve(listener, app)
.with_graceful_shutdown(async move {
let _ = server_shutdown_rx.recv().await;
println!("HTTP server received shutdown signal");
})
.await
.expect("Server error");
// Wait for orchestrator to finish
let _ = orch_handle.join();
println!("Shutdown complete");
}
/// Create a new want on the server, triggering builds for the given partitions.
///
/// Sends `POST {server_url}/api/wants` with the partition refs, the current
/// timestamp, and default TTL (1 hour) / SLA (5 minutes). Exits the process
/// with status 1 on an empty partition list, a connection failure, or a
/// non-success HTTP status.
fn cmd_want(server_url: &str, partitions: Vec<String>) {
    // Reject an empty request up front rather than POSTing a no-op want.
    if partitions.is_empty() {
        eprintln!("No partitions specified. Example: databuild want data/users metrics/daily");
        std::process::exit(1);
    }
    let client = Client::new();
    // Convert partition strings to PartitionRef-shaped JSON objects.
    let partition_refs: Vec<serde_json::Value> = partitions
        .iter()
        .map(|p| serde_json::json!({"ref": p}))
        .collect();
    // Current timestamp (milliseconds since the Unix epoch).
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("system clock is before the Unix epoch")
        .as_millis() as u64;
    let request = serde_json::json!({
        "partitions": partition_refs,
        "data_timestamp": now,
        "ttl_seconds": 3600, // 1 hour default
        "sla_seconds": 300   // 5 minutes default
    });
    let url = format!("{}/api/wants", server_url);
    match client.post(&url)
        .json(&request)
        .send()
    {
        Ok(response) => {
            if response.status().is_success() {
                println!("Want created successfully");
                // Echo the server's response body (the created want) if readable.
                if let Ok(body) = response.text() {
                    println!("{}", body);
                }
            } else {
                eprintln!("Failed to create want: {}", response.status());
                if let Ok(body) = response.text() {
                    eprintln!("{}", body);
                }
                std::process::exit(1);
            }
        }
        Err(e) => {
            eprintln!("Failed to connect to server: {}", e);
            eprintln!("Is the server running? Try: databuild serve");
            std::process::exit(1);
        }
    }
}
/// Fetch all wants from `GET {server_url}/api/wants` and pretty-print the
/// JSON response. Exits with status 1 on connection, HTTP, or parse failure.
fn cmd_wants_list(server_url: &str) {
    let url = format!("{}/api/wants", server_url);
    // Guard-clause shape: peel off each failure mode, then print on success.
    let response = match Client::new().get(&url).send() {
        Ok(r) => r,
        Err(e) => {
            eprintln!("Failed to connect to server: {}", e);
            eprintln!("Is the server running? Try: databuild serve");
            std::process::exit(1);
        }
    };
    if !response.status().is_success() {
        eprintln!("Request failed: {}", response.status());
        std::process::exit(1);
    }
    match response.json::<serde_json::Value>() {
        Ok(json) => println!("{}", serde_json::to_string_pretty(&json).unwrap()),
        Err(e) => {
            eprintln!("Failed to parse response: {}", e);
            std::process::exit(1);
        }
    }
}
/// Fetch all partitions from `GET {server_url}/api/partitions` and
/// pretty-print the JSON response. Exits with status 1 on connection, HTTP,
/// or parse failure.
fn cmd_partitions_list(server_url: &str) {
    let url = format!("{}/api/partitions", server_url);
    // Each failure branch diverges via `process::exit`, so `unwrap_or_else`
    // closures returning `!` coerce to the success type.
    let response = Client::new().get(&url).send().unwrap_or_else(|e| {
        eprintln!("Failed to connect to server: {}", e);
        eprintln!("Is the server running? Try: databuild serve");
        std::process::exit(1);
    });
    if !response.status().is_success() {
        eprintln!("Request failed: {}", response.status());
        std::process::exit(1);
    }
    let json: serde_json::Value = response.json().unwrap_or_else(|e| {
        eprintln!("Failed to parse response: {}", e);
        std::process::exit(1);
    });
    println!("{}", serde_json::to_string_pretty(&json).unwrap());
}
/// Fetch all job runs from `GET {server_url}/api/job_runs` and pretty-print
/// the JSON response. Exits with status 1 on connection, HTTP, or parse
/// failure.
fn cmd_job_runs_list(server_url: &str) {
    let client = Client::new();
    let url = format!("{}/api/job_runs", server_url);
    // Bind the send result first, then branch on it.
    let response = match client.get(&url).send() {
        Err(e) => {
            eprintln!("Failed to connect to server: {}", e);
            eprintln!("Is the server running? Try: databuild serve");
            std::process::exit(1);
        }
        Ok(r) => r,
    };
    let status = response.status();
    if status.is_success() {
        match response.json::<serde_json::Value>() {
            Ok(json) => println!("{}", serde_json::to_string_pretty(&json).unwrap()),
            Err(e) => {
                eprintln!("Failed to parse response: {}", e);
                std::process::exit(1);
            }
        }
    } else {
        eprintln!("Request failed: {}", status);
        std::process::exit(1);
    }
}