Compare commits


4 commits

Author SHA1 Message Date
d812bb51e2 update multihop example 2025-11-27 14:51:13 +08:00
9a072ff74d fix locking, and update multihop example 2025-11-27 14:35:36 +08:00
f7c196c9b3 add automated server startup for cli 2025-11-27 14:20:40 +08:00
704ec0b6f3 add ecomm example idea 2025-11-27 12:05:33 +08:00
15 changed files with 1239 additions and 102 deletions

.gitignore (1 line changed)

@@ -17,6 +17,7 @@ generated_number
target
logs/databuild/
**/logs/databuild/
**/.databuild
# DSL generated code
**/generated/


@@ -118,6 +118,14 @@ crate.spec(
package = "urlencoding",
version = "2.1",
)
crate.spec(
package = "fs2",
version = "0.4",
)
crate.spec(
package = "libc",
version = "0.2",
)
crate.from_specs()
use_repo(crate, "crates")

File diff suppressed because one or more lines are too long


@@ -33,9 +33,12 @@ rust_library(
deps = [
"@crates//:askama",
"@crates//:axum",
"@crates//:fs2",
"@crates//:libc",
"@crates//:prost",
"@crates//:prost-types",
"@crates//:regex",
"@crates//:reqwest",
"@crates//:rusqlite",
"@crates//:schemars",
"@crates//:serde",
@@ -56,6 +59,9 @@ rust_test(
crate = ":lib",
data = ["//databuild/test:test_job_helper"],
env = {"RUST_BACKTRACE": "1"},
deps = [
"@crates//:tempfile",
],
)
# DataBuild CLI binary


@@ -1,21 +1,24 @@
use clap::{Parser, Subcommand};
use reqwest::blocking::Client;
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::sync::{broadcast, mpsc};
use lib::build_event_log::SqliteBELStorage;
use lib::build_state::BuildState;
use lib::config::DatabuildConfig;
use lib::daemon::{self, DaemonizeResult};
use lib::http_server::{create_router, AppState};
use lib::orchestrator::{Orchestrator, OrchestratorConfig};
use lib::server_lock::ServerLock;
#[derive(Parser)]
#[command(name = "databuild")]
#[command(about = "DataBuild CLI - Build system for data pipelines", long_about = None)]
struct Cli {
/// Server URL
#[arg(long, default_value = "http://localhost:3538", global = true)]
server: String,
/// Path to configuration file (JSON or TOML)
#[arg(long, default_value = "databuild.json", global = true)]
config: String,
#[command(subcommand)]
command: Commands,
@@ -25,19 +28,21 @@ struct Cli {
enum Commands {
/// Start the DataBuild HTTP server
Serve {
/// Port to listen on
#[arg(long, default_value = "3538")]
port: u16,
/// Database URL (default: :memory: for in-memory SQLite)
#[arg(long, default_value = ":memory:")]
database: String,
/// Path to configuration file (JSON or TOML)
/// Port to listen on (auto-selected if not specified)
#[arg(long)]
config: Option<String>,
port: Option<u16>,
/// Run as a daemon (internal flag, used by auto-start)
#[arg(long, hide = true)]
daemon: bool,
},
/// Stop the running server
Stop,
/// Show server status
Status,
/// Create a new want (trigger partition builds)
Want {
/// Partition references to build (e.g., "data/users", "metrics/daily")
@@ -81,60 +86,192 @@ enum JobRunsCommands {
List,
}
/// Load config and return (config, graph_label, config_hash)
fn load_config(config_path: &str) -> (DatabuildConfig, String) {
let config = DatabuildConfig::from_file(config_path).unwrap_or_else(|e| {
eprintln!("Failed to load config from {}: {}", config_path, e);
std::process::exit(1);
});
let config_hash = ServerLock::hash_config(Path::new(config_path)).unwrap_or_else(|e| {
eprintln!("Failed to hash config: {}", e);
std::process::exit(1);
});
(config, config_hash)
}
/// Ensure server is running, return the server URL
fn ensure_server(config_path: &str) -> String {
let (config, config_hash) = load_config(config_path);
match daemon::ensure_server_running(Path::new(config_path), &config.graph_label, &config_hash) {
Ok(DaemonizeResult::Started { port }) => {
eprintln!("Started server on port {}", port);
format!("http://127.0.0.1:{}", port)
}
Ok(DaemonizeResult::AlreadyRunning { port }) => {
format!("http://127.0.0.1:{}", port)
}
Err(e) => {
eprintln!("Failed to start server: {}", e);
std::process::exit(1);
}
}
}
fn main() {
let cli = Cli::parse();
match cli.command {
Commands::Serve { port, database, config } => {
cmd_serve(port, &database, config.as_deref());
Commands::Serve { port, daemon: _ } => {
let (config, _config_hash) = load_config(&cli.config);
// Ensure graph directory exists (for log file, lock file, etc.)
let _ = ServerLock::new(&config.graph_label).unwrap_or_else(|e| {
eprintln!("Failed to create graph directory: {}", e);
std::process::exit(1);
});
let database = config.effective_bel_uri();
let actual_port = port.unwrap_or_else(|| {
daemon::find_available_port(3538).unwrap_or_else(|e| {
eprintln!("Failed to find available port: {}", e);
std::process::exit(1);
})
});
cmd_serve(actual_port, &database, &cli.config, &config);
}
Commands::Stop => {
cmd_stop(&cli.config);
}
Commands::Status => {
cmd_status(&cli.config);
}
Commands::Want { partitions } => {
cmd_want(&cli.server, partitions);
let server_url = ensure_server(&cli.config);
cmd_want(&server_url, partitions);
}
Commands::Wants { command } => match command {
WantsCommands::List => {
cmd_wants_list(&cli.server);
let server_url = ensure_server(&cli.config);
cmd_wants_list(&server_url);
}
},
Commands::Partitions { command } => match command {
Some(PartitionsCommands::List) | None => {
cmd_partitions_list(&cli.server);
let server_url = ensure_server(&cli.config);
cmd_partitions_list(&server_url);
}
},
Commands::JobRuns { command } => match command {
Some(JobRunsCommands::List) | None => {
cmd_job_runs_list(&cli.server);
let server_url = ensure_server(&cli.config);
cmd_job_runs_list(&server_url);
}
},
}
}
fn cmd_stop(config_path: &str) {
let (config, _) = load_config(config_path);
match daemon::stop_server(&config.graph_label) {
Ok(()) => {
println!("Server stopped");
}
Err(e) => {
eprintln!("Failed to stop server: {}", e);
std::process::exit(1);
}
}
}
fn cmd_status(config_path: &str) {
let (config, config_hash) = load_config(config_path);
let lock = ServerLock::new(&config.graph_label).unwrap_or_else(|e| {
eprintln!("Failed to access lock: {}", e);
std::process::exit(1);
});
match lock.read_state() {
Ok(Some(state)) => {
let running = ServerLock::is_process_running(state.pid);
let healthy = if running { daemon::health_check(state.port) } else { false };
let config_changed = state.config_hash != config_hash;
println!("DataBuild Server Status");
println!("━━━━━━━━━━━━━━━━━━━━━━━━");
println!("Graph: {}", config.graph_label);
println!("Status: {}", if healthy { "Running" } else if running { "Unhealthy" } else { "Stopped" });
println!("URL: http://127.0.0.1:{}", state.port);
println!("PID: {}", state.pid);
println!("Database: {}", config.effective_bel_uri());
if config_changed {
println!();
println!("⚠ Config has changed since server started");
}
}
Ok(None) => {
println!("DataBuild Server Status");
println!("━━━━━━━━━━━━━━━━━━━━━━━━");
println!("Graph: {}", config.graph_label);
println!("Database: {}", config.effective_bel_uri());
println!("Status: Not running");
}
Err(e) => {
eprintln!("Failed to read server state: {}", e);
std::process::exit(1);
}
}
}
// ============================================================================
// Command Implementations
// ============================================================================
#[tokio::main]
async fn cmd_serve(port: u16, database: &str, config_path: Option<&str>) {
async fn cmd_serve(port: u16, database: &str, config_path: &str, config: &DatabuildConfig) {
// Initialize logging
tracing_subscriber::fmt::init();
// Load configuration if provided
let jobs = if let Some(path) = config_path {
match DatabuildConfig::from_file(path) {
Ok(config) => {
println!("Loaded configuration from: {}", path);
println!(" Jobs: {}", config.jobs.len());
config.into_job_configurations()
}
Err(e) => {
eprintln!("Failed to load configuration from {}: {}", path, e);
std::process::exit(1);
// Acquire and hold the server lock for the duration of the server
let mut server_lock = ServerLock::new(&config.graph_label).unwrap_or_else(|e| {
eprintln!("Failed to create server lock: {}", e);
std::process::exit(1);
});
// Try to acquire exclusive lock
match server_lock.try_lock() {
Ok(true) => {
// Write our state
let config_hash = ServerLock::hash_config(Path::new(config_path)).unwrap_or_default();
let state = lib::server_lock::ServerLockState {
pid: std::process::id(),
port,
started_at: ServerLock::now_millis(),
config_hash,
};
if let Err(e) = server_lock.write_state(&state) {
eprintln!("Failed to write server state: {}", e);
}
}
} else {
Vec::new()
};
Ok(false) => {
// Another server is holding the lock - this shouldn't happen in daemon mode
// but could happen if user manually runs serve while another server is running
eprintln!("Another server is already running for graph '{}'. Use 'databuild stop' first.", config.graph_label);
std::process::exit(1);
}
Err(e) => {
eprintln!("Failed to acquire server lock: {}", e);
std::process::exit(1);
}
}
println!("Loaded configuration from: {}", config_path);
println!(" Graph: {}", config.graph_label);
println!(" Jobs: {}", config.jobs.len());
println!(" Idle timeout: {}s", config.idle_timeout_seconds);
let jobs = config.clone().into_job_configurations();
let idle_timeout_secs = config.idle_timeout_seconds;
// Create SQLite BEL storage (shared between orchestrator and HTTP server)
let bel_storage = Arc::new(
@@ -208,7 +345,7 @@ async fn cmd_serve(port: u16, database: &str, config_path: Option<&str>) {
let idle_state = state.clone();
let idle_shutdown_tx = shutdown_tx.clone();
tokio::spawn(async move {
let idle_timeout = Duration::from_secs(3 * 60 * 60); // 3 hours
let idle_timeout = Duration::from_secs(idle_timeout_secs);
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
@@ -221,8 +358,8 @@ async fn cmd_serve(port: u16, database: &str, config_path: Option<&str>) {
if now - last_request > idle_timeout.as_millis() as u64 {
eprintln!(
"Server idle for {} hours, shutting down",
idle_timeout.as_secs() / 3600
"Server idle for {}s, shutting down",
idle_timeout.as_secs()
);
let _ = idle_shutdown_tx.send(());
break;


@@ -4,13 +4,38 @@ use crate::util::DatabuildError;
use std::fs;
use std::path::Path;
/// Default idle timeout in seconds (1 hour)
pub const DEFAULT_IDLE_TIMEOUT_SECONDS: u64 = 3600;
/// Configuration file format for DataBuild application
#[derive(Debug, serde::Deserialize)]
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DatabuildConfig {
/// Unique identifier for this graph, used for `.databuild/${graph_label}/` directory
pub graph_label: String,
/// Server auto-shutdown after this many seconds of inactivity (default: 3600)
#[serde(default = "default_idle_timeout")]
pub idle_timeout_seconds: u64,
/// BEL storage URI. Defaults to `.databuild/${graph_label}/bel.sqlite` if not specified.
///
/// Supported formats:
/// - Relative path: `path/to/bel.sqlite` (resolved relative to config file)
/// - Absolute path: `/var/data/bel.sqlite`
/// - SQLite URI: `sqlite:///path/to/bel.sqlite` or `sqlite::memory:`
/// - Future: `postgresql://user:pass@host/db`
#[serde(default)]
pub bel_uri: Option<String>,
/// List of job configurations
#[serde(default)]
pub jobs: Vec<JobConfig>,
}
fn default_idle_timeout() -> u64 {
DEFAULT_IDLE_TIMEOUT_SECONDS
}
impl DatabuildConfig {
/// Load configuration from a file, auto-detecting format from extension
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self, DatabuildError> {
@@ -47,6 +72,16 @@ impl DatabuildConfig {
pub fn into_job_configurations(self) -> Vec<JobConfiguration> {
self.jobs.into_iter().map(|jc| jc.into()).collect()
}
/// Get the effective BEL URI, resolving defaults based on graph_label.
///
/// If `bel_uri` is not set, returns the default path `.databuild/${graph_label}/bel.sqlite`.
/// Relative paths are not resolved here - that's the caller's responsibility.
pub fn effective_bel_uri(&self) -> String {
self.bel_uri
.clone()
.unwrap_or_else(|| format!(".databuild/{}/bel.sqlite", self.graph_label))
}
}
#[cfg(test)]
@@ -57,6 +92,7 @@ mod tests {
fn test_parse_json_config() {
let json = r#"
{
"graph_label": "test_graph",
"jobs": [
{
"label": "//test:job_alpha",
@@ -69,13 +105,47 @@
"#;
let config = DatabuildConfig::from_json(json).unwrap();
assert_eq!(config.graph_label, "test_graph");
assert_eq!(config.idle_timeout_seconds, DEFAULT_IDLE_TIMEOUT_SECONDS);
assert_eq!(config.jobs.len(), 1);
assert_eq!(config.jobs[0].label, "//test:job_alpha");
}
#[test]
fn test_parse_json_config_with_idle_timeout() {
let json = r#"
{
"graph_label": "test_graph",
"idle_timeout_seconds": 7200,
"jobs": []
}
"#;
let config = DatabuildConfig::from_json(json).unwrap();
assert_eq!(config.graph_label, "test_graph");
assert_eq!(config.idle_timeout_seconds, 7200);
assert_eq!(config.jobs.len(), 0);
}
#[test]
fn test_effective_bel_uri() {
// Default: derives from graph_label
let config = DatabuildConfig::from_json(r#"{ "graph_label": "my_graph" }"#).unwrap();
assert_eq!(config.effective_bel_uri(), ".databuild/my_graph/bel.sqlite");
// Custom: uses provided value
let config = DatabuildConfig::from_json(
r#"{ "graph_label": "my_graph", "bel_uri": "postgresql://localhost/db" }"#,
)
.unwrap();
assert_eq!(config.effective_bel_uri(), "postgresql://localhost/db");
}
#[test]
fn test_parse_toml_config() {
let toml = r#"
graph_label = "test_graph"
[[jobs]]
label = "//test:job_alpha"
entrypoint = "/usr/bin/python3"
@@ -86,6 +156,8 @@
"#;
let config = DatabuildConfig::from_toml(toml).unwrap();
assert_eq!(config.graph_label, "test_graph");
assert_eq!(config.idle_timeout_seconds, DEFAULT_IDLE_TIMEOUT_SECONDS);
assert_eq!(config.jobs.len(), 1);
assert_eq!(config.jobs[0].label, "//test:job_alpha");
}

databuild/daemon.rs (new file, 196 lines)

@@ -0,0 +1,196 @@
//! Server daemonization for CLI-server automation.
//!
//! Implements the classic double-fork pattern to create a proper Unix daemon.
use crate::server_lock::ServerLock;
use crate::util::DatabuildError;
use std::fs::OpenOptions;
use std::path::Path;
use std::process::{Child, Command, Stdio};
/// Result of attempting to start a daemonized server.
pub enum DaemonizeResult {
/// Server was started, here's the port.
Started { port: u16 },
/// Server was already running at this port.
AlreadyRunning { port: u16 },
}
/// Find an available port starting from the given port.
pub fn find_available_port(start_port: u16) -> Result<u16, DatabuildError> {
for port in start_port..=start_port + 100 {
if let Ok(listener) = std::net::TcpListener::bind(format!("127.0.0.1:{}", port)) {
drop(listener);
return Ok(port);
}
}
Err(DatabuildError::from(format!(
"No available port found in range {}..{}",
start_port,
start_port + 100
)))
}
/// Check if the server at the given port is healthy.
pub fn health_check(port: u16) -> bool {
let url = format!("http://127.0.0.1:{}/health", port);
match reqwest::blocking::get(&url) {
Ok(resp) => resp.status().is_success(),
Err(_) => false,
}
}
/// Wait for the server at the given port to become healthy.
/// Returns Ok(()) if healthy within timeout, Err otherwise.
pub fn wait_for_health(port: u16, timeout_ms: u64) -> Result<(), DatabuildError> {
let start = std::time::Instant::now();
let timeout = std::time::Duration::from_millis(timeout_ms);
while start.elapsed() < timeout {
if health_check(port) {
return Ok(());
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
Err(DatabuildError::from(format!(
"Server did not become healthy within {}ms",
timeout_ms
)))
}
/// Spawn the server as a daemon process.
///
/// This re-executes the current binary with `serve` command and special flags
/// to indicate it should daemonize itself.
pub fn spawn_daemon(
config_path: &Path,
port: u16,
log_path: &Path,
) -> Result<Child, DatabuildError> {
// Get the current executable path
let exe = std::env::current_exe()
.map_err(|e| DatabuildError::from(format!("Failed to get current executable: {}", e)))?;
// Open log file for stdout/stderr redirection
let log_file = OpenOptions::new()
.create(true)
.append(true)
.open(log_path)
.map_err(|e| DatabuildError::from(format!("Failed to open log file: {}", e)))?;
let log_file_err = log_file
.try_clone()
.map_err(|e| DatabuildError::from(format!("Failed to clone log file: {}", e)))?;
// Spawn the daemon process
let child = Command::new(exe)
.arg("serve")
.arg("--port")
.arg(port.to_string())
.arg("--config")
.arg(config_path)
.arg("--daemon")
.stdin(Stdio::null())
.stdout(Stdio::from(log_file))
.stderr(Stdio::from(log_file_err))
.spawn()
.map_err(|e| DatabuildError::from(format!("Failed to spawn daemon: {}", e)))?;
Ok(child)
}
/// Attempt to start the server, or connect to an existing one.
///
/// This is the main entry point for CLI commands that need the server running.
pub fn ensure_server_running(
config_path: &Path,
graph_label: &str,
config_hash: &str,
) -> Result<DaemonizeResult, DatabuildError> {
let lock = ServerLock::new(graph_label)?;
// First, check if there's already a running server by reading existing state
if let Some(state) = lock.read_state()? {
// Check if that process is still running
if ServerLock::is_process_running(state.pid) {
// Verify server is actually healthy
if health_check(state.port) {
// Check if config has changed
if state.config_hash != config_hash {
eprintln!(
"Warning: Config has changed since server started.\n\
Run 'databuild stop && databuild serve' to apply changes."
);
}
return Ok(DaemonizeResult::AlreadyRunning { port: state.port });
}
// Process exists but not healthy - might still be starting up
// Wait a bit and check again
if wait_for_health(state.port, 5000).is_ok() {
return Ok(DaemonizeResult::AlreadyRunning { port: state.port });
}
// Still unhealthy, will need to be stopped manually
return Err(DatabuildError::from(format!(
"Server at port {} appears unhealthy. Try 'databuild stop' and retry.",
state.port
)));
} else {
// Stale lock file - process is gone, clean up
lock.remove_stale_lock()?;
}
}
// No server running - start one
// Find an available port
let port = find_available_port(3538)?;
// Spawn the daemon - it will acquire its own lock
let log_path = lock.log_path();
let _child = spawn_daemon(config_path, port, &log_path)?;
// Wait for server to become healthy (which implies it has acquired the lock)
wait_for_health(port, 10000)?;
Ok(DaemonizeResult::Started { port })
}
/// Stop a running server.
pub fn stop_server(graph_label: &str) -> Result<(), DatabuildError> {
let lock = ServerLock::new(graph_label)?;
let state = lock
.read_state()?
.ok_or_else(|| DatabuildError::from("No server is running"))?;
// Check if process exists
if !ServerLock::is_process_running(state.pid) {
// Process already dead, clean up stale lock
lock.remove_stale_lock()?;
return Ok(());
}
// Send SIGTERM to the server
#[cfg(unix)]
unsafe {
libc::kill(state.pid as i32, libc::SIGTERM);
}
// Wait for process to exit (with timeout)
let start = std::time::Instant::now();
let timeout = std::time::Duration::from_secs(10);
while start.elapsed() < timeout {
if !ServerLock::is_process_running(state.pid) {
return Ok(());
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
// If still running after timeout, force kill
#[cfg(unix)]
unsafe {
libc::kill(state.pid as i32, libc::SIGKILL);
}
Ok(())
}


@@ -2,6 +2,7 @@ pub mod build_event_log;
pub mod build_state;
pub mod commands;
pub mod config;
pub mod daemon;
mod data_deps;
mod event_transforms;
pub mod http_server;
@@ -11,6 +12,7 @@ mod job_run_state;
mod mock_job_run;
pub mod orchestrator;
mod partition_state;
pub mod server_lock;
mod util;
mod want_state;
pub mod web;

databuild/server_lock.rs (new file, 308 lines)

@@ -0,0 +1,308 @@
//! Server lock file management for CLI-server automation.
//!
//! This module handles the `.databuild/${graph_label}/server.lock` file which:
//! 1. Acts as a file lock to prevent multiple servers for the same graph
//! 2. Stores runtime state (PID, port, start time, config hash)
use crate::util::DatabuildError;
use fs2::FileExt;
use sha2::{Digest, Sha256};
use std::fs::{self, File, OpenOptions};
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
/// Runtime state stored in the server.lock file
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ServerLockState {
/// Server process ID
pub pid: u32,
/// HTTP port the server is listening on
pub port: u16,
/// Unix timestamp (milliseconds) when server started
pub started_at: u64,
/// SHA256 hash of config file contents (for detecting config changes)
pub config_hash: String,
}
/// Manages the server lock file and .databuild directory structure.
pub struct ServerLock {
/// Path to the .databuild/${graph_label}/ directory
graph_dir: PathBuf,
/// Path to the server.lock file
lock_path: PathBuf,
/// Open file handle (holds the lock while alive)
lock_file: Option<File>,
}
impl ServerLock {
/// Create a ServerLock for the given graph label, using the current directory.
/// Creates the .databuild/${graph_label}/ directory if it doesn't exist.
pub fn new(graph_label: &str) -> Result<Self, DatabuildError> {
Self::new_in_dir(Path::new("."), graph_label)
}
/// Create a ServerLock for the given graph label in the specified base directory.
/// Creates the .databuild/${graph_label}/ directory if it doesn't exist.
pub fn new_in_dir(base_dir: &Path, graph_label: &str) -> Result<Self, DatabuildError> {
let graph_dir = base_dir.join(".databuild").join(graph_label);
fs::create_dir_all(&graph_dir).map_err(|e| {
DatabuildError::from(format!("Failed to create graph directory: {}", e))
})?;
let lock_path = graph_dir.join("server.lock");
Ok(Self {
graph_dir,
lock_path,
lock_file: None,
})
}
/// Path to the graph directory (`.databuild/${graph_label}/`)
pub fn graph_dir(&self) -> &Path {
&self.graph_dir
}
/// Path to the server log file
pub fn log_path(&self) -> PathBuf {
self.graph_dir.join("server.log")
}
/// Try to acquire an exclusive lock on the server.lock file.
/// Returns Ok(true) if lock acquired (server not running).
/// Returns Ok(false) if lock is held by another process (server running).
pub fn try_lock(&mut self) -> Result<bool, DatabuildError> {
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&self.lock_path)
.map_err(|e| DatabuildError::from(format!("Failed to open lock file: {}", e)))?;
match file.try_lock_exclusive() {
Ok(()) => {
self.lock_file = Some(file);
Ok(true)
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// Lock is held by another process
Ok(false)
}
Err(e) => Err(DatabuildError::from(format!(
"Failed to acquire lock: {}",
e
))),
}
}
/// Write the server state to the lock file.
/// Must have acquired the lock first via try_lock().
pub fn write_state(&mut self, state: &ServerLockState) -> Result<(), DatabuildError> {
let file = self
.lock_file
.as_mut()
.ok_or_else(|| DatabuildError::from("Lock not acquired"))?;
// Truncate and write new content
file.set_len(0)
.map_err(|e| DatabuildError::from(format!("Failed to truncate lock file: {}", e)))?;
let content = serde_json::to_string_pretty(state)
.map_err(|e| DatabuildError::from(format!("Failed to serialize state: {}", e)))?;
// Seek to beginning
use std::io::Seek;
file.seek(std::io::SeekFrom::Start(0))
.map_err(|e| DatabuildError::from(format!("Failed to seek: {}", e)))?;
file.write_all(content.as_bytes())
.map_err(|e| DatabuildError::from(format!("Failed to write state: {}", e)))?;
file.flush()
.map_err(|e| DatabuildError::from(format!("Failed to flush: {}", e)))?;
Ok(())
}
/// Read the server state from the lock file.
/// Can be called without holding the lock (to read port from running server).
pub fn read_state(&self) -> Result<Option<ServerLockState>, DatabuildError> {
if !self.lock_path.exists() {
return Ok(None);
}
let mut file = File::open(&self.lock_path)
.map_err(|e| DatabuildError::from(format!("Failed to open lock file: {}", e)))?;
let mut content = String::new();
file.read_to_string(&mut content)
.map_err(|e| DatabuildError::from(format!("Failed to read lock file: {}", e)))?;
if content.trim().is_empty() {
return Ok(None);
}
let state: ServerLockState = serde_json::from_str(&content)
.map_err(|e| DatabuildError::from(format!("Failed to parse lock file: {}", e)))?;
Ok(Some(state))
}
/// Check if the process with the given PID is still running.
pub fn is_process_running(pid: u32) -> bool {
// On Unix, sending signal 0 checks if process exists
#[cfg(unix)]
{
use std::os::unix::process::CommandExt;
unsafe { libc::kill(pid as i32, 0) == 0 }
}
#[cfg(not(unix))]
{
// On non-Unix, assume process is running if we can't check
true
}
}
/// Delete a stale lock file (when process has crashed).
pub fn remove_stale_lock(&self) -> Result<(), DatabuildError> {
if self.lock_path.exists() {
fs::remove_file(&self.lock_path)
.map_err(|e| DatabuildError::from(format!("Failed to remove stale lock: {}", e)))?;
}
Ok(())
}
/// Compute SHA256 hash of config file contents.
pub fn hash_config(config_path: &Path) -> Result<String, DatabuildError> {
let content = fs::read_to_string(config_path)
.map_err(|e| DatabuildError::from(format!("Failed to read config: {}", e)))?;
let mut hasher = Sha256::new();
hasher.update(content.as_bytes());
let result = hasher.finalize();
Ok(format!("sha256:{:x}", result))
}
/// Get current timestamp in milliseconds since Unix epoch.
pub fn now_millis() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64
}
}
impl Drop for ServerLock {
fn drop(&mut self) {
// Lock is automatically released when file is closed
if let Some(file) = self.lock_file.take() {
let _ = file.unlock();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test]
fn test_server_lock_acquire() {
let temp = tempdir().unwrap();
let mut lock = ServerLock::new_in_dir(temp.path(), "test_graph").unwrap();
assert!(lock.try_lock().unwrap(), "Lock should succeed");
}
#[test]
fn test_server_lock_state_read_write() {
let temp = tempdir().unwrap();
let mut lock = ServerLock::new_in_dir(temp.path(), "test_graph").unwrap();
assert!(lock.try_lock().unwrap());
let state = ServerLockState {
pid: 12345,
port: 8080,
started_at: 1701234567890,
config_hash: "sha256:abc123".to_string(),
};
lock.write_state(&state).unwrap();
// Read it back (via a fresh ServerLock to test read without lock)
let lock2 = ServerLock::new_in_dir(temp.path(), "test_graph").unwrap();
let read_state = lock2.read_state().unwrap().unwrap();
assert_eq!(read_state.pid, 12345);
assert_eq!(read_state.port, 8080);
assert_eq!(read_state.started_at, 1701234567890);
assert_eq!(read_state.config_hash, "sha256:abc123");
}
#[test]
fn test_server_lock_graph_dir() {
let temp = tempdir().unwrap();
let lock = ServerLock::new_in_dir(temp.path(), "my_graph").unwrap();
assert!(lock.graph_dir().ends_with(".databuild/my_graph"));
}
#[test]
fn test_server_lock_log_path() {
let temp = tempdir().unwrap();
let lock = ServerLock::new_in_dir(temp.path(), "my_graph").unwrap();
assert!(lock.log_path().ends_with(".databuild/my_graph/server.log"));
}
#[test]
fn test_hash_config() {
let temp = tempdir().unwrap();
let config_path = temp.path().join("config.json");
fs::write(&config_path, r#"{"graph_label": "test"}"#).unwrap();
let hash = ServerLock::hash_config(&config_path).unwrap();
assert!(hash.starts_with("sha256:"));
assert!(hash.len() > 10);
}
#[test]
fn test_hash_config_deterministic() {
let temp = tempdir().unwrap();
let config_path = temp.path().join("config.json");
fs::write(&config_path, r#"{"graph_label": "test"}"#).unwrap();
let hash1 = ServerLock::hash_config(&config_path).unwrap();
let hash2 = ServerLock::hash_config(&config_path).unwrap();
assert_eq!(hash1, hash2, "Same content should produce same hash");
}
#[test]
fn test_read_state_nonexistent() {
let temp = tempdir().unwrap();
let lock = ServerLock::new_in_dir(temp.path(), "nonexistent_graph").unwrap();
let state = lock.read_state().unwrap();
assert!(
state.is_none(),
"Reading nonexistent lock file should return None"
);
}
#[test]
fn test_now_millis() {
let before = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let now = ServerLock::now_millis();
let after = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
assert!(
now >= before && now <= after,
"now_millis should be between before and after"
);
}
}


@@ -78,6 +78,12 @@ impl From<String> for DatabuildError {
}
}
impl From<&str> for DatabuildError {
fn from(value: &str) -> Self {
Self::new(value)
}
}
impl std::fmt::Display for DatabuildError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.msg)


@@ -0,0 +1,330 @@
# CLI-Server Automation
This document describes how the DataBuild CLI automatically manages the HTTP server lifecycle, providing a "magical" experience where users don't need to think about starting or stopping servers.
## Goals
1. **Zero-config startup**: Running `databuild want data/alpha` should "just work" without manual server management
2. **Workspace isolation**: Multiple graphs can run independently with separate servers and databases
3. **Resource efficiency**: Servers auto-shutdown after idle timeout
4. **Transparency**: Users can inspect server state and logs when needed
## Design Overview
### Architecture
```
┌─────────────────────────────────────────────────────────────┐
│ CLI Process │
│ databuild want data/alpha │
│ │
│ 1. Load config (databuild.json) │
│ 2. Check .databuild/${graph_label}/server.lock │
│ 3. If not running → daemonize server │
│ 4. Forward request to http://localhost:${port}/api/wants │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Daemonized Server │
│ PID: 12345, Port: 8080 │
│ │
│ - Holds file lock on server.lock │
│ - Writes logs to server.log │
│ - Auto-shutdown after idle_timeout_seconds │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ .databuild/${graph_label}/ │
│ │
│ server.lock - Lock file + runtime state (JSON) │
│ bel.sqlite - Build Event Log database │
│ server.log - Server stdout/stderr │
└─────────────────────────────────────────────────────────────┘
```
### Directory Structure
```
project/
├── databuild.json # User-authored config
├── .databuild/
│ └── ${graph_label}/ # e.g., "podcast_reviews"
│ ├── server.lock # Runtime state + file lock
│ ├── bel.sqlite # Build Event Log (SQLite)
│ └── server.log # Server logs
```
## Configuration
### Extended Config Schema
The `databuild.json` (or custom config file) is extended with:
```json
{
"graph_label": "podcast_reviews",
"idle_timeout_seconds": 3600,
"jobs": [
{
"label": "//examples:daily_summaries",
"entrypoint": "./jobs/daily_summaries.sh",
"environment": { "OUTPUT_DIR": "/data/output" },
"partition_patterns": ["daily_summaries/.*"]
}
]
}
```
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `graph_label` | string | **required** | Unique identifier for this graph, used for `.databuild/${graph_label}/` directory |
| `idle_timeout_seconds` | u64 | 3600 | Server auto-shutdown after this many seconds of inactivity |
| `jobs` | array | [] | Job configurations (existing schema) |
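As a quick illustration, here is a hedged usage sketch of loading this extended config and resolving the default BEL path with the `DatabuildConfig` API added in the diff above (the `lib::` crate path is taken from the CLI's own imports):
```rust
// Hedged usage sketch: load the extended config and resolve the BEL path,
// using the DatabuildConfig API added in the diff above.
use lib::config::DatabuildConfig;

fn main() {
    let config = DatabuildConfig::from_file("databuild.json").expect("failed to load config");
    println!("graph:        {}", config.graph_label);
    println!("idle timeout: {}s", config.idle_timeout_seconds);
    println!("jobs:         {}", config.jobs.len());
    // Falls back to .databuild/${graph_label}/bel.sqlite when bel_uri is unset.
    println!("bel uri:      {}", config.effective_bel_uri());
}
```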
### Runtime State (server.lock)
The `server.lock` file serves dual purposes:
1. **File lock**: Prevents multiple servers for the same graph
2. **Runtime state**: Contains current server information
```json
{
"pid": 12345,
"port": 8080,
"started_at": 1701234567890,
"config_hash": "sha256:abc123..."
}
```
| Field | Description |
|-------|-------------|
| `pid` | Server process ID |
| `port` | HTTP port the server is listening on |
| `started_at` | Unix timestamp (milliseconds) when server started |
| `config_hash` | Hash of config file contents (for detecting config changes) |
## CLI Commands
### Existing Commands (Enhanced)
All commands that interact with the server now auto-start it if needed:
```bash
# Creates want, auto-starting server if not running
databuild want data/alpha data/beta
# Lists wants, auto-starting server if not running
databuild wants list
# Lists partitions
databuild partitions list
# Lists job runs
databuild job-runs list
```
### New Commands
```bash
# Explicitly start server (for users who want manual control)
databuild serve
databuild serve --config ./custom-config.json
# Show server status
databuild status
# Graceful shutdown
databuild stop
```
### Command: `databuild status`
Shows current server state:
```
DataBuild Server Status
━━━━━━━━━━━━━━━━━━━━━━━━
Graph: podcast_reviews
Status: Running
PID: 12345
Port: 8080
Uptime: 2h 34m
Database: .databuild/podcast_reviews/bel.sqlite
Active Job Runs: 2
Pending Wants: 5
```
### Command: `databuild stop`
Gracefully shuts down the server:
```bash
$ databuild stop
Stopping DataBuild server (PID 12345)...
Server stopped.
```
## Server Lifecycle
### Startup Flow
```
CLI invocation (e.g., databuild want data/alpha)
Load databuild.json (or --config path)
Extract graph_label from config
Ensure .databuild/${graph_label}/ exists
Try flock(server.lock, LOCK_EX | LOCK_NB)
├─── Lock acquired → Server not running (or crashed)
│ │
│ ▼
│ Find available port (start from 3538, increment if busy)
│ │
│ ▼
│ Daemonize: fork → setsid → fork → redirect I/O
│ │
│ ▼
│ Child: Start server, hold lock, write server.lock JSON
│ Parent: Wait for health check, then proceed
└─── Lock blocked → Server already running
Read port from server.lock
Health check (GET /health)
├─── Success → Use this server
└─── Failure → Wait and retry (server starting up)
Forward request to http://localhost:${port}/api/...
```
### Daemonization
The server daemonizes using the classic double-fork pattern (sketched after this list):
1. **First fork**: Parent returns immediately to CLI
2. **setsid()**: Become session leader, detach from terminal
3. **Second fork**: Prevent re-acquiring terminal
4. **Redirect I/O**: stdout/stderr → `server.log`, stdin → `/dev/null`
5. **Write lock file**: PID, port, started_at, config_hash
6. **Start serving**: Hold file lock for lifetime of process
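Note that the repo's `daemon.rs` (above) currently detaches by re-executing the binary with a hidden `--daemon` flag and redirecting stdio; the snippet below is only an illustrative sketch of the classic double-fork sequence described here, written with raw `libc` calls and with error handling omitted.
```rust
// Illustrative sketch of the double-fork sequence described above.
// This is NOT the repo's spawn_daemon implementation; it shows the
// classic Unix pattern using libc directly (error handling omitted).
use std::process::exit;

fn daemonize() {
    unsafe {
        // First fork: the parent returns to the CLI immediately.
        if libc::fork() > 0 {
            exit(0);
        }
        // New session: detach from the controlling terminal.
        libc::setsid();
        // Second fork: the session leader exits so the daemon can
        // never re-acquire a terminal.
        if libc::fork() > 0 {
            exit(0);
        }
        // Redirect stdio; a real server would point stdout/stderr at server.log.
        let devnull = libc::open(b"/dev/null\0".as_ptr() as *const libc::c_char, libc::O_RDWR);
        libc::dup2(devnull, 0);
        libc::dup2(devnull, 1);
        libc::dup2(devnull, 2);
    }
}
```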
### Idle Timeout
The server monitors request activity:
1. Track `last_request_time` (updated on each HTTP request)
2. Background task checks every 60 seconds
3. If `now - last_request_time > idle_timeout_seconds` → graceful shutdown
### Graceful Shutdown
On shutdown (idle timeout, SIGTERM, or `databuild stop`), the server does the following (see the sketch after this list):
1. Stop accepting new connections
2. Wait for in-flight requests to complete (with timeout)
3. Signal orchestrator to stop
4. Wait for orchestrator thread to finish
5. Release file lock (automatic on process exit)
6. Exit
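A hedged sketch of how steps 1–2 can be wired through axum's graceful shutdown. This assumes an axum 0.7-style `axum::serve`; the repo's actual wiring in its HTTP server module may differ.
```rust
// Hedged sketch: feed a broadcast shutdown signal (from the idle watcher,
// a SIGTERM handler, or `databuild stop`) into axum's graceful shutdown.
use axum::{routing::get, Router};
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(1);
    // In practice, clones of shutdown_tx go to the idle watcher and a SIGTERM
    // handler; here Ctrl+C triggers shutdown just for demonstration.
    let tx = shutdown_tx.clone();
    tokio::spawn(async move {
        let _ = tokio::signal::ctrl_c().await;
        let _ = tx.send(());
    });

    let app = Router::new().route("/health", get(|| async { "ok" }));
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3538").await.unwrap();
    axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            let _ = shutdown_rx.recv().await; // step 1: stop accepting new connections
        })
        .await // step 2: in-flight requests drain before this returns
        .unwrap();
    // Steps 3-5: stop the orchestrator, join its thread, then exit;
    // the flock on server.lock is released automatically on process exit.
}
```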
## Port Selection
When starting a new server:
1. Start with default port 3538
2. Try to bind; if port in use, increment and retry
3. Store selected port in `server.lock`
4. CLI reads port from lock file, not from config
This handles the case where the preferred port is occupied by another process.
## Config Change Detection
The `config_hash` field in `server.lock` enables detecting when the config file has changed since the server started:
1. On CLI invocation, compute hash of current config file
2. Compare with `config_hash` in `server.lock`
3. If different, warn user:
```
Warning: Config has changed since server started.
Run 'databuild stop && databuild serve' to apply changes.
```
We don't auto-restart because that could interrupt in-progress builds.
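For concreteness, a sketch of that check as it might look inside the crate, reusing `ServerLock::hash_config` and `read_state` from the diff above:
```rust
// Hedged sketch (as it might look inside the crate) of the drift check,
// reusing ServerLock::hash_config and read_state from the diff above.
use crate::server_lock::ServerLock;
use crate::util::DatabuildError;
use std::path::Path;

pub fn warn_if_config_changed(
    graph_label: &str,
    config_path: &Path,
) -> Result<(), DatabuildError> {
    let lock = ServerLock::new(graph_label)?;
    let current_hash = ServerLock::hash_config(config_path)?;
    if let Some(state) = lock.read_state()? {
        if state.config_hash != current_hash {
            eprintln!(
                "Warning: Config has changed since server started.\n\
                 Run 'databuild stop && databuild serve' to apply changes."
            );
        }
    }
    Ok(())
}
```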
## Error Handling
### Stale Lock File
If `server.lock` exists but the lock is not held (process crashed):
1. Delete the stale `server.lock`
2. Proceed with normal startup
### Server Unreachable
If lock is held but health check fails repeatedly:
1. Log warning: "Server appears unresponsive"
2. After N retries, suggest: "Try 'kill -9 ${pid}' and retry"
### Port Conflict
If preferred port is in use:
1. Automatically try next port (3539, 3540, ...)
2. Store actual port in `server.lock`
3. CLI reads from lock file, so it always connects to correct port
## Future Considerations
### Multi-Graph Scenarios
The `graph_label`-based directory structure supports multiple graphs in the same workspace. Each graph has an independent:
- Server process
- Port allocation
- BEL database
- Idle timeout
### Remote Servers
The current design assumes localhost. Future extensions could support:
- Remote server URLs in config
- SSH tunneling
- Cloud-hosted servers
### Job Re-entrance
Currently, if a server crashes mid-build, job runs are orphaned. Future work:
- Detect orphaned job runs on startup
- Resume or mark as failed
- Track external job processes (e.g., Databricks jobs)
## Implementation Checklist
- [ ] Extend `DatabuildConfig` with `graph_label` and `idle_timeout_seconds`
- [ ] Create `ServerLock` struct for reading/writing lock file
- [ ] Implement file locking with `flock()`
- [ ] Implement daemonization (double-fork pattern)
- [ ] Add auto-start logic to existing CLI commands
- [ ] Add `databuild stop` command
- [ ] Add `databuild status` command
- [ ] Update example configs with `graph_label`
- [ ] Add integration tests for server lifecycle

examples/README.md (new file, 17 lines)

@@ -0,0 +1,17 @@
# Examples
- `multihop` - A minimal demonstration of a graph with two partitions, alpha and beta, where beta depends on alpha.
## Ideas
### E-Commerce Search Ranker
- Search to detail page behavior to conversion to return pipeline
- Add in "profile resolution" step to capture dep type? (e.g. person signs in at checkout)
- "input" partitioning: date, country (different infra/laws/products/culture), customer? (what if its for shopify)
- search ranking modeling flow: TDC, train, inference (simple linear model)
- cross-customer product fraud alliance to qualify products (excuse to have fan-in-and-out pattern - "opt in")
- sessionizing/preprocessing impressions
- returns happen later, and handling them in an atomic, final way
- performance metrics aggregates
- what happens when multiple paths lead back to the same partition?


@@ -1,4 +1,5 @@
{
"graph_label": "multihop",
"jobs": [
{
"label": "//examples/multihop:job_alpha",

scripts/databuild (new symbolic link)

@@ -0,0 +1 @@
../bazel-bin/databuild/databuild


@@ -3,72 +3,60 @@ set -euo pipefail
# Navigate to repository root
cd "$(dirname "$0")/.."
REPO_ROOT="$(pwd)"
# Configuration
PORT=3050
DB_PATH="/tmp/databuild_multihop.db"
EXAMPLE_DIR="$REPO_ROOT/examples/multihop"
FLAG_FILE="/tmp/databuild_multihop_alpha_complete"
PID_FILE="/tmp/databuild_multihop.pid"
CLI="$REPO_ROOT/bazel-bin/databuild/databuild"
echo "=== DataBuild Multi-Hop Example ==="
echo
# Clean up previous state
echo "Cleaning up previous state..."
rm -f "$DB_PATH" "$FLAG_FILE" "$PID_FILE"
pkill -f "databuild.*serve.*port $PORT" || true
sleep 1
rm -f "$FLAG_FILE"
rm -rf "$EXAMPLE_DIR/.databuild/"
(cd "$EXAMPLE_DIR" && "$CLI" stop 2>/dev/null || true)
# Build the binary
echo "Building databuild CLI..."
bazel build //databuild:databuild
# Start the server in background
echo "Starting databuild server on port $PORT..."
./bazel-bin/databuild/databuild serve \
--port $PORT \
--database "$DB_PATH" \
--config examples/multihop/config.json &
echo
echo "=== Starting server ==="
echo
SERVER_PID=$!
echo $SERVER_PID > "$PID_FILE"
echo "Server started with PID $SERVER_PID"
# Start the server by making a request (triggers auto-start)
cd "$EXAMPLE_DIR"
OUTPUT=$("$CLI" wants list 2>&1)
# Wait for server to be ready
echo "Waiting for server to start..."
sleep 2
# Test server health
if curl -s http://localhost:$PORT/health > /dev/null 2>&1; then
echo "Server is ready!"
else
echo "WARNING: Server health check failed, but continuing..."
fi
# Extract port from status
PORT=$("$CLI" status 2>&1 | grep "URL:" | awk -F: '{print $NF}')
echo
echo "=== Server is running ==="
echo "Server running at: http://127.0.0.1:${PORT}"
echo
echo "You can now interact with the server:"
echo "=== Ready to run example ==="
echo
echo "From the examples/multihop directory, try the following commands:"
echo "(The CLI uses databuild.json by default)"
echo
echo " # Check server status"
echo " databuild status"
echo
echo " # Create a want for data/beta (triggers dependency chain)"
echo " ./bazel-bin/databuild/databuild --server http://localhost:$PORT want data/beta"
echo " databuild want data/beta"
echo
echo " # Monitor wants"
echo " ./bazel-bin/databuild/databuild --server http://localhost:$PORT wants list"
echo " databuild wants list"
echo
echo " # Monitor job runs"
echo " ./bazel-bin/databuild/databuild --server http://localhost:$PORT job-runs list"
echo " databuild job-runs list"
echo
echo " # Monitor partitions"
echo " ./bazel-bin/databuild/databuild --server http://localhost:$PORT partitions list"
echo " databuild partitions list"
echo
echo "To stop the server:"
echo " kill $SERVER_PID"
echo " # or: pkill -f 'databuild.*serve.*port $PORT'"
echo " # Stop the server when done"
echo " databuild stop"
echo
echo "Server logs will appear below. Press Ctrl+C to stop."
echo "=========================================="
echo
# Wait for the server process
wait $SERVER_PID
echo "==========================================="