diff --git a/databuild/BUILD.bazel b/databuild/BUILD.bazel
index 6920c15..7e08f59 100644
--- a/databuild/BUILD.bazel
+++ b/databuild/BUILD.bazel
@@ -18,7 +18,7 @@ rust_binary(
 
 # DataBuild library using generated prost code
 rust_library(
-    name = "databuild",
+    name = "lib",
     srcs = glob(["**/*.rs"]) + [
         ":generate_databuild_rust",
     ],
@@ -44,34 +44,19 @@ rust_library(
 
 rust_test(
     name = "databuild_test",
-    crate = ":databuild",
+    crate = ":lib",
     data = ["//databuild/test:test_job_helper"],
     env = {"RUST_BACKTRACE": "1"},
 )
 
-# DataBuild HTTP server binary
-rust_binary(
-    name = "databuild_server",
-    srcs = ["server_main.rs"],
-    edition = "2021",
-    visibility = ["//visibility:public"],
-    deps = [
-        ":databuild",
-        "@crates//:axum",
-        "@crates//:tokio",
-        "@crates//:tracing",
-        "@crates//:tracing-subscriber",
-    ],
-)
-
 # DataBuild CLI binary
 rust_binary(
-    name = "databuild_cli",
+    name = "databuild",
     srcs = ["cli_main.rs"],
     edition = "2021",
     visibility = ["//visibility:public"],
     deps = [
-        ":databuild",
+        ":lib",
        "@crates//:axum",
        "@crates//:clap",
        "@crates//:reqwest",
diff --git a/databuild/cli_main.rs b/databuild/cli_main.rs
index 6a7619b..5595a30 100644
--- a/databuild/cli_main.rs
+++ b/databuild/cli_main.rs
@@ -1,10 +1,13 @@
 use clap::{Parser, Subcommand};
-use databuild::build_event_log::SqliteBELStorage;
-use databuild::http_server::{create_router, AppState};
 use reqwest::blocking::Client;
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
 use std::time::Duration;
 use tokio::sync::{broadcast, mpsc};
+use lib::build_event_log::SqliteBELStorage;
+use lib::build_state::BuildState;
+use lib::config::DatabuildConfig;
+use lib::http_server::{create_router, AppState};
+use lib::orchestrator::{Orchestrator, OrchestratorConfig};
 
 #[derive(Parser)]
 #[command(name = "databuild")]
@@ -112,14 +115,13 @@ fn main() {
 
 #[tokio::main]
 async fn cmd_serve(port: u16, database: &str, config_path: Option<&str>) {
-    use databuild::build_event_log::BELStorage;
 
     // Initialize logging
     tracing_subscriber::fmt::init();
 
     // Load configuration if provided
     let jobs = if let Some(path) = config_path {
-        match databuild::config::DatabuildConfig::from_file(path) {
+        match DatabuildConfig::from_file(path) {
             Ok(config) => {
                 println!("Loaded configuration from: {}", path);
                 println!("  Jobs: {}", config.jobs.len());
@@ -142,19 +144,38 @@ async fn cmd_serve(port: u16, database: &str, config_path: Option<&str>) {
     // Create command channel for orchestrator communication
     let (command_tx, command_rx) = mpsc::channel(100);
 
+    // Create event broadcast channel (orchestrator -> HTTP server)
+    let (event_tx, _event_rx) = broadcast::channel(1000);
+
     // Create shutdown broadcast channel
     let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);
 
+    // Create shared mirrored build state for HTTP server
+    let mirrored_state = Arc::new(RwLock::new(BuildState::default()));
+
+    // Spawn state-mirror task to keep HTTP server's build state in sync
+    let mirror_clone = mirrored_state.clone();
+    let mut mirror_rx = event_tx.subscribe();
+    tokio::spawn(async move {
+        while let Ok(event) = mirror_rx.recv().await {
+            let mut state = mirror_clone.write().unwrap();
+            state.handle_event(&event);
+        }
+    });
+
     // Spawn orchestrator in background thread
     // Note: Orchestrator needs its own BEL storage instance for writes
     let orch_bel_storage = SqliteBELStorage::create(database).expect("Failed to create BEL storage");
     let orch_shutdown_rx = shutdown_tx.subscribe();
     let orch_handle = std::thread::spawn(move || {
-        use databuild::orchestrator::{Orchestrator, OrchestratorConfig};
-
-        // Create orchestrator with command channel and jobs from config
+        // Create orchestrator with both channels and jobs from config
         let config = OrchestratorConfig { jobs };
-        let mut orchestrator = Orchestrator::new_with_commands(orch_bel_storage, config, command_rx);
+        let mut orchestrator = Orchestrator::new_with_channels(
+            orch_bel_storage,
+            config,
+            command_rx,
+            event_tx,
+        );
         let mut shutdown_rx = orch_shutdown_rx;
 
         // Run orchestrator loop
@@ -173,8 +194,8 @@ async fn cmd_serve(port: u16, database: &str, config_path: Option<&str>) {
         }
     });
 
-    // Create app state with shared storage, command sender, and shutdown channel
-    let state = AppState::new(bel_storage, command_tx, shutdown_tx.clone());
+    // Create app state with mirrored state, shared storage, command sender, and shutdown channel
+    let state = AppState::new(mirrored_state, bel_storage, command_tx, shutdown_tx.clone());
 
     // Spawn idle timeout checker task
     let idle_state = state.clone();
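
// --- Annotation (not part of the patch) ------------------------------------
// A minimal, self-contained sketch of the mirroring pattern `cmd_serve` wires
// up above: the orchestrator broadcasts each appended event, and a background
// task replays those events into an Arc<RwLock<_>> snapshot that HTTP handlers
// read. `Event` and `BuildState` here are simplified stand-ins for the real
// types in lib::build_state and the generated event module.

use std::sync::{Arc, RwLock};
use tokio::sync::broadcast;

#[derive(Clone, Debug)]
enum Event {
    JobStarted(String),
    JobFinished(String),
}

#[derive(Default)]
struct BuildState {
    running: Vec<String>,
}

impl BuildState {
    fn handle_event(&mut self, event: &Event) {
        match event {
            Event::JobStarted(id) => self.running.push(id.clone()),
            Event::JobFinished(id) => self.running.retain(|j| j != id),
        }
    }
}

#[tokio::main]
async fn main() {
    let (event_tx, _) = broadcast::channel(1000);
    let mirrored = Arc::new(RwLock::new(BuildState::default()));

    // Mirror task: apply every broadcast event to the shared snapshot.
    let mirror = mirrored.clone();
    let mut rx = event_tx.subscribe();
    let task = tokio::spawn(async move {
        while let Ok(event) = rx.recv().await {
            mirror.write().unwrap().handle_event(&event);
        }
    });

    // Writer side (the orchestrator thread in the real code).
    event_tx.send(Event::JobStarted("job-1".into())).unwrap();
    drop(event_tx); // closing the channel lets the mirror task exit

    task.await.unwrap();
    assert_eq!(mirrored.read().unwrap().running, vec!["job-1".to_string()]);
}
// ----------------------------------------------------------------------------
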
diff --git a/databuild/http_server.rs b/databuild/http_server.rs
index acc1db5..bffd884 100644
--- a/databuild/http_server.rs
+++ b/databuild/http_server.rs
@@ -25,9 +25,9 @@ use tower_http::cors::CorsLayer;
 /// Shared application state for HTTP handlers
 #[derive(Clone)]
 pub struct AppState {
-    /// Shared read-only access to build state (maintained by orchestrator)
+    /// Mirrored build state (updated via event broadcast from orchestrator)
     pub build_state: Arc<RwLock<BuildState>>,
-    /// Shared read-only access to BEL storage (for event log queries)
+    /// Shared read-only access to BEL storage (for event log queries if needed)
     pub bel_storage: Arc<SqliteBELStorage>,
     /// Command sender for write operations (sends to orchestrator)
     pub command_tx: mpsc::Sender,
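
// --- Annotation (not part of the patch) ------------------------------------
// With the mirrored state stored in AppState, read-only endpoints can answer
// from the local snapshot instead of round-tripping through command_tx. A
// hypothetical axum handler under that assumption (field and struct bodies
// are stand-ins, not the real AppState definition):

use axum::{extract::State, routing::get, Json, Router};
use std::sync::{Arc, RwLock};

#[derive(Default)]
struct BuildState {
    running: Vec<String>, // stand-in; the real struct is richer
}

#[derive(Clone)]
struct AppState {
    build_state: Arc<RwLock<BuildState>>,
}

// Read path: take the read lock on the mirrored snapshot and respond
// directly; only writes need to go through the orchestrator's channel.
async fn list_job_runs(State(state): State<AppState>) -> Json<Vec<String>> {
    let snapshot = state.build_state.read().unwrap();
    Json(snapshot.running.clone())
}

fn router(state: AppState) -> Router {
    Router::new()
        .route("/api/job_runs", get(list_job_runs))
        .with_state(state)
}
// ----------------------------------------------------------------------------
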
diff --git a/databuild/lib.rs b/databuild/lib.rs
index 277de5e..4236a4b 100644
--- a/databuild/lib.rs
+++ b/databuild/lib.rs
@@ -1,5 +1,5 @@
 pub mod build_event_log;
-mod build_state;
+pub mod build_state;
 pub mod commands;
 pub mod config;
 mod data_deps;
diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs
index 38b19e5..27924cd 100644
--- a/databuild/orchestrator.rs
+++ b/databuild/orchestrator.rs
@@ -7,7 +7,7 @@ use crate::util::DatabuildError;
 use crate::{JobRunBufferEventV1, PartitionRef, WantDetail};
 use std::collections::HashMap;
 use std::fmt::Debug;
-use tokio::sync::mpsc;
+use tokio::sync::{broadcast, mpsc};
 use uuid::Uuid;
 
 /**
@@ -26,6 +26,8 @@ pub struct Orchestrator {
     pub job_runs: Vec>,
     /// Optional command receiver for write operations from HTTP server
     pub command_rx: Option>,
+    /// Optional event broadcaster for state updates to HTTP server
+    pub event_tx: Option<broadcast::Sender<Event>>,
     /// Environment variables for each job run (keyed by job_run_id)
     pub job_environments: HashMap>,
 }
@@ -37,6 +39,7 @@ impl Default for Orchestrator {
             config: Default::default(),
             job_runs: Default::default(),
             command_rx: None,
+            event_tx: None,
             job_environments: HashMap::new(),
         }
     }
@@ -49,6 +52,7 @@ impl Orchestrator {
             config: self.config.clone(),
             job_runs: Default::default(),
             command_rx: None,
+            event_tx: None,
             job_environments: HashMap::new(),
         }
     }
@@ -133,6 +137,7 @@ impl Orchestrator {
             config,
             job_runs: Vec::new(),
             command_rx: None,
+            event_tx: None,
             job_environments: HashMap::new(),
         }
     }
@@ -147,16 +152,44 @@ impl Orchestrator {
             config,
             job_runs: Vec::new(),
             command_rx: Some(command_rx),
+            event_tx: None,
             job_environments: HashMap::new(),
         }
     }
 
+    pub fn new_with_channels(
+        storage: S,
+        config: OrchestratorConfig,
+        command_rx: mpsc::Receiver,
+        event_tx: broadcast::Sender<Event>,
+    ) -> Self {
+        Self {
+            bel: BuildEventLog::new(storage, Default::default()),
+            config,
+            job_runs: Vec::new(),
+            command_rx: Some(command_rx),
+            event_tx: Some(event_tx),
+            job_environments: HashMap::new(),
+        }
+    }
+
+    /// Append event to BEL and broadcast to HTTP server (if configured)
+    fn append_and_broadcast(&mut self, event: &Event) -> Result<u64, DatabuildError> {
+        let idx = self.bel.append_event(event)?;
+        if let Some(tx) = &self.event_tx {
+            // Ignore send errors (no receivers is fine)
+            let _ = tx.send(event.clone());
+        }
+        Ok(idx)
+    }
+
     fn schedule_queued_jobs(&mut self) -> Result<(), DatabuildError> {
         use crate::JobRunHeartbeatEventV1;
         use crate::data_build_event::Event;
         use crate::job_run::JobRunHandle;
 
         let mut new_jobs = Vec::new();
+        let mut events_to_append = Vec::new();
         for job in self.job_runs.drain(..) {
             let transitioned = match job {
                 JobRunHandle::NotStarted(not_started) => {
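
// --- Annotation (not part of the patch) ------------------------------------
// The invariant behind append_and_broadcast: the durable log append happens
// first and can fail the step; the broadcast is best-effort and never can.
// A send error only means there are no subscribers (e.g. no HTTP mirror is
// attached), which is a legal state, so it is dropped. Sketch with stand-in
// log and event types:

use tokio::sync::broadcast;

#[derive(Clone, Debug)]
struct Event(String); // stand-in for the generated event type

#[derive(Default)]
struct EventLog(Vec<Event>); // stand-in for the SQLite-backed BEL

impl EventLog {
    // Returns the index of the appended event.
    fn append(&mut self, event: &Event) -> u64 {
        self.0.push(event.clone());
        (self.0.len() - 1) as u64
    }
}

struct Orchestrator {
    log: EventLog,
    event_tx: Option<broadcast::Sender<Event>>,
}

impl Orchestrator {
    fn append_and_broadcast(&mut self, event: &Event) -> u64 {
        let idx = self.log.append(event); // durability first
        if let Some(tx) = &self.event_tx {
            let _ = tx.send(event.clone()); // best-effort fan-out
        }
        idx
    }
}
// ----------------------------------------------------------------------------
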
@@ -165,11 +198,11 @@ impl Orchestrator {
                     let env = self.job_environments.get(&job_run_id).cloned();
                     let running = not_started.run(env)?;
 
-                    // Emit heartbeat event to notify BuildState that job is now running
+                    // Collect heartbeat event to emit after drain completes
                     let heartbeat_event = Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 {
                         job_run_id: job_run_id.to_string(),
                     });
-                    self.bel.append_event(&heartbeat_event)?;
+                    events_to_append.push(heartbeat_event);
 
                     JobRunHandle::Running(running)
                 }
@@ -178,12 +211,19 @@ impl Orchestrator {
             new_jobs.push(transitioned);
         }
         self.job_runs = new_jobs;
+
+        // Now append all collected events
+        for event in events_to_append {
+            self.append_and_broadcast(&event)?;
+        }
+
         Ok(())
     }
 
     /// Visits individual job runs, appending resulting events, and moving runs between run status
     /// containers. Either jobs are still running, or they are moved to terminal states.
     fn poll_job_runs(&mut self) -> Result<(), DatabuildError> {
+        use crate::data_build_event::Event;
         use crate::job_run::{JobRunHandle, VisitResult};
 
         self.schedule_queued_jobs()?;
@@ -194,7 +234,9 @@ impl Orchestrator {
         // Need to decide on heartbeat frequency (e.g., every N polls or based on time elapsed)
 
         // Visit all running jobs using type-safe transitions
+        // Collect events first to avoid borrow checker issues
         let mut new_jobs = Vec::new();
+        let mut events_to_append = Vec::new();
         for job in self.job_runs.drain(..) {
             let transitioned = match job {
                 JobRunHandle::Running(running) => match running.visit()? {
@@ -205,19 +247,19 @@ impl Orchestrator {
                     VisitResult::Completed(completed) => {
                         println!("Completed job: {:?}", completed.job_run_id);
                         let event = completed.state.to_event(&completed.job_run_id);
-                        self.bel.append_event(&event)?;
+                        events_to_append.push(event);
                         JobRunHandle::Completed(completed)
                     }
                     VisitResult::Failed(failed) => {
                         println!("Failed job: {:?}", failed.job_run_id);
                         let event = failed.state.to_event(&failed.job_run_id);
-                        self.bel.append_event(&event)?;
+                        events_to_append.push(event);
                         JobRunHandle::Failed(failed)
                     }
                     VisitResult::DepMiss(dep_miss) => {
                         println!("Dep miss job: {:?}", dep_miss.job_run_id);
                         let event = dep_miss.state.to_event(&dep_miss.job_run_id);
-                        self.bel.append_event(&event)?;
+                        events_to_append.push(event);
                         JobRunHandle::DepMiss(dep_miss)
                     }
                 },
@@ -227,6 +269,11 @@ impl Orchestrator {
         }
         self.job_runs = new_jobs;
 
+        // Now append all collected events
+        for event in events_to_append {
+            self.append_and_broadcast(&event)?;
+        }
+
         Ok(())
     }
 
@@ -302,7 +349,7 @@ impl Orchestrator {
                 .collect(),
             want_attributed_partitions: wg.wants.iter().map(|w| w.into()).collect(),
         });
-        self.bel.append_event(&job_buffer_event)?;
+        self.append_and_broadcast(&job_buffer_event)?;
 
         self.job_runs.push(job_run);
         Ok(())
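
// --- Annotation (not part of the patch) ------------------------------------
// Why both loops above buffer into `events_to_append`: `self.job_runs.drain(..)`
// holds a mutable borrow of `self` for the whole iteration, so calling another
// `&mut self` method (append_and_broadcast) inside the loop is rejected by the
// borrow checker (E0499). Minimal reproduction with stand-in types:

struct Orchestrator {
    job_runs: Vec<u32>,
    log: Vec<String>,
}

impl Orchestrator {
    fn append(&mut self, event: String) {
        self.log.push(event);
    }

    fn poll(&mut self) {
        let mut events = Vec::new();
        for job in self.job_runs.drain(..) {
            // self.append(format!("visited {job}")); // ERROR: second &mut borrow
            events.push(format!("visited {job}"));
        }
        // The drain borrow has ended; appending is fine now.
        for event in events {
            self.append(event);
        }
    }
}

fn main() {
    let mut orch = Orchestrator { job_runs: vec![1, 2], log: Vec::new() };
    orch.poll();
    assert_eq!(orch.log, vec!["visited 1", "visited 2"]);
}
// ----------------------------------------------------------------------------
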
diff --git a/databuild/server_main.rs b/databuild/server_main.rs
deleted file mode 100644
index 7f93dfb..0000000
--- a/databuild/server_main.rs
+++ /dev/null
@@ -1,112 +0,0 @@
-use databuild::build_event_log::SqliteBELStorage;
-use databuild::http_server::{create_router, AppState};
-use std::sync::Arc;
-use std::time::Duration;
-use tokio::sync::{broadcast, mpsc};
-
-#[tokio::main]
-async fn main() {
-    use databuild::orchestrator::{Orchestrator, OrchestratorConfig};
-
-    // Initialize logging
-    tracing_subscriber::fmt::init();
-
-    let database = ":memory:";
-
-    // Create SQLite BEL storage (shared between orchestrator and HTTP server)
-    let bel_storage = Arc::new(
-        SqliteBELStorage::create(database).expect("Failed to create BEL storage"),
-    );
-
-    // Create command channel for orchestrator communication
-    let (command_tx, command_rx) = mpsc::channel(100);
-
-    // Create shutdown broadcast channel
-    let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);
-
-    // Spawn orchestrator in background thread
-    let orch_bel_storage = SqliteBELStorage::create(database).expect("Failed to create BEL storage");
-    let orch_shutdown_rx = shutdown_tx.subscribe();
-    let orch_handle = std::thread::spawn(move || {
-        // Create orchestrator with command channel
-        let config = OrchestratorConfig::default();
-        let mut orchestrator = Orchestrator::new_with_commands(orch_bel_storage, config, command_rx);
-        let mut shutdown_rx = orch_shutdown_rx;
-
-        // Run orchestrator loop
-        loop {
-            // Check for shutdown signal
-            if shutdown_rx.try_recv().is_ok() {
-                println!("Orchestrator received shutdown signal");
-                break;
-            }
-
-            if let Err(e) = orchestrator.step() {
-                eprintln!("Orchestrator error: {}", e);
-            }
-            // Small sleep to avoid busy-waiting
-            std::thread::sleep(std::time::Duration::from_millis(10));
-        }
-    });
-
-    // Create app state with shared storage, command sender, and shutdown channel
-    let state = AppState::new(bel_storage, command_tx, shutdown_tx.clone());
-
-    // Spawn idle timeout checker task
-    let idle_state = state.clone();
-    let idle_shutdown_tx = shutdown_tx.clone();
-    tokio::spawn(async move {
-        let idle_timeout = Duration::from_secs(3 * 60 * 60); // 3 hours
-
-        loop {
-            tokio::time::sleep(Duration::from_secs(60)).await;
-
-            let last_request = idle_state.last_request_time.load(std::sync::atomic::Ordering::Relaxed);
-            let now = std::time::SystemTime::now()
-                .duration_since(std::time::UNIX_EPOCH)
-                .unwrap()
-                .as_millis() as u64;
-
-            if now - last_request > idle_timeout.as_millis() as u64 {
-                eprintln!(
-                    "Server idle for {} hours, shutting down",
-                    idle_timeout.as_secs() / 3600
-                );
-                let _ = idle_shutdown_tx.send(());
-                break;
-            }
-        }
-    });
-
-    // Create router
-    let app = create_router(state);
-
-    // Bind to port 3000
-    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
-        .await
-        .expect("Failed to bind to port 3000");
-
-    println!("DataBuild server listening on http://127.0.0.1:3000");
-    println!("  GET  /health");
-    println!("  GET  /api/wants");
-    println!("  POST /api/wants");
-    println!("  GET  /api/wants/:id");
-    println!("  GET  /api/partitions");
-    println!("  GET  /api/job_runs");
-
-    // Subscribe to shutdown signal for graceful shutdown
-    let mut server_shutdown_rx = shutdown_tx.subscribe();
-
-    // Run the server with graceful shutdown
-    axum::serve(listener, app)
-        .with_graceful_shutdown(async move {
-            let _ = server_shutdown_rx.recv().await;
-            println!("HTTP server received shutdown signal");
-        })
-        .await
-        .expect("Server error");
-
-    // Wait for orchestrator to finish
-    let _ = orch_handle.join();
-    println!("Shutdown complete");
-}
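
// --- Annotation (not part of the patch) ------------------------------------
// The deleted binary's shutdown choreography survives in `cmd_serve`: a single
// broadcast::channel(1) fans one signal out to every component, the blocking
// orchestrator thread polls it with try_recv() between steps, and the async
// HTTP task awaits it. A trimmed sketch of that pattern:

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (shutdown_tx, _) = broadcast::channel::<()>(1);

    // Blocking worker (the orchestrator thread in the real code): checks for
    // the signal between units of work instead of blocking on it.
    let mut worker_rx = shutdown_tx.subscribe();
    let worker = std::thread::spawn(move || loop {
        if worker_rx.try_recv().is_ok() {
            println!("worker: shutdown signal received");
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(10));
    });

    // Async side (the HTTP server in the real code): awaits the same signal,
    // as axum's with_graceful_shutdown does with the future it is given.
    let mut server_rx = shutdown_tx.subscribe();
    let server = tokio::spawn(async move {
        let _ = server_rx.recv().await;
        println!("server: shutdown signal received");
    });

    // Any component may trigger shutdown (idle checker, Ctrl-C handler, ...).
    shutdown_tx.send(()).unwrap();

    server.await.unwrap();
    worker.join().unwrap();
    println!("shutdown complete");
}
// ----------------------------------------------------------------------------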