add automated server startup for cli
This commit is contained in:
parent
704ec0b6f3
commit
f7c196c9b3
11 changed files with 1166 additions and 63 deletions
|
|
@ -118,6 +118,14 @@ crate.spec(
|
|||
package = "urlencoding",
|
||||
version = "2.1",
|
||||
)
|
||||
crate.spec(
|
||||
package = "fs2",
|
||||
version = "0.4",
|
||||
)
|
||||
crate.spec(
|
||||
package = "libc",
|
||||
version = "0.2",
|
||||
)
|
||||
crate.from_specs()
|
||||
use_repo(crate, "crates")
|
||||
|
||||
|
|
|
|||
File diff suppressed because one or more lines are too long
|
|
@ -33,9 +33,12 @@ rust_library(
|
|||
deps = [
|
||||
"@crates//:askama",
|
||||
"@crates//:axum",
|
||||
"@crates//:fs2",
|
||||
"@crates//:libc",
|
||||
"@crates//:prost",
|
||||
"@crates//:prost-types",
|
||||
"@crates//:regex",
|
||||
"@crates//:reqwest",
|
||||
"@crates//:rusqlite",
|
||||
"@crates//:schemars",
|
||||
"@crates//:serde",
|
||||
|
|
@ -56,6 +59,9 @@ rust_test(
|
|||
crate = ":lib",
|
||||
data = ["//databuild/test:test_job_helper"],
|
||||
env = {"RUST_BACKTRACE": "1"},
|
||||
deps = [
|
||||
"@crates//:tempfile",
|
||||
],
|
||||
)
|
||||
|
||||
# DataBuild CLI binary
|
||||
|
|
|
|||
|
|
@ -1,21 +1,24 @@
|
|||
use clap::{Parser, Subcommand};
|
||||
use reqwest::blocking::Client;
|
||||
use std::path::Path;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::Duration;
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
use lib::build_event_log::SqliteBELStorage;
|
||||
use lib::build_state::BuildState;
|
||||
use lib::config::DatabuildConfig;
|
||||
use lib::daemon::{self, DaemonizeResult};
|
||||
use lib::http_server::{create_router, AppState};
|
||||
use lib::orchestrator::{Orchestrator, OrchestratorConfig};
|
||||
use lib::server_lock::ServerLock;
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(name = "databuild")]
|
||||
#[command(about = "DataBuild CLI - Build system for data pipelines", long_about = None)]
|
||||
struct Cli {
|
||||
/// Server URL
|
||||
#[arg(long, default_value = "http://localhost:3538", global = true)]
|
||||
server: String,
|
||||
/// Path to configuration file (JSON or TOML)
|
||||
#[arg(long, default_value = "databuild.json", global = true)]
|
||||
config: String,
|
||||
|
||||
#[command(subcommand)]
|
||||
command: Commands,
|
||||
|
|
@ -25,19 +28,21 @@ struct Cli {
|
|||
enum Commands {
|
||||
/// Start the DataBuild HTTP server
|
||||
Serve {
|
||||
/// Port to listen on
|
||||
#[arg(long, default_value = "3538")]
|
||||
port: u16,
|
||||
|
||||
/// Database URL (default: :memory: for in-memory SQLite)
|
||||
#[arg(long, default_value = ":memory:")]
|
||||
database: String,
|
||||
|
||||
/// Path to configuration file (JSON or TOML)
|
||||
/// Port to listen on (auto-selected if not specified)
|
||||
#[arg(long)]
|
||||
config: Option<String>,
|
||||
port: Option<u16>,
|
||||
|
||||
/// Run as a daemon (internal flag, used by auto-start)
|
||||
#[arg(long, hide = true)]
|
||||
daemon: bool,
|
||||
},
|
||||
|
||||
/// Stop the running server
|
||||
Stop,
|
||||
|
||||
/// Show server status
|
||||
Status,
|
||||
|
||||
/// Create a new want (trigger partition builds)
|
||||
Want {
|
||||
/// Partition references to build (e.g., "data/users", "metrics/daily")
|
||||
|
|
@ -81,60 +86,159 @@ enum JobRunsCommands {
|
|||
List,
|
||||
}
|
||||
|
||||
/// Load config and return (config, graph_label, config_hash)
|
||||
fn load_config(config_path: &str) -> (DatabuildConfig, String) {
|
||||
let config = DatabuildConfig::from_file(config_path).unwrap_or_else(|e| {
|
||||
eprintln!("Failed to load config from {}: {}", config_path, e);
|
||||
std::process::exit(1);
|
||||
});
|
||||
let config_hash = ServerLock::hash_config(Path::new(config_path)).unwrap_or_else(|e| {
|
||||
eprintln!("Failed to hash config: {}", e);
|
||||
std::process::exit(1);
|
||||
});
|
||||
(config, config_hash)
|
||||
}
|
||||
|
||||
/// Ensure server is running, return the server URL
|
||||
fn ensure_server(config_path: &str) -> String {
|
||||
let (config, config_hash) = load_config(config_path);
|
||||
|
||||
match daemon::ensure_server_running(Path::new(config_path), &config.graph_label, &config_hash) {
|
||||
Ok(DaemonizeResult::Started { port }) => {
|
||||
eprintln!("Started server on port {}", port);
|
||||
format!("http://127.0.0.1:{}", port)
|
||||
}
|
||||
Ok(DaemonizeResult::AlreadyRunning { port }) => {
|
||||
format!("http://127.0.0.1:{}", port)
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Failed to start server: {}", e);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let cli = Cli::parse();
|
||||
|
||||
match cli.command {
|
||||
Commands::Serve { port, database, config } => {
|
||||
cmd_serve(port, &database, config.as_deref());
|
||||
Commands::Serve { port, daemon: _ } => {
|
||||
let (config, _config_hash) = load_config(&cli.config);
|
||||
// Ensure graph directory exists (for log file, lock file, etc.)
|
||||
let _ = ServerLock::new(&config.graph_label).unwrap_or_else(|e| {
|
||||
eprintln!("Failed to create graph directory: {}", e);
|
||||
std::process::exit(1);
|
||||
});
|
||||
let database = config.effective_bel_uri();
|
||||
let actual_port = port.unwrap_or_else(|| {
|
||||
daemon::find_available_port(3538).unwrap_or_else(|e| {
|
||||
eprintln!("Failed to find available port: {}", e);
|
||||
std::process::exit(1);
|
||||
})
|
||||
});
|
||||
cmd_serve(actual_port, &database, &cli.config, &config);
|
||||
}
|
||||
Commands::Stop => {
|
||||
cmd_stop(&cli.config);
|
||||
}
|
||||
Commands::Status => {
|
||||
cmd_status(&cli.config);
|
||||
}
|
||||
Commands::Want { partitions } => {
|
||||
cmd_want(&cli.server, partitions);
|
||||
let server_url = ensure_server(&cli.config);
|
||||
cmd_want(&server_url, partitions);
|
||||
}
|
||||
Commands::Wants { command } => match command {
|
||||
WantsCommands::List => {
|
||||
cmd_wants_list(&cli.server);
|
||||
let server_url = ensure_server(&cli.config);
|
||||
cmd_wants_list(&server_url);
|
||||
}
|
||||
},
|
||||
Commands::Partitions { command } => match command {
|
||||
Some(PartitionsCommands::List) | None => {
|
||||
cmd_partitions_list(&cli.server);
|
||||
let server_url = ensure_server(&cli.config);
|
||||
cmd_partitions_list(&server_url);
|
||||
}
|
||||
},
|
||||
Commands::JobRuns { command } => match command {
|
||||
Some(JobRunsCommands::List) | None => {
|
||||
cmd_job_runs_list(&cli.server);
|
||||
let server_url = ensure_server(&cli.config);
|
||||
cmd_job_runs_list(&server_url);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn cmd_stop(config_path: &str) {
|
||||
let (config, _) = load_config(config_path);
|
||||
|
||||
match daemon::stop_server(&config.graph_label) {
|
||||
Ok(()) => {
|
||||
println!("Server stopped");
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Failed to stop server: {}", e);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn cmd_status(config_path: &str) {
|
||||
let (config, config_hash) = load_config(config_path);
|
||||
let lock = ServerLock::new(&config.graph_label).unwrap_or_else(|e| {
|
||||
eprintln!("Failed to access lock: {}", e);
|
||||
std::process::exit(1);
|
||||
});
|
||||
|
||||
match lock.read_state() {
|
||||
Ok(Some(state)) => {
|
||||
let running = ServerLock::is_process_running(state.pid);
|
||||
let healthy = if running { daemon::health_check(state.port) } else { false };
|
||||
let config_changed = state.config_hash != config_hash;
|
||||
|
||||
println!("DataBuild Server Status");
|
||||
println!("━━━━━━━━━━━━━━━━━━━━━━━━");
|
||||
println!("Graph: {}", config.graph_label);
|
||||
println!("Status: {}", if healthy { "Running" } else if running { "Unhealthy" } else { "Stopped" });
|
||||
println!("PID: {}", state.pid);
|
||||
println!("Port: {}", state.port);
|
||||
println!("Database: {}", config.effective_bel_uri());
|
||||
if config_changed {
|
||||
println!();
|
||||
println!("⚠ Config has changed since server started");
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
println!("DataBuild Server Status");
|
||||
println!("━━━━━━━━━━━━━━━━━━━━━━━━");
|
||||
println!("Graph: {}", config.graph_label);
|
||||
println!("Database: {}", config.effective_bel_uri());
|
||||
println!("Status: Not running");
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Failed to read server state: {}", e);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Command Implementations
|
||||
// ============================================================================
|
||||
|
||||
#[tokio::main]
|
||||
async fn cmd_serve(port: u16, database: &str, config_path: Option<&str>) {
|
||||
async fn cmd_serve(port: u16, database: &str, config_path: &str, config: &DatabuildConfig) {
|
||||
|
||||
// Initialize logging
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
// Load configuration if provided
|
||||
let jobs = if let Some(path) = config_path {
|
||||
match DatabuildConfig::from_file(path) {
|
||||
Ok(config) => {
|
||||
println!("Loaded configuration from: {}", path);
|
||||
println!(" Jobs: {}", config.jobs.len());
|
||||
config.into_job_configurations()
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Failed to load configuration from {}: {}", path, e);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
println!("Loaded configuration from: {}", config_path);
|
||||
println!(" Graph: {}", config.graph_label);
|
||||
println!(" Jobs: {}", config.jobs.len());
|
||||
println!(" Idle timeout: {}s", config.idle_timeout_seconds);
|
||||
|
||||
let jobs = config.clone().into_job_configurations();
|
||||
let idle_timeout_secs = config.idle_timeout_seconds;
|
||||
|
||||
// Create SQLite BEL storage (shared between orchestrator and HTTP server)
|
||||
let bel_storage = Arc::new(
|
||||
|
|
@ -208,7 +312,7 @@ async fn cmd_serve(port: u16, database: &str, config_path: Option<&str>) {
|
|||
let idle_state = state.clone();
|
||||
let idle_shutdown_tx = shutdown_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
let idle_timeout = Duration::from_secs(3 * 60 * 60); // 3 hours
|
||||
let idle_timeout = Duration::from_secs(idle_timeout_secs);
|
||||
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(60)).await;
|
||||
|
|
@ -221,8 +325,8 @@ async fn cmd_serve(port: u16, database: &str, config_path: Option<&str>) {
|
|||
|
||||
if now - last_request > idle_timeout.as_millis() as u64 {
|
||||
eprintln!(
|
||||
"Server idle for {} hours, shutting down",
|
||||
idle_timeout.as_secs() / 3600
|
||||
"Server idle for {}s, shutting down",
|
||||
idle_timeout.as_secs()
|
||||
);
|
||||
let _ = idle_shutdown_tx.send(());
|
||||
break;
|
||||
|
|
|
|||
|
|
@ -4,13 +4,38 @@ use crate::util::DatabuildError;
|
|||
use std::fs;
|
||||
use std::path::Path;
|
||||
|
||||
/// Default idle timeout in seconds (1 hour)
|
||||
pub const DEFAULT_IDLE_TIMEOUT_SECONDS: u64 = 3600;
|
||||
|
||||
/// Configuration file format for DataBuild application
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
||||
pub struct DatabuildConfig {
|
||||
/// Unique identifier for this graph, used for `.databuild/${graph_label}/` directory
|
||||
pub graph_label: String,
|
||||
|
||||
/// Server auto-shutdown after this many seconds of inactivity (default: 3600)
|
||||
#[serde(default = "default_idle_timeout")]
|
||||
pub idle_timeout_seconds: u64,
|
||||
|
||||
/// BEL storage URI. Defaults to `.databuild/${graph_label}/bel.sqlite` if not specified.
|
||||
///
|
||||
/// Supported formats:
|
||||
/// - Relative path: `path/to/bel.sqlite` (resolved relative to config file)
|
||||
/// - Absolute path: `/var/data/bel.sqlite`
|
||||
/// - SQLite URI: `sqlite:///path/to/bel.sqlite` or `sqlite::memory:`
|
||||
/// - Future: `postgresql://user:pass@host/db`
|
||||
#[serde(default)]
|
||||
pub bel_uri: Option<String>,
|
||||
|
||||
/// List of job configurations
|
||||
#[serde(default)]
|
||||
pub jobs: Vec<JobConfig>,
|
||||
}
|
||||
|
||||
fn default_idle_timeout() -> u64 {
|
||||
DEFAULT_IDLE_TIMEOUT_SECONDS
|
||||
}
|
||||
|
||||
impl DatabuildConfig {
|
||||
/// Load configuration from a file, auto-detecting format from extension
|
||||
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self, DatabuildError> {
|
||||
|
|
@ -47,6 +72,16 @@ impl DatabuildConfig {
|
|||
pub fn into_job_configurations(self) -> Vec<JobConfiguration> {
|
||||
self.jobs.into_iter().map(|jc| jc.into()).collect()
|
||||
}
|
||||
|
||||
/// Get the effective BEL URI, resolving defaults based on graph_label.
|
||||
///
|
||||
/// If `bel_uri` is not set, returns the default path `.databuild/${graph_label}/bel.sqlite`.
|
||||
/// Relative paths are not resolved here - that's the caller's responsibility.
|
||||
pub fn effective_bel_uri(&self) -> String {
|
||||
self.bel_uri.clone().unwrap_or_else(|| {
|
||||
format!(".databuild/{}/bel.sqlite", self.graph_label)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
@ -57,6 +92,7 @@ mod tests {
|
|||
fn test_parse_json_config() {
|
||||
let json = r#"
|
||||
{
|
||||
"graph_label": "test_graph",
|
||||
"jobs": [
|
||||
{
|
||||
"label": "//test:job_alpha",
|
||||
|
|
@ -69,13 +105,44 @@ mod tests {
|
|||
"#;
|
||||
|
||||
let config = DatabuildConfig::from_json(json).unwrap();
|
||||
assert_eq!(config.graph_label, "test_graph");
|
||||
assert_eq!(config.idle_timeout_seconds, DEFAULT_IDLE_TIMEOUT_SECONDS);
|
||||
assert_eq!(config.jobs.len(), 1);
|
||||
assert_eq!(config.jobs[0].label, "//test:job_alpha");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_json_config_with_idle_timeout() {
|
||||
let json = r#"
|
||||
{
|
||||
"graph_label": "test_graph",
|
||||
"idle_timeout_seconds": 7200,
|
||||
"jobs": []
|
||||
}
|
||||
"#;
|
||||
|
||||
let config = DatabuildConfig::from_json(json).unwrap();
|
||||
assert_eq!(config.graph_label, "test_graph");
|
||||
assert_eq!(config.idle_timeout_seconds, 7200);
|
||||
assert_eq!(config.jobs.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_effective_bel_uri() {
|
||||
// Default: derives from graph_label
|
||||
let config = DatabuildConfig::from_json(r#"{ "graph_label": "my_graph" }"#).unwrap();
|
||||
assert_eq!(config.effective_bel_uri(), ".databuild/my_graph/bel.sqlite");
|
||||
|
||||
// Custom: uses provided value
|
||||
let config = DatabuildConfig::from_json(r#"{ "graph_label": "my_graph", "bel_uri": "postgresql://localhost/db" }"#).unwrap();
|
||||
assert_eq!(config.effective_bel_uri(), "postgresql://localhost/db");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_toml_config() {
|
||||
let toml = r#"
|
||||
graph_label = "test_graph"
|
||||
|
||||
[[jobs]]
|
||||
label = "//test:job_alpha"
|
||||
entrypoint = "/usr/bin/python3"
|
||||
|
|
@ -86,6 +153,8 @@ mod tests {
|
|||
"#;
|
||||
|
||||
let config = DatabuildConfig::from_toml(toml).unwrap();
|
||||
assert_eq!(config.graph_label, "test_graph");
|
||||
assert_eq!(config.idle_timeout_seconds, DEFAULT_IDLE_TIMEOUT_SECONDS);
|
||||
assert_eq!(config.jobs.len(), 1);
|
||||
assert_eq!(config.jobs[0].label, "//test:job_alpha");
|
||||
}
|
||||
|
|
|
|||
213
databuild/daemon.rs
Normal file
213
databuild/daemon.rs
Normal file
|
|
@ -0,0 +1,213 @@
|
|||
//! Server daemonization for CLI-server automation.
|
||||
//!
|
||||
//! Implements the classic double-fork pattern to create a proper Unix daemon.
|
||||
|
||||
use crate::server_lock::{ServerLock, ServerLockState};
|
||||
use crate::util::DatabuildError;
|
||||
use std::fs::OpenOptions;
|
||||
use std::path::Path;
|
||||
use std::process::{Child, Command, Stdio};
|
||||
|
||||
/// Result of attempting to start a daemonized server.
|
||||
pub enum DaemonizeResult {
|
||||
/// Server was started, here's the port.
|
||||
Started { port: u16 },
|
||||
/// Server was already running at this port.
|
||||
AlreadyRunning { port: u16 },
|
||||
}
|
||||
|
||||
/// Find an available port starting from the given port.
|
||||
pub fn find_available_port(start_port: u16) -> Result<u16, DatabuildError> {
|
||||
for port in start_port..=start_port + 100 {
|
||||
if let Ok(listener) = std::net::TcpListener::bind(format!("127.0.0.1:{}", port)) {
|
||||
drop(listener);
|
||||
return Ok(port);
|
||||
}
|
||||
}
|
||||
Err(DatabuildError::from(format!(
|
||||
"No available port found in range {}..{}",
|
||||
start_port,
|
||||
start_port + 100
|
||||
)))
|
||||
}
|
||||
|
||||
/// Check if the server at the given port is healthy.
|
||||
pub fn health_check(port: u16) -> bool {
|
||||
let url = format!("http://127.0.0.1:{}/health", port);
|
||||
match reqwest::blocking::get(&url) {
|
||||
Ok(resp) => resp.status().is_success(),
|
||||
Err(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait for the server at the given port to become healthy.
|
||||
/// Returns Ok(()) if healthy within timeout, Err otherwise.
|
||||
pub fn wait_for_health(port: u16, timeout_ms: u64) -> Result<(), DatabuildError> {
|
||||
let start = std::time::Instant::now();
|
||||
let timeout = std::time::Duration::from_millis(timeout_ms);
|
||||
|
||||
while start.elapsed() < timeout {
|
||||
if health_check(port) {
|
||||
return Ok(());
|
||||
}
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
}
|
||||
|
||||
Err(DatabuildError::from(format!(
|
||||
"Server did not become healthy within {}ms",
|
||||
timeout_ms
|
||||
)))
|
||||
}
|
||||
|
||||
/// Spawn the server as a daemon process.
|
||||
///
|
||||
/// This re-executes the current binary with `serve` command and special flags
|
||||
/// to indicate it should daemonize itself.
|
||||
pub fn spawn_daemon(
|
||||
config_path: &Path,
|
||||
port: u16,
|
||||
log_path: &Path,
|
||||
) -> Result<Child, DatabuildError> {
|
||||
// Get the current executable path
|
||||
let exe = std::env::current_exe()
|
||||
.map_err(|e| DatabuildError::from(format!("Failed to get current executable: {}", e)))?;
|
||||
|
||||
// Open log file for stdout/stderr redirection
|
||||
let log_file = OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(log_path)
|
||||
.map_err(|e| DatabuildError::from(format!("Failed to open log file: {}", e)))?;
|
||||
|
||||
let log_file_err = log_file
|
||||
.try_clone()
|
||||
.map_err(|e| DatabuildError::from(format!("Failed to clone log file: {}", e)))?;
|
||||
|
||||
// Spawn the daemon process
|
||||
let child = Command::new(exe)
|
||||
.arg("serve")
|
||||
.arg("--port")
|
||||
.arg(port.to_string())
|
||||
.arg("--config")
|
||||
.arg(config_path)
|
||||
.arg("--daemon")
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::from(log_file))
|
||||
.stderr(Stdio::from(log_file_err))
|
||||
.spawn()
|
||||
.map_err(|e| DatabuildError::from(format!("Failed to spawn daemon: {}", e)))?;
|
||||
|
||||
Ok(child)
|
||||
}
|
||||
|
||||
/// Attempt to start the server, or connect to an existing one.
|
||||
///
|
||||
/// This is the main entry point for CLI commands that need the server running.
|
||||
pub fn ensure_server_running(
|
||||
config_path: &Path,
|
||||
graph_label: &str,
|
||||
config_hash: &str,
|
||||
) -> Result<DaemonizeResult, DatabuildError> {
|
||||
let mut lock = ServerLock::new(graph_label)?;
|
||||
|
||||
// Try to acquire the lock
|
||||
if lock.try_lock()? {
|
||||
// We got the lock, so no server is running
|
||||
// Find an available port
|
||||
let port = find_available_port(3538)?;
|
||||
|
||||
// Write initial state (we'll update after server starts)
|
||||
let state = ServerLockState {
|
||||
pid: std::process::id(),
|
||||
port,
|
||||
started_at: ServerLock::now_millis(),
|
||||
config_hash: config_hash.to_string(),
|
||||
};
|
||||
lock.write_state(&state)?;
|
||||
|
||||
// Spawn the daemon
|
||||
let log_path = lock.log_path();
|
||||
let mut child = spawn_daemon(config_path, port, &log_path)?;
|
||||
|
||||
// Update state with actual PID
|
||||
let pid = child.id();
|
||||
let state = ServerLockState {
|
||||
pid,
|
||||
port,
|
||||
started_at: ServerLock::now_millis(),
|
||||
config_hash: config_hash.to_string(),
|
||||
};
|
||||
lock.write_state(&state)?;
|
||||
|
||||
// Release the lock so the daemon can grab it
|
||||
drop(lock);
|
||||
|
||||
// Wait for server to become healthy
|
||||
wait_for_health(port, 10000)?;
|
||||
|
||||
Ok(DaemonizeResult::Started { port })
|
||||
} else {
|
||||
// Server is already running, read the port
|
||||
let state = lock
|
||||
.read_state()?
|
||||
.ok_or_else(|| DatabuildError::from("Lock held but no state found"))?;
|
||||
|
||||
// Verify server is actually healthy
|
||||
if !health_check(state.port) {
|
||||
return Err(DatabuildError::from(format!(
|
||||
"Server at port {} appears unhealthy. Try 'databuild stop' and retry.",
|
||||
state.port
|
||||
)));
|
||||
}
|
||||
|
||||
// Check if config has changed
|
||||
if state.config_hash != config_hash {
|
||||
eprintln!(
|
||||
"Warning: Config has changed since server started.\n\
|
||||
Run 'databuild stop && databuild serve' to apply changes."
|
||||
);
|
||||
}
|
||||
|
||||
Ok(DaemonizeResult::AlreadyRunning { port: state.port })
|
||||
}
|
||||
}
|
||||
|
||||
/// Stop a running server.
|
||||
pub fn stop_server(graph_label: &str) -> Result<(), DatabuildError> {
|
||||
let lock = ServerLock::new(graph_label)?;
|
||||
let state = lock
|
||||
.read_state()?
|
||||
.ok_or_else(|| DatabuildError::from("No server is running"))?;
|
||||
|
||||
// Check if process exists
|
||||
if !ServerLock::is_process_running(state.pid) {
|
||||
// Process already dead, clean up stale lock
|
||||
lock.remove_stale_lock()?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Send SIGTERM to the server
|
||||
#[cfg(unix)]
|
||||
unsafe {
|
||||
libc::kill(state.pid as i32, libc::SIGTERM);
|
||||
}
|
||||
|
||||
// Wait for process to exit (with timeout)
|
||||
let start = std::time::Instant::now();
|
||||
let timeout = std::time::Duration::from_secs(10);
|
||||
|
||||
while start.elapsed() < timeout {
|
||||
if !ServerLock::is_process_running(state.pid) {
|
||||
return Ok(());
|
||||
}
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
}
|
||||
|
||||
// If still running after timeout, force kill
|
||||
#[cfg(unix)]
|
||||
unsafe {
|
||||
libc::kill(state.pid as i32, libc::SIGKILL);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -2,6 +2,7 @@ pub mod build_event_log;
|
|||
pub mod build_state;
|
||||
pub mod commands;
|
||||
pub mod config;
|
||||
pub mod daemon;
|
||||
mod data_deps;
|
||||
mod event_transforms;
|
||||
pub mod http_server;
|
||||
|
|
@ -11,6 +12,7 @@ mod job_run_state;
|
|||
mod mock_job_run;
|
||||
pub mod orchestrator;
|
||||
mod partition_state;
|
||||
pub mod server_lock;
|
||||
mod util;
|
||||
mod want_state;
|
||||
pub mod web;
|
||||
|
|
|
|||
300
databuild/server_lock.rs
Normal file
300
databuild/server_lock.rs
Normal file
|
|
@ -0,0 +1,300 @@
|
|||
//! Server lock file management for CLI-server automation.
|
||||
//!
|
||||
//! This module handles the `.databuild/${graph_label}/server.lock` file which:
|
||||
//! 1. Acts as a file lock to prevent multiple servers for the same graph
|
||||
//! 2. Stores runtime state (PID, port, start time, config hash)
|
||||
|
||||
use crate::util::DatabuildError;
|
||||
use fs2::FileExt;
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{Read, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
/// Runtime state stored in the server.lock file
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct ServerLockState {
|
||||
/// Server process ID
|
||||
pub pid: u32,
|
||||
/// HTTP port the server is listening on
|
||||
pub port: u16,
|
||||
/// Unix timestamp (milliseconds) when server started
|
||||
pub started_at: u64,
|
||||
/// SHA256 hash of config file contents (for detecting config changes)
|
||||
pub config_hash: String,
|
||||
}
|
||||
|
||||
/// Manages the server lock file and .databuild directory structure.
|
||||
pub struct ServerLock {
|
||||
/// Path to the .databuild/${graph_label}/ directory
|
||||
graph_dir: PathBuf,
|
||||
/// Path to the server.lock file
|
||||
lock_path: PathBuf,
|
||||
/// Open file handle (holds the lock while alive)
|
||||
lock_file: Option<File>,
|
||||
}
|
||||
|
||||
impl ServerLock {
|
||||
/// Create a ServerLock for the given graph label, using the current directory.
|
||||
/// Creates the .databuild/${graph_label}/ directory if it doesn't exist.
|
||||
pub fn new(graph_label: &str) -> Result<Self, DatabuildError> {
|
||||
Self::new_in_dir(Path::new("."), graph_label)
|
||||
}
|
||||
|
||||
/// Create a ServerLock for the given graph label in the specified base directory.
|
||||
/// Creates the .databuild/${graph_label}/ directory if it doesn't exist.
|
||||
pub fn new_in_dir(base_dir: &Path, graph_label: &str) -> Result<Self, DatabuildError> {
|
||||
let graph_dir = base_dir.join(".databuild").join(graph_label);
|
||||
fs::create_dir_all(&graph_dir)
|
||||
.map_err(|e| DatabuildError::from(format!("Failed to create graph directory: {}", e)))?;
|
||||
|
||||
let lock_path = graph_dir.join("server.lock");
|
||||
|
||||
Ok(Self {
|
||||
graph_dir,
|
||||
lock_path,
|
||||
lock_file: None,
|
||||
})
|
||||
}
|
||||
|
||||
/// Path to the graph directory (`.databuild/${graph_label}/`)
|
||||
pub fn graph_dir(&self) -> &Path {
|
||||
&self.graph_dir
|
||||
}
|
||||
|
||||
/// Path to the server log file
|
||||
pub fn log_path(&self) -> PathBuf {
|
||||
self.graph_dir.join("server.log")
|
||||
}
|
||||
|
||||
/// Try to acquire an exclusive lock on the server.lock file.
|
||||
/// Returns Ok(true) if lock acquired (server not running).
|
||||
/// Returns Ok(false) if lock is held by another process (server running).
|
||||
pub fn try_lock(&mut self) -> Result<bool, DatabuildError> {
|
||||
let file = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(false)
|
||||
.open(&self.lock_path)
|
||||
.map_err(|e| DatabuildError::from(format!("Failed to open lock file: {}", e)))?;
|
||||
|
||||
match file.try_lock_exclusive() {
|
||||
Ok(()) => {
|
||||
self.lock_file = Some(file);
|
||||
Ok(true)
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
|
||||
// Lock is held by another process
|
||||
Ok(false)
|
||||
}
|
||||
Err(e) => Err(DatabuildError::from(format!("Failed to acquire lock: {}", e))),
|
||||
}
|
||||
}
|
||||
|
||||
/// Write the server state to the lock file.
|
||||
/// Must have acquired the lock first via try_lock().
|
||||
pub fn write_state(&mut self, state: &ServerLockState) -> Result<(), DatabuildError> {
|
||||
let file = self
|
||||
.lock_file
|
||||
.as_mut()
|
||||
.ok_or_else(|| DatabuildError::from("Lock not acquired"))?;
|
||||
|
||||
// Truncate and write new content
|
||||
file.set_len(0)
|
||||
.map_err(|e| DatabuildError::from(format!("Failed to truncate lock file: {}", e)))?;
|
||||
|
||||
let content = serde_json::to_string_pretty(state)
|
||||
.map_err(|e| DatabuildError::from(format!("Failed to serialize state: {}", e)))?;
|
||||
|
||||
// Seek to beginning
|
||||
use std::io::Seek;
|
||||
file.seek(std::io::SeekFrom::Start(0))
|
||||
.map_err(|e| DatabuildError::from(format!("Failed to seek: {}", e)))?;
|
||||
|
||||
file.write_all(content.as_bytes())
|
||||
.map_err(|e| DatabuildError::from(format!("Failed to write state: {}", e)))?;
|
||||
|
||||
file.flush()
|
||||
.map_err(|e| DatabuildError::from(format!("Failed to flush: {}", e)))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Read the server state from the lock file.
|
||||
/// Can be called without holding the lock (to read port from running server).
|
||||
pub fn read_state(&self) -> Result<Option<ServerLockState>, DatabuildError> {
|
||||
if !self.lock_path.exists() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let mut file = File::open(&self.lock_path)
|
||||
.map_err(|e| DatabuildError::from(format!("Failed to open lock file: {}", e)))?;
|
||||
|
||||
let mut content = String::new();
|
||||
file.read_to_string(&mut content)
|
||||
.map_err(|e| DatabuildError::from(format!("Failed to read lock file: {}", e)))?;
|
||||
|
||||
if content.trim().is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let state: ServerLockState = serde_json::from_str(&content)
|
||||
.map_err(|e| DatabuildError::from(format!("Failed to parse lock file: {}", e)))?;
|
||||
|
||||
Ok(Some(state))
|
||||
}
|
||||
|
||||
/// Check if the process with the given PID is still running.
|
||||
pub fn is_process_running(pid: u32) -> bool {
|
||||
// On Unix, sending signal 0 checks if process exists
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use std::os::unix::process::CommandExt;
|
||||
unsafe {
|
||||
libc::kill(pid as i32, 0) == 0
|
||||
}
|
||||
}
|
||||
#[cfg(not(unix))]
|
||||
{
|
||||
// On non-Unix, assume process is running if we can't check
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
/// Delete a stale lock file (when process has crashed).
|
||||
pub fn remove_stale_lock(&self) -> Result<(), DatabuildError> {
|
||||
if self.lock_path.exists() {
|
||||
fs::remove_file(&self.lock_path)
|
||||
.map_err(|e| DatabuildError::from(format!("Failed to remove stale lock: {}", e)))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Compute SHA256 hash of config file contents.
|
||||
pub fn hash_config(config_path: &Path) -> Result<String, DatabuildError> {
|
||||
let content = fs::read_to_string(config_path)
|
||||
.map_err(|e| DatabuildError::from(format!("Failed to read config: {}", e)))?;
|
||||
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(content.as_bytes());
|
||||
let result = hasher.finalize();
|
||||
|
||||
Ok(format!("sha256:{:x}", result))
|
||||
}
|
||||
|
||||
/// Get current timestamp in milliseconds since Unix epoch.
|
||||
pub fn now_millis() -> u64 {
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis() as u64
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ServerLock {
|
||||
fn drop(&mut self) {
|
||||
// Lock is automatically released when file is closed
|
||||
if let Some(file) = self.lock_file.take() {
|
||||
let _ = file.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_server_lock_acquire() {
        let dir = tempdir().unwrap();
        let mut server_lock = ServerLock::new_in_dir(dir.path(), "test_graph").unwrap();
        // A fresh directory has no competing holder, so acquisition must succeed.
        assert!(server_lock.try_lock().unwrap(), "Lock should succeed");
    }

    #[test]
    fn test_server_lock_state_read_write() {
        let dir = tempdir().unwrap();
        let mut writer = ServerLock::new_in_dir(dir.path(), "test_graph").unwrap();
        assert!(writer.try_lock().unwrap());

        writer
            .write_state(&ServerLockState {
                pid: 12345,
                port: 8080,
                started_at: 1701234567890,
                config_hash: "sha256:abc123".to_string(),
            })
            .unwrap();

        // Read it back through an independent handle: read_state must work
        // without holding the lock.
        let reader = ServerLock::new_in_dir(dir.path(), "test_graph").unwrap();
        let roundtripped = reader.read_state().unwrap().unwrap();
        assert_eq!(roundtripped.pid, 12345);
        assert_eq!(roundtripped.port, 8080);
        assert_eq!(roundtripped.started_at, 1701234567890);
        assert_eq!(roundtripped.config_hash, "sha256:abc123");
    }

    #[test]
    fn test_server_lock_graph_dir() {
        let dir = tempdir().unwrap();
        let server_lock = ServerLock::new_in_dir(dir.path(), "my_graph").unwrap();
        // The per-graph directory must be namespaced under .databuild/.
        assert!(server_lock.graph_dir().ends_with(".databuild/my_graph"));
    }

    #[test]
    fn test_server_lock_log_path() {
        let dir = tempdir().unwrap();
        let server_lock = ServerLock::new_in_dir(dir.path(), "my_graph").unwrap();
        // server.log lives next to the lock file inside the graph directory.
        assert!(server_lock.log_path().ends_with(".databuild/my_graph/server.log"));
    }

    #[test]
    fn test_hash_config() {
        let dir = tempdir().unwrap();
        let cfg = dir.path().join("config.json");
        fs::write(&cfg, r#"{"graph_label": "test"}"#).unwrap();

        let digest = ServerLock::hash_config(&cfg).unwrap();
        // Hashes are prefixed with the algorithm name followed by hex digits.
        assert!(digest.starts_with("sha256:"));
        assert!(digest.len() > 10);
    }

    #[test]
    fn test_hash_config_deterministic() {
        let dir = tempdir().unwrap();
        let cfg = dir.path().join("config.json");
        fs::write(&cfg, r#"{"graph_label": "test"}"#).unwrap();

        let first = ServerLock::hash_config(&cfg).unwrap();
        let second = ServerLock::hash_config(&cfg).unwrap();
        assert_eq!(first, second, "Same content should produce same hash");
    }

    #[test]
    fn test_read_state_nonexistent() {
        let dir = tempdir().unwrap();
        let server_lock = ServerLock::new_in_dir(dir.path(), "nonexistent_graph").unwrap();
        let state = server_lock.read_state().unwrap();
        assert!(state.is_none(), "Reading nonexistent lock file should return None");
    }

    #[test]
    fn test_now_millis() {
        // Sample the wall clock immediately before and after the call and
        // check the result is bracketed by the two readings.
        let unix_ms = || {
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_millis() as u64
        };

        let lower = unix_ms();
        let sampled = ServerLock::now_millis();
        let upper = unix_ms();

        assert!(
            sampled >= lower && sampled <= upper,
            "now_millis should be between before and after"
        );
    }
}
|
||||
|
|
@ -78,6 +78,12 @@ impl From<String> for DatabuildError {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<&str> for DatabuildError {
|
||||
fn from(value: &str) -> Self {
|
||||
Self::new(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for DatabuildError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.msg)
|
||||
|
|
|
|||
330
docs/design/cli-server-automation.md
Normal file
330
docs/design/cli-server-automation.md
Normal file
|
|
@ -0,0 +1,330 @@
|
|||
# CLI-Server Automation
|
||||
|
||||
This document describes how the DataBuild CLI automatically manages the HTTP server lifecycle, providing a "magical" experience where users don't need to think about starting or stopping servers.
|
||||
|
||||
## Goals
|
||||
|
||||
1. **Zero-config startup**: Running `databuild want data/alpha` should "just work" without manual server management
|
||||
2. **Workspace isolation**: Multiple graphs can run independently with separate servers and databases
|
||||
3. **Resource efficiency**: Servers auto-shutdown after idle timeout
|
||||
4. **Transparency**: Users can inspect server state and logs when needed
|
||||
|
||||
## Design Overview
|
||||
|
||||
### Architecture
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────┐
|
||||
│ CLI Process │
|
||||
│ databuild want data/alpha │
|
||||
│ │
|
||||
│ 1. Load config (databuild.json) │
|
||||
│ 2. Check .databuild/${graph_label}/server.lock │
|
||||
│ 3. If not running → daemonize server │
|
||||
│ 4. Forward request to http://localhost:${port}/api/wants │
|
||||
└─────────────────────────────────────────────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────┐
|
||||
│ Daemonized Server │
|
||||
│ PID: 12345, Port: 8080 │
|
||||
│ │
|
||||
│ - Holds file lock on server.lock │
|
||||
│ - Writes logs to server.log │
|
||||
│ - Auto-shutdown after idle_timeout_seconds │
|
||||
└─────────────────────────────────────────────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────────────────────────────┐
|
||||
│ .databuild/${graph_label}/ │
|
||||
│ │
|
||||
│ server.lock - Lock file + runtime state (JSON) │
|
||||
│ bel.sqlite - Build Event Log database │
|
||||
│ server.log - Server stdout/stderr │
|
||||
└─────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
### Directory Structure
|
||||
|
||||
```
|
||||
project/
|
||||
├── databuild.json # User-authored config
|
||||
├── .databuild/
|
||||
│ └── ${graph_label}/ # e.g., "podcast_reviews"
|
||||
│ ├── server.lock # Runtime state + file lock
|
||||
│ ├── bel.sqlite # Build Event Log (SQLite)
|
||||
│ └── server.log # Server logs
|
||||
```
|
||||
|
||||
## Configuration
|
||||
|
||||
### Extended Config Schema
|
||||
|
||||
The `databuild.json` (or custom config file) is extended with:
|
||||
|
||||
```json
|
||||
{
|
||||
"graph_label": "podcast_reviews",
|
||||
"idle_timeout_seconds": 3600,
|
||||
"jobs": [
|
||||
{
|
||||
"label": "//examples:daily_summaries",
|
||||
"entrypoint": "./jobs/daily_summaries.sh",
|
||||
"environment": { "OUTPUT_DIR": "/data/output" },
|
||||
"partition_patterns": ["daily_summaries/.*"]
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
| Field | Type | Default | Description |
|
||||
|-------|------|---------|-------------|
|
||||
| `graph_label` | string | **required** | Unique identifier for this graph, used for `.databuild/${graph_label}/` directory |
|
||||
| `idle_timeout_seconds` | u64 | 3600 | Server auto-shutdown after this many seconds of inactivity |
|
||||
| `jobs` | array | [] | Job configurations (existing schema) |
|
||||
|
||||
### Runtime State (server.lock)
|
||||
|
||||
The `server.lock` file serves dual purposes:
|
||||
1. **File lock**: Prevents multiple servers for the same graph
|
||||
2. **Runtime state**: Contains current server information
|
||||
|
||||
```json
|
||||
{
|
||||
"pid": 12345,
|
||||
"port": 8080,
|
||||
"started_at": 1701234567890,
|
||||
"config_hash": "sha256:abc123..."
|
||||
}
|
||||
```
|
||||
|
||||
| Field | Description |
|
||||
|-------|-------------|
|
||||
| `pid` | Server process ID |
|
||||
| `port` | HTTP port the server is listening on |
|
||||
| `started_at` | Unix timestamp (milliseconds) when server started |
|
||||
| `config_hash` | Hash of config file contents (for detecting config changes) |
|
||||
|
||||
## CLI Commands
|
||||
|
||||
### Existing Commands (Enhanced)
|
||||
|
||||
All commands that interact with the server now auto-start if needed:
|
||||
|
||||
```bash
|
||||
# Creates want, auto-starting server if not running
|
||||
databuild want data/alpha data/beta
|
||||
|
||||
# Lists wants, auto-starting server if not running
|
||||
databuild wants list
|
||||
|
||||
# Lists partitions
|
||||
databuild partitions list
|
||||
|
||||
# Lists job runs
|
||||
databuild job-runs list
|
||||
```
|
||||
|
||||
### New Commands
|
||||
|
||||
```bash
|
||||
# Explicitly start server (for users who want manual control)
|
||||
databuild serve
|
||||
databuild serve --config ./custom-config.json
|
||||
|
||||
# Show server status
|
||||
databuild status
|
||||
|
||||
# Graceful shutdown
|
||||
databuild stop
|
||||
```
|
||||
|
||||
### Command: `databuild status`
|
||||
|
||||
Shows current server state:
|
||||
|
||||
```
|
||||
DataBuild Server Status
|
||||
━━━━━━━━━━━━━━━━━━━━━━━━
|
||||
Graph: podcast_reviews
|
||||
Status: Running
|
||||
PID: 12345
|
||||
Port: 8080
|
||||
Uptime: 2h 34m
|
||||
Database: .databuild/podcast_reviews/bel.sqlite
|
||||
|
||||
Active Job Runs: 2
|
||||
Pending Wants: 5
|
||||
```
|
||||
|
||||
### Command: `databuild stop`
|
||||
|
||||
Gracefully shuts down the server:
|
||||
|
||||
```bash
|
||||
$ databuild stop
|
||||
Stopping DataBuild server (PID 12345)...
|
||||
Server stopped.
|
||||
```
|
||||
|
||||
## Server Lifecycle
|
||||
|
||||
### Startup Flow
|
||||
|
||||
```
|
||||
CLI invocation (e.g., databuild want data/alpha)
|
||||
│
|
||||
▼
|
||||
Load databuild.json (or --config path)
|
||||
│
|
||||
▼
|
||||
Extract graph_label from config
|
||||
│
|
||||
▼
|
||||
Ensure .databuild/${graph_label}/ exists
|
||||
│
|
||||
▼
|
||||
Try flock(server.lock, LOCK_EX | LOCK_NB)
|
||||
│
|
||||
├─── Lock acquired → Server not running (or crashed)
|
||||
│ │
|
||||
│ ▼
|
||||
│ Find available port (start from 3538, increment if busy)
|
||||
│ │
|
||||
│ ▼
|
||||
│ Daemonize: fork → setsid → fork → redirect I/O
|
||||
│ │
|
||||
│ ▼
|
||||
│ Child: Start server, hold lock, write server.lock JSON
|
||||
│ Parent: Wait for health check, then proceed
|
||||
│
|
||||
└─── Lock blocked → Server already running
|
||||
│
|
||||
▼
|
||||
Read port from server.lock
|
||||
│
|
||||
▼
|
||||
Health check (GET /health)
|
||||
│
|
||||
├─── Success → Use this server
|
||||
│
|
||||
└─── Failure → Wait and retry (server starting up)
|
||||
│
|
||||
▼
|
||||
Forward request to http://localhost:${port}/api/...
|
||||
```
|
||||
|
||||
### Daemonization
|
||||
|
||||
The server daemonizes using the classic double-fork pattern:
|
||||
|
||||
1. **First fork**: Parent returns control to the CLI, which then waits for the child's health check before proceeding
|
||||
2. **setsid()**: Become session leader, detach from terminal
|
||||
3. **Second fork**: Prevent re-acquiring terminal
|
||||
4. **Redirect I/O**: stdout/stderr → `server.log`, stdin → `/dev/null`
|
||||
5. **Write lock file**: PID, port, started_at, config_hash
|
||||
6. **Start serving**: Hold file lock for lifetime of process
|
||||
|
||||
### Idle Timeout
|
||||
|
||||
The server monitors request activity:
|
||||
|
||||
1. Track `last_request_time` (updated on each HTTP request)
|
||||
2. Background task checks every 60 seconds
|
||||
3. If `now - last_request_time > idle_timeout_seconds` → graceful shutdown
|
||||
|
||||
### Graceful Shutdown
|
||||
|
||||
On shutdown (idle timeout, SIGTERM, or `databuild stop`):
|
||||
|
||||
1. Stop accepting new connections
|
||||
2. Wait for in-flight requests to complete (with timeout)
|
||||
3. Signal orchestrator to stop
|
||||
4. Wait for orchestrator thread to finish
|
||||
5. Release file lock (automatic on process exit)
|
||||
6. Exit
|
||||
|
||||
## Port Selection
|
||||
|
||||
When starting a new server:
|
||||
|
||||
1. Start with default port 3538
|
||||
2. Try to bind; if port in use, increment and retry
|
||||
3. Store selected port in `server.lock`
|
||||
4. CLI reads port from lock file, not from config
|
||||
|
||||
This handles the case where the preferred port is occupied by another process.
|
||||
|
||||
## Config Change Detection
|
||||
|
||||
The `config_hash` field in `server.lock` enables detecting when the config file has changed since the server started:
|
||||
|
||||
1. On CLI invocation, compute hash of current config file
|
||||
2. Compare with `config_hash` in `server.lock`
|
||||
3. If different, warn user:
|
||||
```
|
||||
Warning: Config has changed since server started.
|
||||
Run 'databuild stop && databuild serve' to apply changes.
|
||||
```
|
||||
|
||||
We don't auto-restart because that could interrupt in-progress builds.
|
||||
|
||||
## Error Handling
|
||||
|
||||
### Stale Lock File
|
||||
|
||||
If `server.lock` exists but the lock is not held (process crashed):
|
||||
|
||||
1. Delete the stale `server.lock`
|
||||
2. Proceed with normal startup
|
||||
|
||||
### Server Unreachable
|
||||
|
||||
If lock is held but health check fails repeatedly:
|
||||
|
||||
1. Log warning: "Server appears unresponsive"
|
||||
2. After N retries, suggest: "Try 'kill -9 ${pid}' and retry"
|
||||
|
||||
### Port Conflict
|
||||
|
||||
If preferred port is in use:
|
||||
|
||||
1. Automatically try next port (3539, 3540, ...)
|
||||
2. Store actual port in `server.lock`
|
||||
3. CLI reads from lock file, so it always connects to correct port
|
||||
|
||||
## Future Considerations
|
||||
|
||||
### Multi-Graph Scenarios
|
||||
|
||||
The `graph_label`-based directory structure supports multiple graphs in the same workspace. Each graph gets its own independent:
|
||||
- Server process
|
||||
- Port allocation
|
||||
- BEL database
|
||||
- Idle timeout
|
||||
|
||||
### Remote Servers
|
||||
|
||||
The current design assumes localhost. Future extensions could support:
|
||||
- Remote server URLs in config
|
||||
- SSH tunneling
|
||||
- Cloud-hosted servers
|
||||
|
||||
### Job Re-entrance
|
||||
|
||||
Currently, if a server crashes mid-build, job runs are orphaned. Future work:
|
||||
- Detect orphaned job runs on startup
|
||||
- Resume or mark as failed
|
||||
- Track external job processes (e.g., Databricks jobs)
|
||||
|
||||
## Implementation Checklist
|
||||
|
||||
- [ ] Extend `DatabuildConfig` with `graph_label` and `idle_timeout_seconds`
|
||||
- [ ] Create `ServerLock` struct for reading/writing lock file
|
||||
- [ ] Implement file locking with `flock()`
|
||||
- [ ] Implement daemonization (double-fork pattern)
|
||||
- [ ] Add auto-start logic to existing CLI commands
|
||||
- [ ] Add `databuild stop` command
|
||||
- [ ] Add `databuild status` command
|
||||
- [ ] Update example configs with `graph_label`
|
||||
- [ ] Add integration tests for server lifecycle
|
||||
|
|
@ -1,4 +1,5 @@
|
|||
{
|
||||
"graph_label": "multihop",
|
||||
"jobs": [
|
||||
{
|
||||
"label": "//examples/multihop:job_alpha",
|
||||
|
|
|
|||
Loading…
Reference in a new issue