diff --git a/databuild/build_event_log.rs b/databuild/build_event_log.rs
index 894cdef..2ca3e0e 100644
--- a/databuild/build_event_log.rs
+++ b/databuild/build_event_log.rs
@@ -255,6 +255,13 @@ impl BuildEventLog {
         Ok(idx)
     }
 
+    pub fn append_event_no_recurse(&mut self, event: &Event) -> Result<u64> {
+        // Cascading events from handle_event are intentionally dropped here, so unlike append_event this can never recurse (propagation should terminate in theory, but this variant does not rely on that).
+        let _ = self.state.handle_event(event);
+        let idx = self.storage.append_event(event)?;
+        Ok(idx)
+    }
+
     // API methods
     pub fn api_handle_list_wants(&self, req: ListWantsRequest) -> ListWantsResponse {
         self.state.list_wants(&req)
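The comment in append_event_no_recurse only makes sense next to its recursive sibling, which this hunk does not show. A minimal sketch of what that recursive path presumably looks like, assuming the same Result alias and storage API used above — illustrative only, not part of the patch:

// Sketch (assumed, not shown in this diff): the recursive variant drains
// cascading events by appending each one, which re-enters handle_event.
pub fn append_event(&mut self, event: &Event) -> Result<u64> {
    let cascaded = self.state.handle_event(event);
    let idx = self.storage.append_event(event)?;
    for next in &cascaded {
        // Terminates once a generation of events produces no follow-ups,
        // e.g. derivative wants bottom out at partitions with no missing deps.
        self.append_event(next)?;
    }
    Ok(idx)
}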
diff --git a/databuild/build_state.rs b/databuild/build_state.rs
index 893e6cb..be11681 100644
--- a/databuild/build_state.rs
+++ b/databuild/build_state.rs
@@ -1,5 +1,6 @@
 use crate::data_build_event::Event;
 use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
+use crate::event_source::Source as EventSourceVariant;
 use crate::job_run_state::{JobRun, JobRunWithState, QueuedState as JobQueuedState};
 use crate::partition_state::{
     BuildingPartitionRef, FailedPartitionRef, LivePartitionRef, MissingPartitionRef, MissingState,
@@ -45,7 +46,7 @@ critical that these state machines, their states, and their transitions are type
 /// Tracks all application state, defines valid state transitions, and manages cross-state machine
 /// state transitions (e.g. job run success resulting in partition going from Building to Live)
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
 pub struct BuildState {
     wants: BTreeMap<String, Want>, // Type-safe want storage
     taints: BTreeMap<String, Taint>,
@@ -53,17 +54,6 @@ pub struct BuildState {
     partitions: BTreeMap<String, Partition>,
     job_runs: BTreeMap<String, JobRun>, // Type-safe job run storage
 }
 
-impl Default for BuildState {
-    fn default() -> Self {
-        Self {
-            wants: Default::default(),
-            taints: Default::default(),
-            partitions: Default::default(),
-            job_runs: Default::default(),
-        }
-    }
-}
-
 impl BuildState {
     /// Reconstruct BuildState from a sequence of events (for read path in web server)
     /// This allows the web server to rebuild state from BEL storage without holding a lock
@@ -73,7 +63,7 @@ impl BuildState {
             if let Some(ref inner_event) = event.event {
                 // handle_event returns Vec<Event> for cascading events, but we ignore them
                 // since we're replaying from a complete event log
-                let _ = state.handle_event(inner_event);
+                state.handle_event(inner_event);
             }
         }
         state
@@ -100,6 +90,105 @@ impl BuildState {
         }
     }
 
+    /// Handle creation of a derivative want (created due to a job dep miss)
+    ///
+    /// When a job reports missing dependencies, it returns WantCreateV1 events for those missing partitions.
+    /// Those events get appended to the BEL and eventually processed by handle_want_create().
+    ///
+    /// This function is called when we detect a derivative want (one with source.job_triggered) and transitions
+    /// the impacted wants to UpstreamBuilding state, tracking the derivative want ID as an upstream dependency.
+    ///
+    /// KEY INSIGHT: We must use the actual want_id from the WantCreateV1 event, not synthetic UUIDs generated
+    /// during event processing. This ensures replay works correctly - the same want IDs are used both during
+    /// original execution and during replay from the BEL.
+    fn handle_derivative_want_creation(
+        &mut self,
+        derivative_want_id: &str,
+        derivative_want_partitions: &[PartitionRef],
+        source_job_run_id: &str,
+    ) {
+        // Look up the job run that triggered this derivative want.
+        // This job run must be in DepMiss state because it reported missing dependencies.
+        let job_run = self.job_runs.get(source_job_run_id).expect(&format!(
+            "BUG: Job run {} must exist when derivative want created",
+            source_job_run_id
+        ));
+
+        // Extract the missing deps from the DepMiss job run
+        let missing_deps = match job_run {
+            JobRun::DepMiss(dep_miss) => dep_miss.get_missing_deps(),
+            _ => {
+                panic!(
+                    "BUG: Job run {} must be in DepMiss state when derivative want created, found {:?}",
+                    source_job_run_id, job_run
+                );
+            }
+        };
+
+        // Find which MissingDeps entry corresponds to this derivative want.
+        // The derivative want was created for a specific set of missing partitions,
+        // and we need to find which downstream partitions are impacted by those missing partitions.
+        for md in missing_deps {
+            // Check if this derivative want's partitions match the missing partitions in this entry.
+            // We need an exact match because one dep miss event can create multiple derivative wants.
+            let partitions_match = md.missing.iter().all(|missing_ref| {
+                derivative_want_partitions
+                    .iter()
+                    .any(|p| p.r#ref == missing_ref.r#ref)
+            }) && derivative_want_partitions.len() == md.missing.len();
+
+            if partitions_match {
+                // Now we know which partitions are impacted by this missing dependency
+                let impacted_partition_refs: Vec<String> =
+                    md.impacted.iter().map(|p| p.r#ref.clone()).collect();
+
+                // Find all wants that include these impacted partitions.
+                // These are the wants that need to wait for the derivative want to complete.
+                let mut impacted_want_ids: std::collections::HashSet<String> =
+                    std::collections::HashSet::new();
+                for partition_ref in &impacted_partition_refs {
+                    if let Some(partition) = self.partitions.get(partition_ref) {
+                        for want_id in partition.want_ids() {
+                            impacted_want_ids.insert(want_id.clone());
+                        }
+                    }
+                }
+
+                // Transition each impacted want to UpstreamBuilding, tracking this derivative want as an upstream
+                for want_id in impacted_want_ids {
+                    let want = self.wants.remove(&want_id).expect(&format!(
+                        "BUG: Want {} must exist when processing derivative want",
+                        want_id
+                    ));
+
+                    let transitioned = match want {
+                        Want::Building(building) => {
+                            // First dep miss for this want: Building → UpstreamBuilding
+                            Want::UpstreamBuilding(
+                                building.detect_missing_deps(vec![derivative_want_id.to_string()]),
+                            )
+                        }
+                        Want::UpstreamBuilding(upstream) => {
+                            // Additional dep miss: UpstreamBuilding → UpstreamBuilding (add another upstream).
+                            // This can happen if multiple jobs report dep misses for different upstreams.
+                            Want::UpstreamBuilding(
+                                upstream.add_upstreams(vec![derivative_want_id.to_string()]),
+                            )
+                        }
+                        _ => {
+                            panic!(
+                                "BUG: Want {} in invalid state {:?} when processing derivative want. Should be Building or UpstreamBuilding.",
+                                want_id, want
+                            );
+                        }
+                    };
+
+                    self.wants.insert(want_id, transitioned);
+                }
+            }
+        }
+    }
+
     /// Transition partitions from Missing to Building state
     /// Used when a job run starts building partitions
     fn transition_partitions_to_building(
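The partitions_match check above is hand-rolled set equality. Pulled out as a standalone predicate (same PartitionRef type; assumes neither list contains duplicate refs), it reads:

// Every missing ref must appear among the derivative want's partitions, and
// the lengths must agree - i.e. the two ref sets are equal when refs are unique.
fn partitions_match(derivative: &[PartitionRef], missing: &[PartitionRef]) -> bool {
    missing
        .iter()
        .all(|m| derivative.iter().any(|d| d.r#ref == m.r#ref))
        && derivative.len() == missing.len()
}

Exact equality rather than a subset test matters because a single dep miss event can spawn several derivative wants, and each must pair with exactly one MissingDeps entry.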
@@ -340,6 +429,13 @@ impl BuildState {
         job_run_id: &str,
         timestamp: u64,
     ) {
+        eprintln!(
+            "DEBUG unblock_downstream_wants: newly_successful_wants={:?}",
+            newly_successful_wants
+                .iter()
+                .map(|w| &w.0)
+                .collect::<Vec<_>>()
+        );
         // Find downstream wants that are waiting for any of the newly successful wants
         // TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
         let downstream_wants_to_check: Vec<String> = self
@@ -359,6 +455,10 @@ impl BuildState {
                 }
             })
             .collect();
+        eprintln!(
+            "DEBUG downstream_wants_to_check={:?}",
+            downstream_wants_to_check
+        );
 
         for want_id in downstream_wants_to_check {
             let want = self
@@ -368,6 +468,10 @@ impl BuildState {
 
             let transitioned = match want {
                 Want::UpstreamBuilding(downstream_want) => {
+                    eprintln!(
+                        "DEBUG checking want_id={}, upstreams={:?}",
+                        want_id, downstream_want.state.upstream_want_ids
+                    );
                     // Check if ALL of this downstream want's upstream dependencies are now Successful
                     let all_upstreams_successful = downstream_want
                         .state
@@ -379,6 +483,10 @@ impl BuildState {
                             .map(|w| matches!(w, Want::Successful(_)))
                             .unwrap_or(false)
                     });
+                    eprintln!(
+                        "DEBUG all_upstreams_successful={}",
+                        all_upstreams_successful
+                    );
 
                     if all_upstreams_successful {
                         // Check if any of this want's partitions are still being built
@@ -391,19 +499,26 @@ impl BuildState {
                                 .map(|partition| matches!(partition, Partition::Building(_)))
                                 .unwrap_or(false)
                         });
+                        eprintln!(
+                            "DEBUG any_partition_building={}",
+                            any_partition_building
+                        );
 
                         if any_partition_building {
                             // Some partitions still being built, continue in Building state
+                            eprintln!("DEBUG -> Building");
                             Want::Building(
                                 downstream_want
                                     .continue_building(job_run_id.to_string(), timestamp),
                             )
                         } else {
                             // No partitions being built, become schedulable again
+                            eprintln!("DEBUG -> Idle");
                             Want::Idle(downstream_want.upstreams_satisfied())
                         }
                     } else {
                         // Upstreams not all satisfied yet, stay in UpstreamBuilding
+                        eprintln!("DEBUG -> UpstreamBuilding (stay)");
                         Want::UpstreamBuilding(downstream_want)
                     }
                 }
@@ -468,109 +583,6 @@ impl BuildState {
         }
     }
 
-    /// Build a mapping from partition references to the want IDs that will build them
-    /// Used to track which upstream wants a downstream want depends on after a dep miss
-    fn build_partition_to_want_mapping(
-        &self,
-        want_events: &[Event],
-    ) -> std::collections::HashMap<String, String> {
-        let mut partition_to_want_map: std::collections::HashMap<String, String> =
-            std::collections::HashMap::new();
-
-        for event_item in want_events {
-            if let Event::WantCreateV1(want_create) = event_item {
-                for pref in &want_create.partitions {
-                    partition_to_want_map.insert(pref.r#ref.clone(), want_create.want_id.clone());
-                }
-            }
-        }
-
-        partition_to_want_map
-    }
-
-    /// Collect upstream want IDs that a servicing want now depends on based on dep misses
-    /// Returns a deduplicated, sorted list of upstream want IDs
-    fn collect_upstream_want_ids(
-        &self,
-        servicing_want: &crate::WantAttributedPartitions,
-        missing_deps: &[crate::MissingDeps],
-        partition_to_want_map: &std::collections::HashMap<String, String>,
-    ) -> Vec<String> {
-        let mut new_upstream_want_ids = Vec::new();
-
-        for missing_dep in missing_deps {
-            // Only process if this want contains an impacted partition
-            let is_impacted = missing_dep.impacted.iter().any(|imp| {
-                servicing_want
-                    .partitions
-                    .iter()
-                    .any(|p| p.r#ref == imp.r#ref)
-            });
-
-            if is_impacted {
-                // For each missing partition, find the want ID that will build it
-                for missing_partition in &missing_dep.missing {
-                    if let Some(want_id) = partition_to_want_map.get(&missing_partition.r#ref) {
-                        new_upstream_want_ids.push(want_id.clone());
-                    }
-                }
-            }
-        }
-
-        // Dedupe upstream want IDs (one job might report same dep multiple times)
-        new_upstream_want_ids.sort();
-        new_upstream_want_ids.dedup();
-
-        new_upstream_want_ids
-    }
-
-    /// Transition wants to UpstreamBuilding when they have missing dependencies
-    /// Handles Building → UpstreamBuilding and UpstreamBuilding → UpstreamBuilding (add upstreams)
-    fn transition_wants_to_upstream_building(
-        &mut self,
-        servicing_wants: &[crate::WantAttributedPartitions],
-        missing_deps: &[crate::MissingDeps],
-        partition_to_want_map: &std::collections::HashMap<String, String>,
-    ) {
-        // For each want serviced by this job run, check if it was impacted by missing deps
-        for servicing_want in servicing_wants {
-            let want = self.wants.remove(&servicing_want.want_id).expect(&format!(
-                "BUG: Want {} must exist when serviced by job run",
-                servicing_want.want_id
-            ));
-
-            // Collect the upstream want IDs that this want now depends on
-            let new_upstream_want_ids =
-                self.collect_upstream_want_ids(servicing_want, missing_deps, partition_to_want_map);
-
-            let transitioned = if !new_upstream_want_ids.is_empty() {
-                match want {
-                    Want::Building(building) => {
-                        // First dep miss for this want: Building → UpstreamBuilding
-                        Want::UpstreamBuilding(building.detect_missing_deps(new_upstream_want_ids))
-                    }
-                    Want::UpstreamBuilding(upstream) => {
-                        // Already in UpstreamBuilding, add more upstreams (self-transition)
-                        // This can happen if multiple jobs report dep misses, or one job reports multiple dep misses
-                        Want::UpstreamBuilding(upstream.add_upstreams(new_upstream_want_ids))
-                    }
-                    _ => {
-                        panic!(
-                            "BUG: Want {} in invalid state {:?} when job run had dep miss. Should be Building or UpstreamBuilding.",
-                            servicing_want.want_id, want
-                        );
-                    }
-                }
-            } else {
-                // No new upstreams for this want (it wasn't impacted), keep current state
-                want
-            };
-
-            self.wants
-                .insert(servicing_want.want_id.clone(), transitioned);
-        }
-    }
-
     /// Handles reacting to events, updating state, and erroring if its an invalid state transition
     /// Event handlers can return vecs of events that will then be appended to the BEL
     pub fn handle_event(&mut self, event: &Event) -> Vec<Event> {
@@ -604,6 +616,17 @@ impl BuildState {
             self.add_want_to_partition(pref, &event.want_id);
         }
 
+        // If this is a derivative want (triggered by a job's dep miss), transition impacted wants to UpstreamBuilding
+        if let Some(source) = &event.source {
+            if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source {
+                self.handle_derivative_want_creation(
+                    &event.want_id,
+                    &event.partitions,
+                    &job_triggered.job_run_id,
+                );
+            }
+        }
+
         vec![]
     }
@@ -878,32 +901,26 @@ impl BuildState {
         let building_refs_to_reset = dep_miss.get_building_partitions_to_reset();
         self.reset_partitions_to_missing(&building_refs_to_reset);
 
-        // Create wants from dep misses
+        // Generate WantCreateV1 events for the missing dependencies.
+        // These events will be returned and appended to the BEL by BuildEventLog.append_event()
        let want_events = missing_deps_to_want_events(
            dep_miss.get_missing_deps().to_vec(),
            &event.job_run_id,
            want_timestamps,
        );
 
-        // Building → UpstreamBuilding OR UpstreamBuilding → UpstreamBuilding (add upstreams)
+        // Store the job run in DepMiss state so we can access the missing_deps later.
+        // When the derivative WantCreateV1 events get processed by handle_want_create(),
+        // they will look up this job run and use handle_derivative_want_creation() to
+        // transition impacted wants to UpstreamBuilding with the correct want IDs.
         //
-        // When a job reports missing dependencies, we need to:
-        // 1. Create new wants for the missing partitions (done above via want_events)
-        // 2. Transition the current want to UpstreamBuilding, tracking which upstream wants it's waiting for
-
-        // Build a map: partition_ref -> want_id that will build it
-        let partition_to_want_map = self.build_partition_to_want_mapping(&want_events);
-
-        // Transition servicing wants to UpstreamBuilding when they have missing dependencies
-        self.transition_wants_to_upstream_building(
-            &dep_miss.info.servicing_wants,
-            dep_miss.get_missing_deps(),
-            &partition_to_want_map,
-        );
-
+        // KEY: We do NOT transition wants here because the want_events have randomly generated UUIDs
+        // that won't match during replay. Instead, we transition wants when processing the actual
+        // WantCreateV1 events that get written to and read from the BEL.
         self.job_runs
             .insert(event.job_run_id.clone(), JobRun::DepMiss(dep_miss));
 
+        // Return derivative want events to be appended to the BEL
         want_events
     }
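Putting the two halves together, the dep miss flow is a round trip through the BEL rather than an in-place transition. A hedged sketch of the glue (the orchestration code itself is not part of this diff):

// 1. Live path: handling the dep miss returns derivative WantCreateV1 events
//    carrying freshly minted UUIDs.
let derivative_events = state.handle_event(&Event::JobRunMissingDepsV1(dep_miss));
// 2. Appending each one re-enters handle_event, so handle_want_create() sees
//    the JobTriggered source and runs handle_derivative_want_creation() with
//    the want_id that was actually persisted.
for ev in &derivative_events {
    bel.append_event(ev)?;
}
// 3. Replay path: the same WantCreateV1 records are read back from storage,
//    so identical want IDs drive identical Building → UpstreamBuilding
//    transitions; nothing depends on re-generated UUIDs.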
@@ -1183,7 +1200,8 @@ mod tests {
     mod sqlite_build_state {
         mod want {
             use crate::build_state::BuildState;
-            use crate::{WantCancelEventV1, WantCreateEventV1, WantDetail};
+            use crate::data_build_event::Event;
+            use crate::{MissingDeps, WantCancelEventV1, WantCreateEventV1, WantDetail};
 
             #[test]
             fn test_should_create_want() {
@@ -1220,6 +1238,143 @@ mod tests {
                     Some(crate::WantStatusCode::WantCanceled.into())
                 );
             }
+
+            #[test]
+            fn test_multihop_dependency_replay() {
+                use crate::data_build_event::Event;
+                use crate::{
+                    JobRunBufferEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1,
+                    JobRunSuccessEventV1, MissingDeps, PartitionRef, WantAttributedPartitions,
+                    WantCreateEventV1,
+                };
+
+                let mut state = BuildState::default();
+                let mut events = vec![];
+
+                // 1. Create want for data/beta
+                let beta_want_id = "beta-want".to_string();
+                let mut create_beta = WantCreateEventV1::default();
+                create_beta.want_id = beta_want_id.clone();
+                create_beta.partitions = vec![PartitionRef {
+                    r#ref: "data/beta".to_string(),
+                }];
+                events.push(Event::WantCreateV1(create_beta));
+
+                // 2. Queue beta job (first attempt)
+                let beta_job_1_id = "beta-job-1".to_string();
+                let mut buffer_beta_1 = JobRunBufferEventV1::default();
+                buffer_beta_1.job_run_id = beta_job_1_id.clone();
+                buffer_beta_1.job_label = "//job_beta".to_string();
+                buffer_beta_1.want_attributed_partitions = vec![WantAttributedPartitions {
+                    want_id: beta_want_id.clone(),
+                    partitions: vec![PartitionRef {
+                        r#ref: "data/beta".to_string(),
+                    }],
+                }];
+                buffer_beta_1.building_partitions = vec![PartitionRef {
+                    r#ref: "data/beta".to_string(),
+                }];
+                events.push(Event::JobRunBufferV1(buffer_beta_1));
+
+                // 3. Beta job starts running
+                let mut heartbeat_beta_1 = JobRunHeartbeatEventV1::default();
+                heartbeat_beta_1.job_run_id = beta_job_1_id.clone();
+                events.push(Event::JobRunHeartbeatV1(heartbeat_beta_1));
+
+                // 4. Beta job reports missing dependency on data/alpha
+                let mut dep_miss_beta_1 = JobRunMissingDepsEventV1::default();
+                dep_miss_beta_1.job_run_id = beta_job_1_id.clone();
+                dep_miss_beta_1.missing_deps = vec![MissingDeps {
+                    impacted: vec![PartitionRef {
+                        r#ref: "data/beta".to_string(),
+                    }],
+                    missing: vec![PartitionRef {
+                        r#ref: "data/alpha".to_string(),
+                    }],
+                }];
+                events.push(Event::JobRunMissingDepsV1(dep_miss_beta_1));
+
+                // 5. Create derivative want for data/alpha
+                let alpha_want_id = "alpha-want".to_string();
+                let mut create_alpha = WantCreateEventV1::default();
+                create_alpha.want_id = alpha_want_id.clone();
+                create_alpha.partitions = vec![PartitionRef {
+                    r#ref: "data/alpha".to_string(),
+                }];
+                events.push(Event::WantCreateV1(create_alpha));
+
+                // 6. Queue alpha job
+                let alpha_job_id = "alpha-job".to_string();
+                let mut buffer_alpha = JobRunBufferEventV1::default();
+                buffer_alpha.job_run_id = alpha_job_id.clone();
+                buffer_alpha.job_label = "//job_alpha".to_string();
+                buffer_alpha.want_attributed_partitions = vec![WantAttributedPartitions {
+                    want_id: alpha_want_id.clone(),
+                    partitions: vec![PartitionRef {
+                        r#ref: "data/alpha".to_string(),
+                    }],
+                }];
+                buffer_alpha.building_partitions = vec![PartitionRef {
+                    r#ref: "data/alpha".to_string(),
+                }];
+                events.push(Event::JobRunBufferV1(buffer_alpha));
+
+                // 7. Alpha job starts running
+                let mut heartbeat_alpha = JobRunHeartbeatEventV1::default();
+                heartbeat_alpha.job_run_id = alpha_job_id.clone();
+                events.push(Event::JobRunHeartbeatV1(heartbeat_alpha));
+
+                // 8. Alpha job succeeds
+                let mut success_alpha = JobRunSuccessEventV1::default();
+                success_alpha.job_run_id = alpha_job_id.clone();
+                events.push(Event::JobRunSuccessV1(success_alpha));
+
+                // 9. Queue beta job again (second attempt) - THIS IS THE CRITICAL MOMENT
+                let beta_job_2_id = "beta-job-2".to_string();
+                let mut buffer_beta_2 = JobRunBufferEventV1::default();
+                buffer_beta_2.job_run_id = beta_job_2_id.clone();
+                buffer_beta_2.job_label = "//job_beta".to_string();
+                buffer_beta_2.want_attributed_partitions = vec![WantAttributedPartitions {
+                    want_id: beta_want_id.clone(),
+                    partitions: vec![PartitionRef {
+                        r#ref: "data/beta".to_string(),
+                    }],
+                }];
+                buffer_beta_2.building_partitions = vec![PartitionRef {
+                    r#ref: "data/beta".to_string(),
+                }];
+                events.push(Event::JobRunBufferV1(buffer_beta_2));
+
+                // 10. Beta job starts running
+                let mut heartbeat_beta_2 = JobRunHeartbeatEventV1::default();
+                heartbeat_beta_2.job_run_id = beta_job_2_id.clone();
+                events.push(Event::JobRunHeartbeatV1(heartbeat_beta_2));
+
+                // 11. Beta job succeeds
+                let mut success_beta_2 = JobRunSuccessEventV1::default();
+                success_beta_2.job_run_id = beta_job_2_id.clone();
+                events.push(Event::JobRunSuccessV1(success_beta_2));
+
+                // Process all events - this simulates replay
+                for event in &events {
+                    state.handle_event(event);
+                }
+
+                // Verify final state
+                let beta_want = state.get_want(&beta_want_id).unwrap();
+                assert_eq!(
+                    beta_want.status,
+                    Some(crate::WantStatusCode::WantSuccessful.into()),
+                    "Beta want should be successful after multi-hop dependency resolution"
+                );
+
+                let alpha_want = state.get_want(&alpha_want_id).unwrap();
+                assert_eq!(
+                    alpha_want.status,
+                    Some(crate::WantStatusCode::WantSuccessful.into()),
+                    "Alpha want should be successful"
+                );
+            }
         }
     }
 }
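Since the test drives state through handle_event() exactly the way replay does, a natural follow-on assertion (a sketch; BuildState may not implement PartialEq, so observable fields are compared instead) is that a second replay of the same log converges to the same statuses:

// Replay the identical event log into a fresh state and compare what the
// API would observe for the downstream want.
let mut replayed = BuildState::default();
for event in &events {
    replayed.handle_event(event);
}
assert_eq!(
    replayed.get_want(&beta_want_id).unwrap().status,
    state.get_want(&beta_want_id).unwrap().status,
);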
diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs
index f7ed6a3..38b19e5 100644
--- a/databuild/orchestrator.rs
+++ b/databuild/orchestrator.rs
@@ -429,11 +429,13 @@ mod tests {
                 label: "alpha".to_string(),
                 patterns: vec!["data/alpha".to_string()],
                 entry_point: MockJobRun::bin_path(),
+                environment: Default::default(),
             },
             JobConfiguration {
                 label: "beta".to_string(),
                 patterns: vec!["data/beta".to_string()],
                 entry_point: MockJobRun::bin_path(),
+                environment: Default::default(),
             },
         ],
     };
@@ -796,11 +798,13 @@ echo 'Beta succeeded'
                 label: "alpha".to_string(),
                 patterns: vec!["data/alpha".to_string()],
                 entry_point: alpha_script.to_string(),
+                environment: Default::default(),
             },
             JobConfiguration {
                 label: "beta".to_string(),
                 patterns: vec!["data/beta".to_string()],
                 entry_point: beta_script.to_string(),
+                environment: Default::default(),
             },
         ],
     };
@@ -978,6 +982,7 @@ echo 'Beta succeeded'
             label: label.to_string(),
             patterns: vec![pattern.to_string()],
             entry_point: "test_entrypoint".to_string(),
+            environment: Default::default(),
         }
     }
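The orchestrator.rs changes only thread the new environment field through the test fixtures via Default::default(). The field's type is not shown in this diff; assuming a string-to-string map of environment variables handed to the job's entry point, a populated configuration might look like:

// Hypothetical values - only the field name comes from the diff.
JobConfiguration {
    label: "alpha".to_string(),
    patterns: vec!["data/alpha".to_string()],
    entry_point: "/path/to/job_alpha".to_string(),
    environment: std::collections::HashMap::from([
        ("LOG_LEVEL".to_string(), "debug".to_string()),
    ]),
}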