From b978be53f597563be4288b31b1bb28dd199c1dcc Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Sun, 23 Nov 2025 11:19:02 +0800 Subject: [PATCH] fix server panics related to not broadcasting derivative wants --- databuild/build_event_log.rs | 27 +++++++++++++-- databuild/cli_main.rs | 11 ++++-- databuild/http_server.rs | 56 +++++++++++++++++++++++++++--- databuild/orchestrator.rs | 67 ++++++++++++++++++++++++------------ 4 files changed, 131 insertions(+), 30 deletions(-) diff --git a/databuild/build_event_log.rs b/databuild/build_event_log.rs index 2ca3e0e..2c782dc 100644 --- a/databuild/build_event_log.rs +++ b/databuild/build_event_log.rs @@ -238,16 +238,34 @@ impl BELStorage for SqliteBELStorage { pub struct BuildEventLog { pub storage: S, pub state: BuildState, + /// Optional event broadcaster for HTTP server mirroring + #[cfg_attr(not(feature = "server"), allow(dead_code))] + pub event_broadcaster: Option>, } impl BuildEventLog { pub fn new(storage: S, state: BuildState) -> BuildEventLog { - BuildEventLog { storage, state } + BuildEventLog { + storage, + state, + event_broadcaster: None, + } + } + + pub fn with_broadcaster(mut self, broadcaster: tokio::sync::broadcast::Sender) -> Self { + self.event_broadcaster = Some(broadcaster); + self } pub fn append_event(&mut self, event: &Event) -> Result { let events = self.state.handle_event(&event); let idx = self.storage.append_event(event)?; + + // Broadcast event to HTTP server (if configured) + if let Some(ref tx) = self.event_broadcaster { + let _ = tx.send(event.clone()); + } + // Recursion here might be dangerous, but in theory the event propagation always terminates for event in events { self.append_event(&event)?; @@ -327,6 +345,7 @@ impl Clone for BuildEventLog { Self { storage: self.storage.clone(), state: self.state.clone(), + event_broadcaster: self.event_broadcaster.clone(), } } } @@ -344,7 +363,11 @@ mod tests { let storage = SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage"); let state = BuildState::default(); - let mut log = BuildEventLog { storage, state }; + let mut log = BuildEventLog { + storage, + state, + event_broadcaster: None, + }; let want_id = "sqlite_test_1234".to_string(); diff --git a/databuild/cli_main.rs b/databuild/cli_main.rs index 5595a30..fc163dc 100644 --- a/databuild/cli_main.rs +++ b/databuild/cli_main.rs @@ -158,8 +158,15 @@ async fn cmd_serve(port: u16, database: &str, config_path: Option<&str>) { let mut mirror_rx = event_tx.subscribe(); tokio::spawn(async move { while let Ok(event) = mirror_rx.recv().await { - let mut state = mirror_clone.write().unwrap(); - state.handle_event(&event); + match mirror_clone.write() { + Ok(mut state) => { + state.handle_event(&event); + } + Err(e) => { + eprintln!("State mirror task: RwLock poisoned, cannot update state: {}", e); + break; + } + } } }); diff --git a/databuild/http_server.rs b/databuild/http_server.rs index bffd884..4e49da2 100644 --- a/databuild/http_server.rs +++ b/databuild/http_server.rs @@ -152,7 +152,19 @@ async fn list_wants( Query(params): Query, ) -> impl IntoResponse { // Read from shared build state - let build_state = state.build_state.read().unwrap(); + let build_state = match state.build_state.read() { + Ok(state) => state, + Err(e) => { + tracing::error!("Failed to acquire read lock on build state: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse::new( + "Internal server error: state lock poisoned", + )), + ) + .into_response(); + } + }; let response = build_state.list_wants(¶ms); (StatusCode::OK, Json(response)).into_response() @@ -161,7 +173,19 @@ async fn list_wants( /// Get a specific want by ID async fn get_want(State(state): State, Path(want_id): Path) -> impl IntoResponse { // Read from shared build state - let build_state = state.build_state.read().unwrap(); + let build_state = match state.build_state.read() { + Ok(state) => state, + Err(e) => { + tracing::error!("Failed to acquire read lock on build state: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse::new( + "Internal server error: state lock poisoned", + )), + ) + .into_response(); + } + }; let response = build_state.get_want(&want_id); match response { @@ -302,7 +326,19 @@ async fn list_partitions( Query(params): Query, ) -> impl IntoResponse { // Read from shared build state - let build_state = state.build_state.read().unwrap(); + let build_state = match state.build_state.read() { + Ok(state) => state, + Err(e) => { + tracing::error!("Failed to acquire read lock on build state: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse::new( + "Internal server error: state lock poisoned", + )), + ) + .into_response(); + } + }; let response = build_state.list_partitions(¶ms); (StatusCode::OK, Json(response)).into_response() @@ -314,7 +350,19 @@ async fn list_job_runs( Query(params): Query, ) -> impl IntoResponse { // Read from shared build state - let build_state = state.build_state.read().unwrap(); + let build_state = match state.build_state.read() { + Ok(state) => state, + Err(e) => { + tracing::error!("Failed to acquire read lock on build state: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse::new( + "Internal server error: state lock poisoned", + )), + ) + .into_response(); + } + }; let response = build_state.list_job_runs(¶ms); (StatusCode::OK, Json(response)).into_response() diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs index 27924cd..a27822d 100644 --- a/databuild/orchestrator.rs +++ b/databuild/orchestrator.rs @@ -164,7 +164,7 @@ impl Orchestrator { event_tx: broadcast::Sender, ) -> Self { Self { - bel: BuildEventLog::new(storage, Default::default()), + bel: BuildEventLog::new(storage, Default::default()).with_broadcaster(event_tx.clone()), config, job_runs: Vec::new(), command_rx: Some(command_rx), @@ -173,14 +173,9 @@ impl Orchestrator { } } - /// Append event to BEL and broadcast to HTTP server (if configured) + /// Append event to BEL (which handles state mutation, storage, broadcasting, and cascading events) fn append_and_broadcast(&mut self, event: &Event) -> Result { - let idx = self.bel.append_event(event)?; - if let Some(tx) = &self.event_tx { - // Ignore send errors (no receivers is fine) - let _ = tx.send(event.clone()); - } - Ok(idx) + self.bel.append_event(event) } fn schedule_queued_jobs(&mut self) -> Result<(), DatabuildError> { @@ -241,23 +236,32 @@ impl Orchestrator { let transitioned = match job { JobRunHandle::Running(running) => match running.visit()? { VisitResult::StillRunning(still_running) => { - println!("Still running job: {:?}", still_running.job_run_id); + tracing::debug!( + job_run_id = still_running.job_run_id.to_string(), + "Job still running" + ); JobRunHandle::Running(still_running) } VisitResult::Completed(completed) => { - println!("Completed job: {:?}", completed.job_run_id); + tracing::debug!( + job_run_id = completed.job_run_id.to_string(), + "Completed job" + ); let event = completed.state.to_event(&completed.job_run_id); events_to_append.push(event); JobRunHandle::Completed(completed) } VisitResult::Failed(failed) => { - println!("Failed job: {:?}", failed.job_run_id); + tracing::debug!(job_run_id = failed.job_run_id.to_string(), "Failed job"); let event = failed.state.to_event(&failed.job_run_id); events_to_append.push(event); JobRunHandle::Failed(failed) } VisitResult::DepMiss(dep_miss) => { - println!("Dep miss job: {:?}", dep_miss.job_run_id); + tracing::debug!( + job_run_id = dep_miss.job_run_id.to_string(), + "Job dep miss" + ); let event = dep_miss.state.to_event(&dep_miss.job_run_id); events_to_append.push(event); JobRunHandle::DepMiss(dep_miss) @@ -357,18 +361,37 @@ impl Orchestrator { /// Process all pending commands from the HTTP server fn process_commands(&mut self) -> Result<(), DatabuildError> { + // TODO Handle command failure gracefully - what's story there? + // Collect all pending commands first (to avoid borrow checker issues) + let mut commands = Vec::new(); if let Some(ref mut rx) = self.command_rx { - // Process all pending commands (non-blocking) while let Ok(cmd) = rx.try_recv() { - match cmd { - Command::CreateWant { request, reply } => { - let response = self.bel.api_handle_want_create(request); - let _ = reply.send(response); // Ignore send errors - } - Command::CancelWant { request, reply } => { - let response = self.bel.api_handle_want_cancel(request); - let _ = reply.send(response); // Ignore send errors - } + commands.push(cmd); + } + } + + // Now process all collected commands + for cmd in commands { + match cmd { + Command::CreateWant { request, reply } => { + // Convert request to event and broadcast it (so HTTP server receives it) + let event: crate::WantCreateEventV1 = request.into(); + let result = self + .append_and_broadcast(&crate::data_build_event::Event::WantCreateV1( + event.clone(), + )) + .map(|_| self.bel.state.get_want(&event.want_id).into()); + let _ = reply.send(result); // Ignore send errors + } + Command::CancelWant { request, reply } => { + // Convert request to event and broadcast it (so HTTP server receives it) + let event: crate::WantCancelEventV1 = request.into(); + let result = self + .append_and_broadcast(&crate::data_build_event::Event::WantCancelV1( + event.clone(), + )) + .map(|_| self.bel.state.get_want(&event.want_id).into()); + let _ = reply.send(result); // Ignore send errors } } }