server refactor

Stuart Axelbrooke 2025-11-23 10:39:33 +08:00
parent 7134b5e480
commit f71be8482f
6 changed files with 93 additions and 152 deletions


@@ -18,7 +18,7 @@ rust_binary(
# DataBuild library using generated prost code
rust_library(
name = "databuild",
name = "lib",
srcs = glob(["**/*.rs"]) + [
":generate_databuild_rust",
],
@@ -44,34 +44,19 @@ rust_library(
rust_test(
name = "databuild_test",
crate = ":databuild",
crate = ":lib",
data = ["//databuild/test:test_job_helper"],
env = {"RUST_BACKTRACE": "1"},
)
# DataBuild HTTP server binary
rust_binary(
name = "databuild_server",
srcs = ["server_main.rs"],
edition = "2021",
visibility = ["//visibility:public"],
deps = [
":databuild",
"@crates//:axum",
"@crates//:tokio",
"@crates//:tracing",
"@crates//:tracing-subscriber",
],
)
# DataBuild CLI binary
rust_binary(
name = "databuild_cli",
name = "databuild",
srcs = ["cli_main.rs"],
edition = "2021",
visibility = ["//visibility:public"],
deps = [
":databuild",
":lib",
"@crates//:axum",
"@crates//:clap",
"@crates//:reqwest",


@@ -1,10 +1,13 @@
use clap::{Parser, Subcommand};
use databuild::build_event_log::SqliteBELStorage;
use databuild::http_server::{create_router, AppState};
use reqwest::blocking::Client;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::sync::{broadcast, mpsc};
use lib::build_event_log::SqliteBELStorage;
use lib::build_state::BuildState;
use lib::config::DatabuildConfig;
use lib::http_server::{create_router, AppState};
use lib::orchestrator::{Orchestrator, OrchestratorConfig};
#[derive(Parser)]
#[command(name = "databuild")]
@@ -112,14 +115,13 @@ fn main() {
#[tokio::main]
async fn cmd_serve(port: u16, database: &str, config_path: Option<&str>) {
use databuild::build_event_log::BELStorage;
// Initialize logging
tracing_subscriber::fmt::init();
// Load configuration if provided
let jobs = if let Some(path) = config_path {
match databuild::config::DatabuildConfig::from_file(path) {
match DatabuildConfig::from_file(path) {
Ok(config) => {
println!("Loaded configuration from: {}", path);
println!(" Jobs: {}", config.jobs.len());
@@ -142,19 +144,38 @@ async fn cmd_serve(port: u16, database: &str, config_path: Option<&str>) {
// Create command channel for orchestrator communication
let (command_tx, command_rx) = mpsc::channel(100);
// Create event broadcast channel (orchestrator -> HTTP server)
let (event_tx, _event_rx) = broadcast::channel(1000);
// Create shutdown broadcast channel
let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);
// Create shared mirrored build state for HTTP server
let mirrored_state = Arc::new(RwLock::new(BuildState::default()));
// Spawn state-mirror task to keep HTTP server's build state in sync
let mirror_clone = mirrored_state.clone();
let mut mirror_rx = event_tx.subscribe();
tokio::spawn(async move {
while let Ok(event) = mirror_rx.recv().await {
let mut state = mirror_clone.write().unwrap();
state.handle_event(&event);
}
});
// Spawn orchestrator in background thread
// Note: Orchestrator needs its own BEL storage instance for writes
let orch_bel_storage = SqliteBELStorage::create(database).expect("Failed to create BEL storage");
let orch_shutdown_rx = shutdown_tx.subscribe();
let orch_handle = std::thread::spawn(move || {
use databuild::orchestrator::{Orchestrator, OrchestratorConfig};
// Create orchestrator with command channel and jobs from config
// Create orchestrator with both channels and jobs from config
let config = OrchestratorConfig { jobs };
let mut orchestrator = Orchestrator::new_with_commands(orch_bel_storage, config, command_rx);
let mut orchestrator = Orchestrator::new_with_channels(
orch_bel_storage,
config,
command_rx,
event_tx,
);
let mut shutdown_rx = orch_shutdown_rx;
// Run orchestrator loop
@@ -173,8 +194,8 @@ async fn cmd_serve(port: u16, database: &str, config_path: Option<&str>) {
}
});
// Create app state with shared storage, command sender, and shutdown channel
let state = AppState::new(bel_storage, command_tx, shutdown_tx.clone());
// Create app state with mirrored state, shared storage, command sender, and shutdown channel
let state = AppState::new(mirrored_state, bel_storage, command_tx, shutdown_tx.clone());
// Spawn idle timeout checker task
let idle_state = state.clone();
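
The serve command now gives the HTTP side its own mirrored BuildState, updated only from the orchestrator's event broadcast, rather than sharing the orchestrator's state directly. A minimal, self-contained sketch of that mirroring pattern follows; the Event and BuildState definitions are toy stand-ins, and only the channel wiring and the handle_event call correspond to the code above.

use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::sync::broadcast;

// Toy stand-ins for databuild's Event and BuildState.
#[derive(Clone, Debug)]
enum Event {
    JobRunHeartbeatV1 { job_run_id: String },
}

#[derive(Default, Debug)]
struct BuildState {
    events_seen: u64,
}

impl BuildState {
    fn handle_event(&mut self, _event: &Event) {
        self.events_seen += 1;
    }
}

#[tokio::main]
async fn main() {
    // Orchestrator -> HTTP server event broadcast.
    let (event_tx, _event_rx) = broadcast::channel::<Event>(1000);

    // Shared, read-mostly state for HTTP handlers.
    let mirrored_state = Arc::new(RwLock::new(BuildState::default()));

    // Mirror task: apply every broadcast event to the shared state.
    let mirror_clone = mirrored_state.clone();
    let mut mirror_rx = event_tx.subscribe();
    tokio::spawn(async move {
        while let Ok(event) = mirror_rx.recv().await {
            mirror_clone.write().unwrap().handle_event(&event);
        }
    });

    // The orchestrator side sends after appending to the build event log;
    // a send error just means nobody is subscribed, which is fine.
    let _ = event_tx.send(Event::JobRunHeartbeatV1 { job_run_id: "demo".into() });

    // Give the mirror task a moment to run, then read the mirrored copy.
    tokio::time::sleep(Duration::from_millis(50)).await;
    println!("{:?}", mirrored_state.read().unwrap());
}

Reads stay cheap for handlers, while every write still travels through the command channel to the orchestrator, which is what AppState::new(mirrored_state, bel_storage, command_tx, shutdown_tx.clone()) wires together.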


@@ -25,9 +25,9 @@ use tower_http::cors::CorsLayer;
/// Shared application state for HTTP handlers
#[derive(Clone)]
pub struct AppState {
/// Shared read-only access to build state (maintained by orchestrator)
/// Mirrored build state (updated via event broadcast from orchestrator)
pub build_state: Arc<RwLock<BuildState>>,
/// Shared read-only access to BEL storage (for event log queries)
/// Shared read-only access to BEL storage (for event log queries if needed)
pub bel_storage: Arc<dyn BELStorage>,
/// Command sender for write operations (sends to orchestrator)
pub command_tx: mpsc::Sender<Command>,
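
Handlers built on this AppState read the mirrored state and push writes through command_tx; the HTTP layer never mutates build state itself. A rough sketch of that read/write split, assuming hypothetical routes and toy BuildState and Command types (the real handlers in http_server.rs may differ):

use std::sync::{Arc, RwLock};

use axum::{extract::State, routing::{get, post}, Json, Router};
use tokio::sync::mpsc;

// Toy stand-ins for the real BuildState and Command types.
#[derive(Default)]
struct BuildState {
    want_count: usize,
}

#[derive(Debug)]
enum Command {
    CreateWant { partition: String },
}

#[derive(Clone)]
struct AppState {
    build_state: Arc<RwLock<BuildState>>,
    command_tx: mpsc::Sender<Command>,
}

// Read path: consult the mirrored state under a short-lived read lock.
async fn count_wants(State(state): State<AppState>) -> Json<usize> {
    Json(state.build_state.read().unwrap().want_count)
}

// Write path: hand the request to the orchestrator as a command.
async fn create_want(State(state): State<AppState>, body: String) -> &'static str {
    let _ = state
        .command_tx
        .send(Command::CreateWant { partition: body })
        .await;
    "accepted"
}

fn router(state: AppState) -> Router {
    Router::new()
        .route("/api/wants/count", get(count_wants))
        .route("/api/wants", post(create_want))
        .with_state(state)
}

#[tokio::main]
async fn main() {
    let (command_tx, mut command_rx) = mpsc::channel(100);
    let state = AppState {
        build_state: Arc::new(RwLock::new(BuildState::default())),
        command_tx,
    };

    // Drain commands in the background, standing in for the orchestrator.
    tokio::spawn(async move {
        while let Some(cmd) = command_rx.recv().await {
            println!("orchestrator received {:?}", cmd);
        }
    });

    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await.unwrap();
    axum::serve(listener, router(state)).await.unwrap();
}

Keeping writes on an mpsc channel means the orchestrator remains the single writer to the build event log, so the HTTP server never coordinates with it beyond this queue.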


@@ -1,5 +1,5 @@
pub mod build_event_log;
mod build_state;
pub mod build_state;
pub mod commands;
pub mod config;
mod data_deps;


@@ -7,7 +7,7 @@ use crate::util::DatabuildError;
use crate::{JobRunBufferEventV1, PartitionRef, WantDetail};
use std::collections::HashMap;
use std::fmt::Debug;
use tokio::sync::mpsc;
use tokio::sync::{broadcast, mpsc};
use uuid::Uuid;
/**
@@ -26,6 +26,8 @@ pub struct Orchestrator<S: BELStorage + Debug> {
pub job_runs: Vec<crate::job_run::JobRunHandle<SubProcessBackend>>,
/// Optional command receiver for write operations from HTTP server
pub command_rx: Option<mpsc::Receiver<Command>>,
/// Optional event broadcaster for state updates to HTTP server
pub event_tx: Option<broadcast::Sender<Event>>,
/// Environment variables for each job run (keyed by job_run_id)
pub job_environments: HashMap<Uuid, HashMap<String, String>>,
}
@@ -37,6 +39,7 @@ impl Default for Orchestrator<MemoryBELStorage> {
config: Default::default(),
job_runs: Default::default(),
command_rx: None,
event_tx: None,
job_environments: HashMap::new(),
}
}
@@ -49,6 +52,7 @@ impl Orchestrator<MemoryBELStorage> {
config: self.config.clone(),
job_runs: Default::default(),
command_rx: None,
event_tx: None,
job_environments: HashMap::new(),
}
}
@@ -133,6 +137,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
config,
job_runs: Vec::new(),
command_rx: None,
event_tx: None,
job_environments: HashMap::new(),
}
}
@@ -147,16 +152,44 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
config,
job_runs: Vec::new(),
command_rx: Some(command_rx),
event_tx: None,
job_environments: HashMap::new(),
}
}
pub fn new_with_channels(
storage: S,
config: OrchestratorConfig,
command_rx: mpsc::Receiver<Command>,
event_tx: broadcast::Sender<Event>,
) -> Self {
Self {
bel: BuildEventLog::new(storage, Default::default()),
config,
job_runs: Vec::new(),
command_rx: Some(command_rx),
event_tx: Some(event_tx),
job_environments: HashMap::new(),
}
}
/// Append event to BEL and broadcast to HTTP server (if configured)
fn append_and_broadcast(&mut self, event: &Event) -> Result<u64, DatabuildError> {
let idx = self.bel.append_event(event)?;
if let Some(tx) = &self.event_tx {
// Ignore send errors (no receivers is fine)
let _ = tx.send(event.clone());
}
Ok(idx)
}
fn schedule_queued_jobs(&mut self) -> Result<(), DatabuildError> {
use crate::JobRunHeartbeatEventV1;
use crate::data_build_event::Event;
use crate::job_run::JobRunHandle;
let mut new_jobs = Vec::new();
let mut events_to_append = Vec::new();
for job in self.job_runs.drain(..) {
let transitioned = match job {
JobRunHandle::NotStarted(not_started) => {
@@ -165,11 +198,11 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
let env = self.job_environments.get(&job_run_id).cloned();
let running = not_started.run(env)?;
// Emit heartbeat event to notify BuildState that job is now running
// Collect heartbeat event to emit after drain completes
let heartbeat_event = Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 {
job_run_id: job_run_id.to_string(),
});
self.bel.append_event(&heartbeat_event)?;
events_to_append.push(heartbeat_event);
JobRunHandle::Running(running)
}
@@ -178,12 +211,19 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
new_jobs.push(transitioned);
}
self.job_runs = new_jobs;
// Now append all collected events
for event in events_to_append {
self.append_and_broadcast(&event)?;
}
Ok(())
}
/// Visits individual job runs, appending resulting events, and moving runs between run status
/// containers. Either jobs are still running, or they are moved to terminal states.
fn poll_job_runs(&mut self) -> Result<(), DatabuildError> {
use crate::data_build_event::Event;
use crate::job_run::{JobRunHandle, VisitResult};
self.schedule_queued_jobs()?;
@@ -194,7 +234,9 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
// Need to decide on heartbeat frequency (e.g., every N polls or based on time elapsed)
// Visit all running jobs using type-safe transitions
// Collect events first to avoid borrow checker issues
let mut new_jobs = Vec::new();
let mut events_to_append = Vec::new();
for job in self.job_runs.drain(..) {
let transitioned = match job {
JobRunHandle::Running(running) => match running.visit()? {
@@ -205,19 +247,19 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
VisitResult::Completed(completed) => {
println!("Completed job: {:?}", completed.job_run_id);
let event = completed.state.to_event(&completed.job_run_id);
self.bel.append_event(&event)?;
events_to_append.push(event);
JobRunHandle::Completed(completed)
}
VisitResult::Failed(failed) => {
println!("Failed job: {:?}", failed.job_run_id);
let event = failed.state.to_event(&failed.job_run_id);
self.bel.append_event(&event)?;
events_to_append.push(event);
JobRunHandle::Failed(failed)
}
VisitResult::DepMiss(dep_miss) => {
println!("Dep miss job: {:?}", dep_miss.job_run_id);
let event = dep_miss.state.to_event(&dep_miss.job_run_id);
self.bel.append_event(&event)?;
events_to_append.push(event);
JobRunHandle::DepMiss(dep_miss)
}
},
@@ -227,6 +269,11 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
}
self.job_runs = new_jobs;
// Now append all collected events
for event in events_to_append {
self.append_and_broadcast(&event)?;
}
Ok(())
}
@@ -302,7 +349,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
.collect(),
want_attributed_partitions: wg.wants.iter().map(|w| w.into()).collect(),
});
self.bel.append_event(&job_buffer_event)?;
self.append_and_broadcast(&job_buffer_event)?;
self.job_runs.push(job_run);
Ok(())
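
Both schedule_queued_jobs and poll_job_runs now buffer their events in events_to_append while draining job_runs and only call append_and_broadcast once the loop finishes, because drain(..) keeps self.job_runs mutably borrowed and a second &mut self call inside the loop would not compile. A stripped-down illustration of that collect-then-append pattern, with toy types in place of the real job-run handles and event log:

// Toy orchestrator showing the collect-then-append pattern.
struct Orchestrator {
    job_runs: Vec<u32>,
    event_log: Vec<String>,
}

impl Orchestrator {
    // Stands in for append_and_broadcast: needs &mut self.
    fn append_and_broadcast(&mut self, event: String) {
        self.event_log.push(event);
    }

    fn poll_job_runs(&mut self) {
        let mut new_jobs = Vec::new();
        let mut events_to_append = Vec::new();

        for job in self.job_runs.drain(..) {
            // Calling self.append_and_broadcast(..) here would not compile:
            // drain(..) holds a mutable borrow of self.job_runs for the whole
            // loop, and the call needs &mut self as well.
            events_to_append.push(format!("visited job {job}"));
            new_jobs.push(job);
        }
        self.job_runs = new_jobs;

        // Borrow released; now append (and broadcast) everything collected.
        for event in events_to_append {
            self.append_and_broadcast(event);
        }
    }
}

fn main() {
    let mut orch = Orchestrator { job_runs: vec![1, 2, 3], event_log: Vec::new() };
    orch.poll_job_runs();
    assert_eq!(orch.event_log.len(), 3);
}

The broadcast half of append_and_broadcast deliberately ignores send errors, since tokio's broadcast::Sender::send only fails when no receiver is currently subscribed, which is harmless here.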


@@ -1,112 +0,0 @@
use databuild::build_event_log::SqliteBELStorage;
use databuild::http_server::{create_router, AppState};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, mpsc};
#[tokio::main]
async fn main() {
use databuild::orchestrator::{Orchestrator, OrchestratorConfig};
// Initialize logging
tracing_subscriber::fmt::init();
let database = ":memory:";
// Create SQLite BEL storage (shared between orchestrator and HTTP server)
let bel_storage = Arc::new(
SqliteBELStorage::create(database).expect("Failed to create BEL storage"),
);
// Create command channel for orchestrator communication
let (command_tx, command_rx) = mpsc::channel(100);
// Create shutdown broadcast channel
let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);
// Spawn orchestrator in background thread
let orch_bel_storage = SqliteBELStorage::create(database).expect("Failed to create BEL storage");
let orch_shutdown_rx = shutdown_tx.subscribe();
let orch_handle = std::thread::spawn(move || {
// Create orchestrator with command channel
let config = OrchestratorConfig::default();
let mut orchestrator = Orchestrator::new_with_commands(orch_bel_storage, config, command_rx);
let mut shutdown_rx = orch_shutdown_rx;
// Run orchestrator loop
loop {
// Check for shutdown signal
if shutdown_rx.try_recv().is_ok() {
println!("Orchestrator received shutdown signal");
break;
}
if let Err(e) = orchestrator.step() {
eprintln!("Orchestrator error: {}", e);
}
// Small sleep to avoid busy-waiting
std::thread::sleep(std::time::Duration::from_millis(10));
}
});
// Create app state with shared storage, command sender, and shutdown channel
let state = AppState::new(bel_storage, command_tx, shutdown_tx.clone());
// Spawn idle timeout checker task
let idle_state = state.clone();
let idle_shutdown_tx = shutdown_tx.clone();
tokio::spawn(async move {
let idle_timeout = Duration::from_secs(3 * 60 * 60); // 3 hours
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
let last_request = idle_state.last_request_time.load(std::sync::atomic::Ordering::Relaxed);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
if now - last_request > idle_timeout.as_millis() as u64 {
eprintln!(
"Server idle for {} hours, shutting down",
idle_timeout.as_secs() / 3600
);
let _ = idle_shutdown_tx.send(());
break;
}
}
});
// Create router
let app = create_router(state);
// Bind to port 3000
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
.await
.expect("Failed to bind to port 3000");
println!("DataBuild server listening on http://127.0.0.1:3000");
println!(" GET /health");
println!(" GET /api/wants");
println!(" POST /api/wants");
println!(" GET /api/wants/:id");
println!(" GET /api/partitions");
println!(" GET /api/job_runs");
// Subscribe to shutdown signal for graceful shutdown
let mut server_shutdown_rx = shutdown_tx.subscribe();
// Run the server with graceful shutdown
axum::serve(listener, app)
.with_graceful_shutdown(async move {
let _ = server_shutdown_rx.recv().await;
println!("HTTP server received shutdown signal");
})
.await
.expect("Server error");
// Wait for orchestrator to finish
let _ = orch_handle.join();
println!("Shutdown complete");
}
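
The idle-shutdown watchdog from this deleted binary survives in the CLI's serve command: a background task compares an atomically stored last-request timestamp against a cutoff and fires the shutdown broadcast once the server has been quiet long enough. A compressed sketch of that watchdog, with a toy AppState and a much shorter timeout so it actually terminates when run:

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use tokio::sync::broadcast;

// Toy state: the real AppState stores last_request_time the same way.
#[derive(Clone)]
struct AppState {
    last_request_time: Arc<AtomicU64>, // unix millis of the most recent request
}

fn now_millis() -> u64 {
    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64
}

#[tokio::main]
async fn main() {
    let state = AppState { last_request_time: Arc::new(AtomicU64::new(now_millis())) };
    let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(1);

    // Watchdog task: on every tick, check how long since the last request.
    let idle_state = state.clone();
    let idle_shutdown_tx = shutdown_tx.clone();
    tokio::spawn(async move {
        let idle_timeout = Duration::from_millis(200); // 3 hours in the real server
        loop {
            tokio::time::sleep(Duration::from_millis(50)).await;
            let last = idle_state.last_request_time.load(Ordering::Relaxed);
            if now_millis().saturating_sub(last) > idle_timeout.as_millis() as u64 {
                // Every subscriber (HTTP server, orchestrator) sees the signal.
                let _ = idle_shutdown_tx.send(());
                break;
            }
        }
    });

    // HTTP handlers would store now_millis() into last_request_time on every
    // request; with no traffic here, the watchdog trips and recv() returns.
    let _ = shutdown_rx.recv().await;
    println!("idle shutdown");
}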