fix locking, and update multihop example

This commit is contained in:
Stuart Axelbrooke 2025-11-27 14:35:36 +08:00
parent f7c196c9b3
commit 9a072ff74d
5 changed files with 120 additions and 109 deletions

View file

@ -232,6 +232,39 @@ async fn cmd_serve(port: u16, database: &str, config_path: &str, config: &Databu
// Initialize logging // Initialize logging
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
// Acquire the per-graph server lock. `server_lock` is bound here, at function scope,
// because the lock is meant to be held for the whole lifetime of the server.
let mut server_lock = ServerLock::new(&config.graph_label).unwrap_or_else(|e| {
    eprintln!("Failed to create server lock: {}", e);
    std::process::exit(1);
});

// Try to acquire the exclusive lock.
match server_lock.try_lock() {
    Ok(true) => {
        // We own the lock: publish our pid, port, start time and config hash.
        // A hashing failure is not fatal, but report it instead of silently
        // recording an empty hash.
        let config_hash = ServerLock::hash_config(Path::new(config_path)).unwrap_or_else(|e| {
            eprintln!(
                "Warning: failed to hash config '{}': {}",
                config_path, e
            );
            Default::default()
        });
        let state = lib::server_lock::ServerLockState {
            pid: std::process::id(),
            port,
            started_at: ServerLock::now_millis(),
            config_hash,
        };
        // NOTE(review): a failed state write is only reported; the server keeps running.
        // Presumably clients that look the server up via the lock state will then not
        // find it — confirm whether this should be fatal.
        if let Err(e) = server_lock.write_state(&state) {
            eprintln!("Failed to write server state: {}", e);
        }
    }
    Ok(false) => {
        // Another server is holding the lock - this shouldn't happen in daemon mode
        // but could happen if user manually runs serve while another server is running
        eprintln!(
            "Another server is already running for graph '{}'. Use 'databuild stop' first.",
            config.graph_label
        );
        std::process::exit(1);
    }
    Err(e) => {
        eprintln!("Failed to acquire server lock: {}", e);
        std::process::exit(1);
    }
}
println!("Loaded configuration from: {}", config_path); println!("Loaded configuration from: {}", config_path);
println!(" Graph: {}", config.graph_label); println!(" Graph: {}", config.graph_label);
println!(" Jobs: {}", config.jobs.len()); println!(" Jobs: {}", config.jobs.len());

View file

@ -78,9 +78,9 @@ impl DatabuildConfig {
/// If `bel_uri` is not set, returns the default path `.databuild/${graph_label}/bel.sqlite`. /// If `bel_uri` is not set, returns the default path `.databuild/${graph_label}/bel.sqlite`.
/// Relative paths are not resolved here - that's the caller's responsibility. /// Relative paths are not resolved here - that's the caller's responsibility.
pub fn effective_bel_uri(&self) -> String { pub fn effective_bel_uri(&self) -> String {
self.bel_uri.clone().unwrap_or_else(|| { self.bel_uri
format!(".databuild/{}/bel.sqlite", self.graph_label) .clone()
}) .unwrap_or_else(|| format!(".databuild/{}/bel.sqlite", self.graph_label))
} }
} }
@ -134,7 +134,10 @@ mod tests {
assert_eq!(config.effective_bel_uri(), ".databuild/my_graph/bel.sqlite"); assert_eq!(config.effective_bel_uri(), ".databuild/my_graph/bel.sqlite");
// Custom: uses provided value // Custom: uses provided value
let config = DatabuildConfig::from_json(r#"{ "graph_label": "my_graph", "bel_uri": "postgresql://localhost/db" }"#).unwrap(); let config = DatabuildConfig::from_json(
r#"{ "graph_label": "my_graph", "bel_uri": "postgresql://localhost/db" }"#,
)
.unwrap();
assert_eq!(config.effective_bel_uri(), "postgresql://localhost/db"); assert_eq!(config.effective_bel_uri(), "postgresql://localhost/db");
} }

View file

@ -2,7 +2,7 @@
//! //!
//! Implements the classic double-fork pattern to create a proper Unix daemon. //! Implements the classic double-fork pattern to create a proper Unix daemon.
use crate::server_lock::{ServerLock, ServerLockState}; use crate::server_lock::ServerLock;
use crate::util::DatabuildError; use crate::util::DatabuildError;
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::path::Path; use std::path::Path;
@ -108,68 +108,51 @@ pub fn ensure_server_running(
graph_label: &str, graph_label: &str,
config_hash: &str, config_hash: &str,
) -> Result<DaemonizeResult, DatabuildError> { ) -> Result<DaemonizeResult, DatabuildError> {
let mut lock = ServerLock::new(graph_label)?; let lock = ServerLock::new(graph_label)?;
// Try to acquire the lock // First, check if there's already a running server by reading existing state
if lock.try_lock()? { if let Some(state) = lock.read_state()? {
// We got the lock, so no server is running // Check if that process is still running
// Find an available port if ServerLock::is_process_running(state.pid) {
let port = find_available_port(3538)?; // Verify server is actually healthy
if health_check(state.port) {
// Write initial state (we'll update after server starts) // Check if config has changed
let state = ServerLockState { if state.config_hash != config_hash {
pid: std::process::id(), eprintln!(
port, "Warning: Config has changed since server started.\n\
started_at: ServerLock::now_millis(), Run 'databuild stop && databuild serve' to apply changes."
config_hash: config_hash.to_string(), );
}; }
lock.write_state(&state)?; return Ok(DaemonizeResult::AlreadyRunning { port: state.port });
}
// Spawn the daemon // Process exists but not healthy - might still be starting up
let log_path = lock.log_path(); // Wait a bit and check again
let mut child = spawn_daemon(config_path, port, &log_path)?; if wait_for_health(state.port, 5000).is_ok() {
return Ok(DaemonizeResult::AlreadyRunning { port: state.port });
// Update state with actual PID }
let pid = child.id(); // Still unhealthy, will need to be stopped manually
let state = ServerLockState {
pid,
port,
started_at: ServerLock::now_millis(),
config_hash: config_hash.to_string(),
};
lock.write_state(&state)?;
// Release the lock so the daemon can grab it
drop(lock);
// Wait for server to become healthy
wait_for_health(port, 10000)?;
Ok(DaemonizeResult::Started { port })
} else {
// Server is already running, read the port
let state = lock
.read_state()?
.ok_or_else(|| DatabuildError::from("Lock held but no state found"))?;
// Verify server is actually healthy
if !health_check(state.port) {
return Err(DatabuildError::from(format!( return Err(DatabuildError::from(format!(
"Server at port {} appears unhealthy. Try 'databuild stop' and retry.", "Server at port {} appears unhealthy. Try 'databuild stop' and retry.",
state.port state.port
))); )));
} else {
// Stale lock file - process is gone, clean up
lock.remove_stale_lock()?;
} }
// Check if config has changed
if state.config_hash != config_hash {
eprintln!(
"Warning: Config has changed since server started.\n\
Run 'databuild stop && databuild serve' to apply changes."
);
}
Ok(DaemonizeResult::AlreadyRunning { port: state.port })
} }
// No server running - start one
// Find an available port
let port = find_available_port(3538)?;
// Spawn the daemon - it will acquire its own lock
let log_path = lock.log_path();
let _child = spawn_daemon(config_path, port, &log_path)?;
// Wait for server to become healthy (which implies it has acquired the lock)
wait_for_health(port, 10000)?;
Ok(DaemonizeResult::Started { port })
} }
/// Stop a running server. /// Stop a running server.

View file

@ -45,8 +45,9 @@ impl ServerLock {
/// Creates the .databuild/${graph_label}/ directory if it doesn't exist. /// Creates the .databuild/${graph_label}/ directory if it doesn't exist.
pub fn new_in_dir(base_dir: &Path, graph_label: &str) -> Result<Self, DatabuildError> { pub fn new_in_dir(base_dir: &Path, graph_label: &str) -> Result<Self, DatabuildError> {
let graph_dir = base_dir.join(".databuild").join(graph_label); let graph_dir = base_dir.join(".databuild").join(graph_label);
fs::create_dir_all(&graph_dir) fs::create_dir_all(&graph_dir).map_err(|e| {
.map_err(|e| DatabuildError::from(format!("Failed to create graph directory: {}", e)))?; DatabuildError::from(format!("Failed to create graph directory: {}", e))
})?;
let lock_path = graph_dir.join("server.lock"); let lock_path = graph_dir.join("server.lock");
@ -88,7 +89,10 @@ impl ServerLock {
// Lock is held by another process // Lock is held by another process
Ok(false) Ok(false)
} }
Err(e) => Err(DatabuildError::from(format!("Failed to acquire lock: {}", e))), Err(e) => Err(DatabuildError::from(format!(
"Failed to acquire lock: {}",
e
))),
} }
} }
@ -151,9 +155,7 @@ impl ServerLock {
#[cfg(unix)] #[cfg(unix)]
{ {
use std::os::unix::process::CommandExt; use std::os::unix::process::CommandExt;
unsafe { unsafe { libc::kill(pid as i32, 0) == 0 }
libc::kill(pid as i32, 0) == 0
}
} }
#[cfg(not(unix))] #[cfg(not(unix))]
{ {
@ -278,7 +280,10 @@ mod tests {
let temp = tempdir().unwrap(); let temp = tempdir().unwrap();
let lock = ServerLock::new_in_dir(temp.path(), "nonexistent_graph").unwrap(); let lock = ServerLock::new_in_dir(temp.path(), "nonexistent_graph").unwrap();
let state = lock.read_state().unwrap(); let state = lock.read_state().unwrap();
assert!(state.is_none(), "Reading nonexistent lock file should return None"); assert!(
state.is_none(),
"Reading nonexistent lock file should return None"
);
} }
#[test] #[test]
@ -295,6 +300,9 @@ mod tests {
.unwrap() .unwrap()
.as_millis() as u64; .as_millis() as u64;
assert!(now >= before && now <= after, "now_millis should be between before and after"); assert!(
now >= before && now <= after,
"now_millis should be between before and after"
);
} }
} }

View file

@ -4,71 +4,55 @@ set -euo pipefail
# Navigate to repository root # Navigate to repository root
cd "$(dirname "$0")/.." cd "$(dirname "$0")/.."
# Configuration CONFIG="examples/multihop/config.json"
PORT=3050
DB_PATH="/tmp/databuild_multihop.db"
FLAG_FILE="/tmp/databuild_multihop_alpha_complete" FLAG_FILE="/tmp/databuild_multihop_alpha_complete"
PID_FILE="/tmp/databuild_multihop.pid"
echo "=== DataBuild Multi-Hop Example ===" echo "=== DataBuild Multi-Hop Example ==="
echo echo
# Clean up previous state # Clean up previous state
echo "Cleaning up previous state..." echo "Cleaning up previous state..."
rm -f "$DB_PATH" "$FLAG_FILE" "$PID_FILE" rm -f "$FLAG_FILE"
pkill -f "databuild.*serve.*port $PORT" || true rm -rf .databuild/multihop/
sleep 1 ./bazel-bin/databuild/databuild --config "$CONFIG" stop 2>/dev/null || true
# Build the binary # Build the binary
echo "Building databuild CLI..." echo "Building databuild CLI..."
bazel build //databuild:databuild bazel build //databuild:databuild
# Start the server in background echo
echo "Starting databuild server on port $PORT..." echo "=== Starting server ==="
./bazel-bin/databuild/databuild serve \ echo
--port $PORT \
--database "$DB_PATH" \
--config examples/multihop/config.json &
SERVER_PID=$! # Start the server by making a request (triggers auto-start)
echo $SERVER_PID > "$PID_FILE" OUTPUT=$(./bazel-bin/databuild/databuild --config "$CONFIG" wants list 2>&1)
echo "Server started with PID $SERVER_PID"
# Wait for server to be ready # Extract port from status
echo "Waiting for server to start..." PORT=$(./bazel-bin/databuild/databuild --config "$CONFIG" status 2>&1 | grep "Port:" | awk '{print $2}')
sleep 2
# Test server health
if curl -s http://localhost:$PORT/health > /dev/null 2>&1; then
echo "Server is ready!"
else
echo "WARNING: Server health check failed, but continuing..."
fi
echo echo
echo "=== Server is running ===" echo "Server running at: http://127.0.0.1:${PORT}"
echo echo
echo "You can now interact with the server:" echo "=== Ready to run example ==="
echo
echo "Try the following commands:"
echo
echo " # Check server status"
echo " ./bazel-bin/databuild/databuild --config $CONFIG status"
echo echo
echo " # Create a want for data/beta (triggers dependency chain)" echo " # Create a want for data/beta (triggers dependency chain)"
echo " ./bazel-bin/databuild/databuild --server http://localhost:$PORT want data/beta" echo " ./bazel-bin/databuild/databuild --config $CONFIG want data/beta"
echo echo
echo " # Monitor wants" echo " # Monitor wants"
echo " ./bazel-bin/databuild/databuild --server http://localhost:$PORT wants list" echo " ./bazel-bin/databuild/databuild --config $CONFIG wants list"
echo echo
echo " # Monitor job runs" echo " # Monitor job runs"
echo " ./bazel-bin/databuild/databuild --server http://localhost:$PORT job-runs list" echo " ./bazel-bin/databuild/databuild --config $CONFIG job-runs list"
echo echo
echo " # Monitor partitions" echo " # Monitor partitions"
echo " ./bazel-bin/databuild/databuild --server http://localhost:$PORT partitions list" echo " ./bazel-bin/databuild/databuild --config $CONFIG partitions list"
echo echo
echo "To stop the server:" echo " # Stop the server when done"
echo " kill $SERVER_PID" echo " ./bazel-bin/databuild/databuild --config $CONFIG stop"
echo " # or: pkill -f 'databuild.*serve.*port $PORT'"
echo echo
echo "Server logs will appear below. Press Ctrl+C to stop." echo "==========================================="
echo "=========================================="
echo
# Wait for the server process
wait $SERVER_PID