diff --git a/databuild/build_state.rs b/databuild/build_state.rs index be11681..27a5c35 100644 --- a/databuild/build_state.rs +++ b/databuild/build_state.rs @@ -20,6 +20,7 @@ use crate::{ }; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; +use tracing; /** Design Notes @@ -142,6 +143,14 @@ impl BuildState { let impacted_partition_refs: Vec = md.impacted.iter().map(|p| p.r#ref.clone()).collect(); + tracing::debug!( + derivative_want_id = %derivative_want_id, + source_job_run_id = %source_job_run_id, + missing_partitions = ?derivative_want_partitions.iter().map(|p| &p.r#ref).collect::>(), + impacted_partitions = ?impacted_partition_refs, + "Processing derivative want creation" + ); + // Find all wants that include these impacted partitions // These are the wants that need to wait for the derivative want to complete let mut impacted_want_ids: std::collections::HashSet = @@ -164,6 +173,11 @@ impl BuildState { let transitioned = match want { Want::Building(building) => { // First dep miss for this want: Building → UpstreamBuilding + tracing::info!( + want_id = %want_id, + derivative_want_id = %derivative_want_id, + "Want: Building → UpstreamBuilding (first missing dep detected)" + ); Want::UpstreamBuilding( building.detect_missing_deps(vec![derivative_want_id.to_string()]), ) @@ -171,6 +185,11 @@ impl BuildState { Want::UpstreamBuilding(upstream) => { // Additional dep miss: UpstreamBuilding → UpstreamBuilding (add another upstream) // This can happen if multiple jobs report dep misses for different upstreams + tracing::info!( + want_id = %want_id, + derivative_want_id = %derivative_want_id, + "Want: UpstreamBuilding → UpstreamBuilding (additional upstream added)" + ); Want::UpstreamBuilding( upstream.add_upstreams(vec![derivative_want_id.to_string()]), ) @@ -202,6 +221,11 @@ impl BuildState { let transitioned = match partition { // Valid: Missing -> Building Partition::Missing(missing) => { + tracing::info!( + partition = %building_ref.0.r#ref, + job_run_id = %job_run_id, + "Partition: Missing → Building" + ); Partition::Building(missing.start_building(job_run_id.to_string())) } // Invalid state: partition should not already be Building, Live, Failed, or Tainted @@ -219,6 +243,11 @@ impl BuildState { let missing = Partition::new_missing(building_ref.0.clone()); if let Partition::Missing(m) = missing { let building = m.start_building(job_run_id.to_string()); + tracing::info!( + partition = %building_ref.0.r#ref, + job_run_id = %job_run_id, + "Partition: Missing → Building (created)" + ); self.partitions .insert(building_ref.0.r#ref.clone(), Partition::Building(building)); } @@ -243,6 +272,11 @@ impl BuildState { // ONLY valid transition: Building -> Live let transitioned = match partition { Partition::Building(building) => { + tracing::info!( + partition = %pref.0.r#ref, + job_run_id = %job_run_id, + "Partition: Building → Live" + ); Partition::Live(building.complete(job_run_id.to_string(), timestamp)) } // All other states are invalid @@ -274,6 +308,11 @@ impl BuildState { // ONLY valid transition: Building -> Failed let transitioned = match partition { Partition::Building(building) => { + tracing::info!( + partition = %pref.0.r#ref, + job_run_id = %job_run_id, + "Partition: Building → Failed" + ); Partition::Failed(building.fail(job_run_id.to_string(), timestamp)) } // All other states are invalid @@ -302,7 +341,13 @@ impl BuildState { // Only valid transition: Building -> Missing let transitioned = match partition { - Partition::Building(building) => Partition::Missing(building.reset_to_missing()), + Partition::Building(building) => { + tracing::info!( + partition = %building_ref.0.r#ref, + "Partition: Building → Missing (dep miss)" + ); + Partition::Missing(building.reset_to_missing()) + } // All other states are invalid _ => { panic!( @@ -352,6 +397,11 @@ impl BuildState { if all_partitions_live { let successful_want = building.complete(job_run_id.to_string(), timestamp); + tracing::info!( + want_id = %want_id, + job_run_id = %job_run_id, + "Want: Building → Successful" + ); newly_successful_wants.push(successful_want.get_id()); Want::Successful(successful_want) } else { @@ -429,12 +479,12 @@ impl BuildState { job_run_id: &str, timestamp: u64, ) { - eprintln!( - "DEBUG unblock_downstream_wants: newly_successful_wants={:?}", - newly_successful_wants + tracing::debug!( + newly_successful_wants = ?newly_successful_wants .iter() .map(|w| &w.0) - .collect::>() + .collect::>(), + "Checking downstream wants for unblocking" ); // Find downstream wants that are waiting for any of the newly successful wants // TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants @@ -455,9 +505,9 @@ impl BuildState { } }) .collect(); - eprintln!( - "DEBUG downstream_wants_to_check={:?}", - downstream_wants_to_check + tracing::debug!( + downstream_wants_to_check = ?downstream_wants_to_check, + "Found downstream wants affected by upstream completion" ); for want_id in downstream_wants_to_check { @@ -468,9 +518,10 @@ impl BuildState { let transitioned = match want { Want::UpstreamBuilding(downstream_want) => { - eprintln!( - "DEBUG checking want_id={}, upstreams={:?}", - want_id, downstream_want.state.upstream_want_ids + tracing::debug!( + want_id = %want_id, + upstreams = ?downstream_want.state.upstream_want_ids, + "Checking if all upstreams are satisfied" ); // Check if ALL of this downstream want's upstream dependencies are now Successful let all_upstreams_successful = downstream_want @@ -483,9 +534,10 @@ impl BuildState { .map(|w| matches!(w, Want::Successful(_))) .unwrap_or(false) }); - eprintln!( - "DEBUG all_upstreams_successful={}", - all_upstreams_successful + tracing::debug!( + want_id = %want_id, + all_upstreams_successful = %all_upstreams_successful, + "Upstream satisfaction check complete" ); if all_upstreams_successful { @@ -499,26 +551,37 @@ impl BuildState { .map(|partition| matches!(partition, Partition::Building(_))) .unwrap_or(false) }); - eprintln!( - "DEBUG any_partition_building={}", - any_partition_building + tracing::debug!( + want_id = %want_id, + any_partition_building = %any_partition_building, + "Partition building status check" ); if any_partition_building { // Some partitions still being built, continue in Building state - eprintln!("DEBUG -> Building"); + tracing::info!( + want_id = %want_id, + job_run_id = %job_run_id, + "Want: UpstreamBuilding → Building (upstreams satisfied, partitions building)" + ); Want::Building( downstream_want .continue_building(job_run_id.to_string(), timestamp), ) } else { // No partitions being built, become schedulable again - eprintln!("DEBUG -> Idle"); + tracing::info!( + want_id = %want_id, + "Want: UpstreamBuilding → Idle (upstreams satisfied, ready to schedule)" + ); Want::Idle(downstream_want.upstreams_satisfied()) } } else { // Upstreams not all satisfied yet, stay in UpstreamBuilding - eprintln!("DEBUG -> UpstreamBuilding (stay)"); + tracing::debug!( + want_id = %want_id, + "Want remains in UpstreamBuilding state (upstreams not yet satisfied)" + ); Want::UpstreamBuilding(downstream_want) } } @@ -608,6 +671,25 @@ impl BuildState { fn handle_want_create(&mut self, event: &WantCreateEventV1) -> Vec { // Use From impl to create want in Idle state let want_idle: WantWithState = event.clone().into(); + + // Log creation with derivative vs user-created distinction + if let Some(source) = &event.source { + if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source { + tracing::info!( + want_id = %event.want_id, + partitions = ?event.partitions.iter().map(|p| &p.r#ref).collect::>(), + source_job_run_id = %job_triggered.job_run_id, + "Want created (derivative - auto-created due to missing dependency)" + ); + } + } else { + tracing::info!( + want_id = %event.want_id, + partitions = ?event.partitions.iter().map(|p| &p.r#ref).collect::>(), + "Want created (user-requested)" + ); + } + self.wants .insert(event.want_id.clone(), Want::Idle(want_idle)); @@ -694,6 +776,11 @@ impl BuildState { let transitioned = match want { Want::Idle(idle) => { // First job starting for this want + tracing::info!( + want_id = %wap.want_id, + job_run_id = %event.job_run_id, + "Want: Idle → Building (job scheduled)" + ); Want::Building(idle.start_building(current_timestamp())) } Want::Building(building) => { @@ -735,7 +822,13 @@ impl BuildState { let running = match job_run { // First heartbeat: Queued -> Running - JobRun::Queued(queued) => queued.start_running(current_timestamp()), + JobRun::Queued(queued) => { + tracing::info!( + job_run_id = %event.job_run_id, + "JobRun: Queued → Running" + ); + queued.start_running(current_timestamp()) + } // Subsequent heartbeat: update timestamp JobRun::Running(running) => running.heartbeat(current_timestamp()), _ => { @@ -758,7 +851,13 @@ impl BuildState { )); let succeeded = match job_run { - JobRun::Running(running) => running.succeed(current_timestamp()), + JobRun::Running(running) => { + tracing::info!( + job_run_id = %event.job_run_id, + "JobRun: Running → Succeeded" + ); + running.succeed(current_timestamp()) + } _ => { panic!( "BUG: Success event received for job run {} in invalid state {:?}. Job must be Running to succeed.", @@ -803,7 +902,14 @@ impl BuildState { )); let failed = match job_run { - JobRun::Running(running) => running.fail(current_timestamp(), event.reason.clone()), + JobRun::Running(running) => { + tracing::info!( + job_run_id = %event.job_run_id, + reason = %event.reason, + "JobRun: Running → Failed" + ); + running.fail(current_timestamp(), event.reason.clone()) + } _ => { panic!( "BUG: Failure event received for job run {} in invalid state {:?}. Job must be Running to fail.", @@ -875,11 +981,20 @@ impl BuildState { )); let dep_miss = match job_run { - JobRun::Running(running) => running.dep_miss( - current_timestamp(), - event.missing_deps.clone(), - event.read_deps.clone(), - ), + JobRun::Running(running) => { + tracing::info!( + job_run_id = %event.job_run_id, + missing_deps = ?event.missing_deps.iter() + .flat_map(|md| md.missing.iter().map(|p| &p.r#ref)) + .collect::>(), + "JobRun: Running → DepMiss (missing dependencies detected)" + ); + running.dep_miss( + current_timestamp(), + event.missing_deps.clone(), + event.read_deps.clone(), + ) + } _ => { panic!( "BUG: DepMiss event received for job run {} in invalid state {:?}. Job must be Running to hit dep miss.", diff --git a/databuild/http_server.rs b/databuild/http_server.rs index cab1b25..acc1db5 100644 --- a/databuild/http_server.rs +++ b/databuild/http_server.rs @@ -15,7 +15,7 @@ use axum::{ routing::{delete, get, post}, }; use std::sync::{ - Arc, + Arc, RwLock, atomic::{AtomicU64, Ordering}, }; use std::time::{SystemTime, UNIX_EPOCH}; @@ -25,7 +25,9 @@ use tower_http::cors::CorsLayer; /// Shared application state for HTTP handlers #[derive(Clone)] pub struct AppState { - /// Shared read-only access to BEL storage (for reconstructing state) + /// Shared read-only access to build state (maintained by orchestrator) + pub build_state: Arc>, + /// Shared read-only access to BEL storage (for event log queries) pub bel_storage: Arc, /// Command sender for write operations (sends to orchestrator) pub command_tx: mpsc::Sender, @@ -37,6 +39,7 @@ pub struct AppState { impl AppState { pub fn new( + build_state: Arc>, bel_storage: Arc, command_tx: mpsc::Sender, shutdown_tx: broadcast::Sender<()>, @@ -48,6 +51,7 @@ impl AppState { .as_millis() as u64; Self { + build_state, bel_storage, command_tx, last_request_time: Arc::new(AtomicU64::new(now)), @@ -147,23 +151,8 @@ async fn list_wants( State(state): State, Query(params): Query, ) -> impl IntoResponse { - // Read all events from storage - let events = match state.bel_storage.list_events(0, 100000) { - Ok(events) => events, - Err(e) => { - tracing::error!("Failed to read events from BEL storage: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ErrorResponse::new(format!("Failed to read events: {}", e))), - ) - .into_response(); - } - }; - - // Reconstruct state from events - let build_state = BuildState::from_events(&events); - - // Use existing API method + // Read from shared build state + let build_state = state.build_state.read().unwrap(); let response = build_state.list_wants(¶ms); (StatusCode::OK, Json(response)).into_response() @@ -171,24 +160,9 @@ async fn list_wants( /// Get a specific want by ID async fn get_want(State(state): State, Path(want_id): Path) -> impl IntoResponse { - let events = match state.bel_storage.list_events(0, 100000) { - Ok(events) => events, - Err(e) => { - tracing::error!("Failed to read events from BEL storage: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ErrorResponse::new(format!("Failed to read events: {}", e))), - ) - .into_response(); - } - }; - - let build_state = BuildState::from_events(&events); - - let req = GetWantRequest { - want_id: want_id.clone(), - }; - let response = build_state.get_want(&req.want_id); + // Read from shared build state + let build_state = state.build_state.read().unwrap(); + let response = build_state.get_want(&want_id); match response { Some(want) => (StatusCode::OK, Json(GetWantResponse { data: Some(want) })).into_response(), @@ -327,19 +301,8 @@ async fn list_partitions( State(state): State, Query(params): Query, ) -> impl IntoResponse { - let events = match state.bel_storage.list_events(0, 100000) { - Ok(events) => events, - Err(e) => { - tracing::error!("Failed to read events from BEL storage: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ErrorResponse::new(format!("Failed to read events: {}", e))), - ) - .into_response(); - } - }; - - let build_state = BuildState::from_events(&events); + // Read from shared build state + let build_state = state.build_state.read().unwrap(); let response = build_state.list_partitions(¶ms); (StatusCode::OK, Json(response)).into_response() @@ -350,19 +313,8 @@ async fn list_job_runs( State(state): State, Query(params): Query, ) -> impl IntoResponse { - let events = match state.bel_storage.list_events(0, 100000) { - Ok(events) => events, - Err(e) => { - tracing::error!("Failed to read events from BEL storage: {}", e); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(ErrorResponse::new(format!("Failed to read events: {}", e))), - ) - .into_response(); - } - }; - - let build_state = BuildState::from_events(&events); + // Read from shared build state + let build_state = state.build_state.read().unwrap(); let response = build_state.list_job_runs(¶ms); (StatusCode::OK, Json(response)).into_response()