implemented api phase 5

This commit is contained in:
Stuart Axelbrooke 2025-11-22 21:53:06 +08:00
parent da23af3227
commit be2b15de5e
3 changed files with 148 additions and 17 deletions

View file

@ -3,7 +3,8 @@ use databuild::build_event_log::SqliteBELStorage;
use databuild::http_server::{create_router, AppState};
use reqwest::blocking::Client;
use std::sync::Arc;
use tokio::sync::mpsc;
use std::time::Duration;
use tokio::sync::{broadcast, mpsc};
#[derive(Parser)]
#[command(name = "databuild")]
@ -120,18 +121,29 @@ async fn cmd_serve(port: u16, database: &str) {
// Create command channel for orchestrator communication
let (command_tx, command_rx) = mpsc::channel(100);
// Create shutdown broadcast channel
let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);
// Spawn orchestrator in background thread
// Note: Orchestrator needs its own BEL storage instance for writes
let orch_bel_storage = SqliteBELStorage::create(database).expect("Failed to create BEL storage");
let orch_shutdown_rx = shutdown_tx.subscribe();
let orch_handle = std::thread::spawn(move || {
use databuild::orchestrator::{Orchestrator, OrchestratorConfig};
// Create orchestrator with command channel
let config = OrchestratorConfig::default();
let mut orchestrator = Orchestrator::new_with_commands(orch_bel_storage, config, command_rx);
let mut shutdown_rx = orch_shutdown_rx;
// Run orchestrator loop
loop {
// Check for shutdown signal
if shutdown_rx.try_recv().is_ok() {
println!("Orchestrator received shutdown signal");
break;
}
if let Err(e) = orchestrator.step() {
eprintln!("Orchestrator error: {}", e);
}
@ -140,8 +152,34 @@ async fn cmd_serve(port: u16, database: &str) {
}
});
// Create app state with shared storage and command sender
let state = AppState::new(bel_storage, command_tx);
// Create app state with shared storage, command sender, and shutdown channel
let state = AppState::new(bel_storage, command_tx, shutdown_tx.clone());
// Spawn idle timeout checker task
let idle_state = state.clone();
let idle_shutdown_tx = shutdown_tx.clone();
tokio::spawn(async move {
let idle_timeout = Duration::from_secs(3 * 60 * 60); // 3 hours
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
let last_request = idle_state.last_request_time.load(std::sync::atomic::Ordering::Relaxed);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
if now - last_request > idle_timeout.as_millis() as u64 {
eprintln!(
"Server idle for {} hours, shutting down",
idle_timeout.as_secs() / 3600
);
let _ = idle_shutdown_tx.send(());
break;
}
}
});
// Create router
let app = create_router(state);
@ -160,13 +198,21 @@ async fn cmd_serve(port: u16, database: &str) {
println!(" GET /api/partitions");
println!(" GET /api/job_runs");
// Run the server
// Subscribe to shutdown signal for graceful shutdown
let mut server_shutdown_rx = shutdown_tx.subscribe();
// Run the server with graceful shutdown
axum::serve(listener, app)
.with_graceful_shutdown(async move {
let _ = server_shutdown_rx.recv().await;
println!("HTTP server received shutdown signal");
})
.await
.expect("Server error");
// Wait for orchestrator (this will never actually happen in normal operation)
// Wait for orchestrator to finish
let _ = orch_handle.join();
println!("Shutdown complete");
}
fn cmd_want(server_url: &str, partitions: Vec<String>) {

View file

@ -7,14 +7,19 @@ use crate::{
ListWantsRequest, ListWantsResponse,
};
use axum::{
extract::{Path, Query, State},
extract::{Path, Query, Request, State},
http::StatusCode,
response::IntoResponse,
middleware::{self, Next},
response::{IntoResponse, Response},
routing::{delete, get, post},
Json, Router,
};
use std::sync::{atomic::AtomicU64, Arc};
use tokio::sync::{mpsc, oneshot};
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::{broadcast, mpsc, oneshot};
/// Shared application state for HTTP handlers
#[derive(Clone)]
@ -23,20 +28,49 @@ pub struct AppState {
pub bel_storage: Arc<dyn BELStorage>,
/// Command sender for write operations (sends to orchestrator)
pub command_tx: mpsc::Sender<Command>,
/// For idle timeout tracking (Phase 5)
/// For idle timeout tracking (epoch millis)
pub last_request_time: Arc<AtomicU64>,
/// Broadcast channel for shutdown signal
pub shutdown_tx: broadcast::Sender<()>,
}
impl AppState {
pub fn new(bel_storage: Arc<dyn BELStorage>, command_tx: mpsc::Sender<Command>) -> Self {
pub fn new(
bel_storage: Arc<dyn BELStorage>,
command_tx: mpsc::Sender<Command>,
shutdown_tx: broadcast::Sender<()>,
) -> Self {
// Initialize last_request_time to current time
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
Self {
bel_storage,
command_tx,
last_request_time: Arc::new(AtomicU64::new(0)),
last_request_time: Arc::new(AtomicU64::new(now)),
shutdown_tx,
}
}
}
/// Middleware to update last request time
async fn update_last_request_time(
State(state): State<AppState>,
req: Request,
next: Next,
) -> Response {
state.last_request_time.store(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);
next.run(req).await
}
/// Create the Axum router with all endpoints
pub fn create_router(state: AppState) -> Router {
Router::new()
@ -51,6 +85,11 @@ pub fn create_router(state: AppState) -> Router {
.route("/api/partitions", get(list_partitions))
// Job run endpoints
.route("/api/job_runs", get(list_job_runs))
// Add middleware to track request time
.layer(middleware::from_fn_with_state(
state.clone(),
update_last_request_time,
))
.with_state(state)
}

View file

@ -1,7 +1,8 @@
use databuild::build_event_log::SqliteBELStorage;
use databuild::http_server::{create_router, AppState};
use std::sync::Arc;
use tokio::sync::mpsc;
use std::time::Duration;
use tokio::sync::{broadcast, mpsc};
#[tokio::main]
async fn main() {
@ -20,15 +21,26 @@ async fn main() {
// Create command channel for orchestrator communication
let (command_tx, command_rx) = mpsc::channel(100);
// Create shutdown broadcast channel
let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);
// Spawn orchestrator in background thread
let orch_bel_storage = SqliteBELStorage::create(database).expect("Failed to create BEL storage");
let orch_shutdown_rx = shutdown_tx.subscribe();
let orch_handle = std::thread::spawn(move || {
// Create orchestrator with command channel
let config = OrchestratorConfig::default();
let mut orchestrator = Orchestrator::new_with_commands(orch_bel_storage, config, command_rx);
let mut shutdown_rx = orch_shutdown_rx;
// Run orchestrator loop
loop {
// Check for shutdown signal
if shutdown_rx.try_recv().is_ok() {
println!("Orchestrator received shutdown signal");
break;
}
if let Err(e) = orchestrator.step() {
eprintln!("Orchestrator error: {}", e);
}
@ -37,8 +49,34 @@ async fn main() {
}
});
// Create app state with shared storage and command sender
let state = AppState::new(bel_storage, command_tx);
// Create app state with shared storage, command sender, and shutdown channel
let state = AppState::new(bel_storage, command_tx, shutdown_tx.clone());
// Spawn idle timeout checker task
let idle_state = state.clone();
let idle_shutdown_tx = shutdown_tx.clone();
tokio::spawn(async move {
let idle_timeout = Duration::from_secs(3 * 60 * 60); // 3 hours
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
let last_request = idle_state.last_request_time.load(std::sync::atomic::Ordering::Relaxed);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
if now - last_request > idle_timeout.as_millis() as u64 {
eprintln!(
"Server idle for {} hours, shutting down",
idle_timeout.as_secs() / 3600
);
let _ = idle_shutdown_tx.send(());
break;
}
}
});
// Create router
let app = create_router(state);
@ -56,11 +94,19 @@ async fn main() {
println!(" GET /api/partitions");
println!(" GET /api/job_runs");
// Run the server
// Subscribe to shutdown signal for graceful shutdown
let mut server_shutdown_rx = shutdown_tx.subscribe();
// Run the server with graceful shutdown
axum::serve(listener, app)
.with_graceful_shutdown(async move {
let _ = server_shutdown_rx.recv().await;
println!("HTTP server received shutdown signal");
})
.await
.expect("Server error");
// Wait for orchestrator (this will never actually happen in normal operation)
// Wait for orchestrator to finish
let _ = orch_handle.join();
println!("Shutdown complete");
}