From be2b15de5e93c00607d639bdfc1fa039638b970a Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Sat, 22 Nov 2025 21:53:06 +0800 Subject: [PATCH] implemented api phase 5 --- databuild/cli_main.rs | 56 ++++++++++++++++++++++++++++++++++++---- databuild/http_server.rs | 53 ++++++++++++++++++++++++++++++++----- databuild/server_main.rs | 56 ++++++++++++++++++++++++++++++++++++---- 3 files changed, 148 insertions(+), 17 deletions(-) diff --git a/databuild/cli_main.rs b/databuild/cli_main.rs index 7c4d869..5b7113b 100644 --- a/databuild/cli_main.rs +++ b/databuild/cli_main.rs @@ -3,7 +3,8 @@ use databuild::build_event_log::SqliteBELStorage; use databuild::http_server::{create_router, AppState}; use reqwest::blocking::Client; use std::sync::Arc; -use tokio::sync::mpsc; +use std::time::Duration; +use tokio::sync::{broadcast, mpsc}; #[derive(Parser)] #[command(name = "databuild")] @@ -120,18 +121,29 @@ async fn cmd_serve(port: u16, database: &str) { // Create command channel for orchestrator communication let (command_tx, command_rx) = mpsc::channel(100); + // Create shutdown broadcast channel + let (shutdown_tx, _shutdown_rx) = broadcast::channel(1); + // Spawn orchestrator in background thread // Note: Orchestrator needs its own BEL storage instance for writes let orch_bel_storage = SqliteBELStorage::create(database).expect("Failed to create BEL storage"); + let orch_shutdown_rx = shutdown_tx.subscribe(); let orch_handle = std::thread::spawn(move || { use databuild::orchestrator::{Orchestrator, OrchestratorConfig}; // Create orchestrator with command channel let config = OrchestratorConfig::default(); let mut orchestrator = Orchestrator::new_with_commands(orch_bel_storage, config, command_rx); + let mut shutdown_rx = orch_shutdown_rx; // Run orchestrator loop loop { + // Check for shutdown signal + if shutdown_rx.try_recv().is_ok() { + println!("Orchestrator received shutdown signal"); + break; + } + if let Err(e) = orchestrator.step() { eprintln!("Orchestrator error: {}", e); } @@ -140,8 +152,34 @@ async fn cmd_serve(port: u16, database: &str) { } }); - // Create app state with shared storage and command sender - let state = AppState::new(bel_storage, command_tx); + // Create app state with shared storage, command sender, and shutdown channel + let state = AppState::new(bel_storage, command_tx, shutdown_tx.clone()); + + // Spawn idle timeout checker task + let idle_state = state.clone(); + let idle_shutdown_tx = shutdown_tx.clone(); + tokio::spawn(async move { + let idle_timeout = Duration::from_secs(3 * 60 * 60); // 3 hours + + loop { + tokio::time::sleep(Duration::from_secs(60)).await; + + let last_request = idle_state.last_request_time.load(std::sync::atomic::Ordering::Relaxed); + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + + if now - last_request > idle_timeout.as_millis() as u64 { + eprintln!( + "Server idle for {} hours, shutting down", + idle_timeout.as_secs() / 3600 + ); + let _ = idle_shutdown_tx.send(()); + break; + } + } + }); // Create router let app = create_router(state); @@ -160,13 +198,21 @@ async fn cmd_serve(port: u16, database: &str) { println!(" GET /api/partitions"); println!(" GET /api/job_runs"); - // Run the server + // Subscribe to shutdown signal for graceful shutdown + let mut server_shutdown_rx = shutdown_tx.subscribe(); + + // Run the server with graceful shutdown axum::serve(listener, app) + .with_graceful_shutdown(async move { + let _ = server_shutdown_rx.recv().await; + println!("HTTP server received shutdown signal"); + }) .await .expect("Server error"); - // Wait for orchestrator (this will never actually happen in normal operation) + // Wait for orchestrator to finish let _ = orch_handle.join(); + println!("Shutdown complete"); } fn cmd_want(server_url: &str, partitions: Vec) { diff --git a/databuild/http_server.rs b/databuild/http_server.rs index 3f7b3d7..77fb654 100644 --- a/databuild/http_server.rs +++ b/databuild/http_server.rs @@ -7,14 +7,19 @@ use crate::{ ListWantsRequest, ListWantsResponse, }; use axum::{ - extract::{Path, Query, State}, + extract::{Path, Query, Request, State}, http::StatusCode, - response::IntoResponse, + middleware::{self, Next}, + response::{IntoResponse, Response}, routing::{delete, get, post}, Json, Router, }; -use std::sync::{atomic::AtomicU64, Arc}; -use tokio::sync::{mpsc, oneshot}; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; +use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::sync::{broadcast, mpsc, oneshot}; /// Shared application state for HTTP handlers #[derive(Clone)] @@ -23,20 +28,49 @@ pub struct AppState { pub bel_storage: Arc, /// Command sender for write operations (sends to orchestrator) pub command_tx: mpsc::Sender, - /// For idle timeout tracking (Phase 5) + /// For idle timeout tracking (epoch millis) pub last_request_time: Arc, + /// Broadcast channel for shutdown signal + pub shutdown_tx: broadcast::Sender<()>, } impl AppState { - pub fn new(bel_storage: Arc, command_tx: mpsc::Sender) -> Self { + pub fn new( + bel_storage: Arc, + command_tx: mpsc::Sender, + shutdown_tx: broadcast::Sender<()>, + ) -> Self { + // Initialize last_request_time to current time + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + Self { bel_storage, command_tx, - last_request_time: Arc::new(AtomicU64::new(0)), + last_request_time: Arc::new(AtomicU64::new(now)), + shutdown_tx, } } } +/// Middleware to update last request time +async fn update_last_request_time( + State(state): State, + req: Request, + next: Next, +) -> Response { + state.last_request_time.store( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64, + Ordering::Relaxed, + ); + next.run(req).await +} + /// Create the Axum router with all endpoints pub fn create_router(state: AppState) -> Router { Router::new() @@ -51,6 +85,11 @@ pub fn create_router(state: AppState) -> Router { .route("/api/partitions", get(list_partitions)) // Job run endpoints .route("/api/job_runs", get(list_job_runs)) + // Add middleware to track request time + .layer(middleware::from_fn_with_state( + state.clone(), + update_last_request_time, + )) .with_state(state) } diff --git a/databuild/server_main.rs b/databuild/server_main.rs index 015553d..7f93dfb 100644 --- a/databuild/server_main.rs +++ b/databuild/server_main.rs @@ -1,7 +1,8 @@ use databuild::build_event_log::SqliteBELStorage; use databuild::http_server::{create_router, AppState}; use std::sync::Arc; -use tokio::sync::mpsc; +use std::time::Duration; +use tokio::sync::{broadcast, mpsc}; #[tokio::main] async fn main() { @@ -20,15 +21,26 @@ async fn main() { // Create command channel for orchestrator communication let (command_tx, command_rx) = mpsc::channel(100); + // Create shutdown broadcast channel + let (shutdown_tx, _shutdown_rx) = broadcast::channel(1); + // Spawn orchestrator in background thread let orch_bel_storage = SqliteBELStorage::create(database).expect("Failed to create BEL storage"); + let orch_shutdown_rx = shutdown_tx.subscribe(); let orch_handle = std::thread::spawn(move || { // Create orchestrator with command channel let config = OrchestratorConfig::default(); let mut orchestrator = Orchestrator::new_with_commands(orch_bel_storage, config, command_rx); + let mut shutdown_rx = orch_shutdown_rx; // Run orchestrator loop loop { + // Check for shutdown signal + if shutdown_rx.try_recv().is_ok() { + println!("Orchestrator received shutdown signal"); + break; + } + if let Err(e) = orchestrator.step() { eprintln!("Orchestrator error: {}", e); } @@ -37,8 +49,34 @@ async fn main() { } }); - // Create app state with shared storage and command sender - let state = AppState::new(bel_storage, command_tx); + // Create app state with shared storage, command sender, and shutdown channel + let state = AppState::new(bel_storage, command_tx, shutdown_tx.clone()); + + // Spawn idle timeout checker task + let idle_state = state.clone(); + let idle_shutdown_tx = shutdown_tx.clone(); + tokio::spawn(async move { + let idle_timeout = Duration::from_secs(3 * 60 * 60); // 3 hours + + loop { + tokio::time::sleep(Duration::from_secs(60)).await; + + let last_request = idle_state.last_request_time.load(std::sync::atomic::Ordering::Relaxed); + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + + if now - last_request > idle_timeout.as_millis() as u64 { + eprintln!( + "Server idle for {} hours, shutting down", + idle_timeout.as_secs() / 3600 + ); + let _ = idle_shutdown_tx.send(()); + break; + } + } + }); // Create router let app = create_router(state); @@ -56,11 +94,19 @@ async fn main() { println!(" GET /api/partitions"); println!(" GET /api/job_runs"); - // Run the server + // Subscribe to shutdown signal for graceful shutdown + let mut server_shutdown_rx = shutdown_tx.subscribe(); + + // Run the server with graceful shutdown axum::serve(listener, app) + .with_graceful_shutdown(async move { + let _ = server_shutdown_rx.recv().await; + println!("HTTP server received shutdown signal"); + }) .await .expect("Server error"); - // Wait for orchestrator (this will never actually happen in normal operation) + // Wait for orchestrator to finish let _ = orch_handle.join(); + println!("Shutdown complete"); }