fix un-replayable BEL bug

Stuart Axelbrooke 2025-11-23 09:21:14 +08:00
parent a5a1be8855
commit 32f35ecbd5
3 changed files with 300 additions and 133 deletions


@ -255,6 +255,13 @@ impl<S: BELStorage + Debug> BuildEventLog<S> {
Ok(idx)
}
pub fn append_event_no_recurse(&mut self, event: &Event) -> Result<u64, DatabuildError> {
self.state.handle_event(&event);
let idx = self.storage.append_event(event)?;
// Recursion here might be dangerous, but in theory the event propagation always terminates
Ok(idx)
}
// API methods
pub fn api_handle_list_wants(&self, req: ListWantsRequest) -> ListWantsResponse {
self.state.list_wants(&req)


@ -1,5 +1,6 @@
use crate::data_build_event::Event;
use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
use crate::event_source::Source as EventSourceVariant;
use crate::job_run_state::{JobRun, JobRunWithState, QueuedState as JobQueuedState};
use crate::partition_state::{
BuildingPartitionRef, FailedPartitionRef, LivePartitionRef, MissingPartitionRef, MissingState,
@ -45,7 +46,7 @@ critical that these state machines, their states, and their transitions are type
/// Tracks all application state, defines valid state transitions, and manages cross-state machine
/// state transitions (e.g. job run success resulting in partition going from Building to Live)
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct BuildState {
wants: BTreeMap<String, Want>, // Type-safe want storage
taints: BTreeMap<String, TaintDetail>,
@ -53,17 +54,6 @@ pub struct BuildState {
job_runs: BTreeMap<String, JobRun>, // Type-safe job run storage
}
impl Default for BuildState {
fn default() -> Self {
Self {
wants: Default::default(),
taints: Default::default(),
partitions: Default::default(),
job_runs: Default::default(),
}
}
}
impl BuildState {
/// Reconstruct BuildState from a sequence of events (for read path in web server)
/// This allows the web server to rebuild state from BEL storage without holding a lock
@ -73,7 +63,7 @@ impl BuildState {
if let Some(ref inner_event) = event.event {
// handle_event returns Vec<Event> for cascading events, but we ignore them
// since we're replaying from a complete event log
let _ = state.handle_event(inner_event);
state.handle_event(inner_event);
}
}
state
@ -100,6 +90,105 @@ impl BuildState {
}
}
/// Handle creation of a derivative want (created due to job dep miss)
///
/// When a job reports missing dependencies, it returns WantCreateV1 events for those missing partitions.
/// Those events get appended to the BEL and eventually processed by handle_want_create().
///
/// This function is called when we detect a derivative want (has source.job_triggered) and transitions
/// the impacted wants to UpstreamBuilding state, tracking the derivative want ID as an upstream dependency.
///
/// KEY INSIGHT: We must use the actual want_id from the WantCreateV1 event, not synthetic UUIDs generated
/// during event processing. This ensures replay works correctly - the same want IDs are used both during
/// original execution and during replay from the BEL.
fn handle_derivative_want_creation(
&mut self,
derivative_want_id: &str,
derivative_want_partitions: &[PartitionRef],
source_job_run_id: &str,
) {
// Look up the job run that triggered this derivative want
// This job run must be in DepMiss state because it reported missing dependencies
let job_run = self.job_runs.get(source_job_run_id).expect(&format!(
"BUG: Job run {} must exist when derivative want created",
source_job_run_id
));
// Extract the missing deps from the DepMiss job run
let missing_deps = match job_run {
JobRun::DepMiss(dep_miss) => dep_miss.get_missing_deps(),
_ => {
panic!(
"BUG: Job run {} must be in DepMiss state when derivative want created, found {:?}",
source_job_run_id, job_run
);
}
};
// Find which MissingDeps entry corresponds to this derivative want
// The derivative want was created for a specific set of missing partitions,
// and we need to find which downstream partitions are impacted by those missing partitions
for md in missing_deps {
// Check if this derivative want's partitions match the missing partitions in this entry
// We need an exact match because one dep miss event can create multiple derivative wants
let partitions_match = md.missing.iter().all(|missing_ref| {
derivative_want_partitions
.iter()
.any(|p| p.r#ref == missing_ref.r#ref)
}) && derivative_want_partitions.len() == md.missing.len();
if partitions_match {
// Now we know which partitions are impacted by this missing dependency
let impacted_partition_refs: Vec<String> =
md.impacted.iter().map(|p| p.r#ref.clone()).collect();
// Find all wants that include these impacted partitions
// These are the wants that need to wait for the derivative want to complete
let mut impacted_want_ids: std::collections::HashSet<String> =
std::collections::HashSet::new();
for partition_ref in &impacted_partition_refs {
if let Some(partition) = self.partitions.get(partition_ref) {
for want_id in partition.want_ids() {
impacted_want_ids.insert(want_id.clone());
}
}
}
// Transition each impacted want to UpstreamBuilding, tracking this derivative want as an upstream
for want_id in impacted_want_ids {
let want = self.wants.remove(&want_id).expect(&format!(
"BUG: Want {} must exist when processing derivative want",
want_id
));
let transitioned = match want {
Want::Building(building) => {
// First dep miss for this want: Building → UpstreamBuilding
Want::UpstreamBuilding(
building.detect_missing_deps(vec![derivative_want_id.to_string()]),
)
}
Want::UpstreamBuilding(upstream) => {
// Additional dep miss: UpstreamBuilding → UpstreamBuilding (add another upstream)
// This can happen if multiple jobs report dep misses for different upstreams
Want::UpstreamBuilding(
upstream.add_upstreams(vec![derivative_want_id.to_string()]),
)
}
_ => {
panic!(
"BUG: Want {} in invalid state {:?} when processing derivative want. Should be Building or UpstreamBuilding.",
want_id, want
);
}
};
self.wants.insert(want_id, transitioned);
}
}
}
}
/// Transition partitions from Missing to Building state
/// Used when a job run starts building partitions
fn transition_partitions_to_building(
@ -340,6 +429,13 @@ impl BuildState {
job_run_id: &str,
timestamp: u64,
) {
eprintln!(
"DEBUG unblock_downstream_wants: newly_successful_wants={:?}",
newly_successful_wants
.iter()
.map(|w| &w.0)
.collect::<Vec<_>>()
);
// Find downstream wants that are waiting for any of the newly successful wants
// TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
let downstream_wants_to_check: Vec<String> = self
@ -359,6 +455,10 @@ impl BuildState {
}
})
.collect();
eprintln!(
"DEBUG downstream_wants_to_check={:?}",
downstream_wants_to_check
);
for want_id in downstream_wants_to_check {
let want = self
@ -368,6 +468,10 @@ impl BuildState {
let transitioned = match want {
Want::UpstreamBuilding(downstream_want) => {
eprintln!(
"DEBUG checking want_id={}, upstreams={:?}",
want_id, downstream_want.state.upstream_want_ids
);
// Check if ALL of this downstream want's upstream dependencies are now Successful
let all_upstreams_successful = downstream_want
.state
@ -379,6 +483,10 @@ impl BuildState {
.map(|w| matches!(w, Want::Successful(_)))
.unwrap_or(false)
});
eprintln!(
"DEBUG all_upstreams_successful={}",
all_upstreams_successful
);
if all_upstreams_successful {
// Check if any of this want's partitions are still being built
@ -391,19 +499,26 @@ impl BuildState {
.map(|partition| matches!(partition, Partition::Building(_)))
.unwrap_or(false)
});
eprintln!(
"DEBUG any_partition_building={}",
any_partition_building
);
if any_partition_building {
// Some partitions still being built, continue in Building state
eprintln!("DEBUG -> Building");
Want::Building(
downstream_want
.continue_building(job_run_id.to_string(), timestamp),
)
} else {
// No partitions being built, become schedulable again
eprintln!("DEBUG -> Idle");
Want::Idle(downstream_want.upstreams_satisfied())
}
} else {
// Upstreams not all satisfied yet, stay in UpstreamBuilding
eprintln!("DEBUG -> UpstreamBuilding (stay)");
Want::UpstreamBuilding(downstream_want)
}
}
@ -468,109 +583,6 @@ impl BuildState {
}
}
/// Build a mapping from partition references to the want IDs that will build them
/// Used to track which upstream wants a downstream want depends on after a dep miss
fn build_partition_to_want_mapping(
&self,
want_events: &[Event],
) -> std::collections::HashMap<String, String> {
let mut partition_to_want_map: std::collections::HashMap<String, String> =
std::collections::HashMap::new();
for event_item in want_events {
if let Event::WantCreateV1(want_create) = event_item {
for pref in &want_create.partitions {
partition_to_want_map.insert(pref.r#ref.clone(), want_create.want_id.clone());
}
}
}
partition_to_want_map
}
/// Collect upstream want IDs that a servicing want now depends on based on dep misses
/// Returns a deduplicated, sorted list of upstream want IDs
fn collect_upstream_want_ids(
&self,
servicing_want: &crate::WantAttributedPartitions,
missing_deps: &[crate::MissingDeps],
partition_to_want_map: &std::collections::HashMap<String, String>,
) -> Vec<String> {
let mut new_upstream_want_ids = Vec::new();
for missing_dep in missing_deps {
// Only process if this want contains an impacted partition
let is_impacted = missing_dep.impacted.iter().any(|imp| {
servicing_want
.partitions
.iter()
.any(|p| p.r#ref == imp.r#ref)
});
if is_impacted {
// For each missing partition, find the want ID that will build it
for missing_partition in &missing_dep.missing {
if let Some(want_id) = partition_to_want_map.get(&missing_partition.r#ref) {
new_upstream_want_ids.push(want_id.clone());
}
}
}
}
// Dedupe upstream want IDs (one job might report same dep multiple times)
new_upstream_want_ids.sort();
new_upstream_want_ids.dedup();
new_upstream_want_ids
}
/// Transition wants to UpstreamBuilding when they have missing dependencies
/// Handles Building → UpstreamBuilding and UpstreamBuilding → UpstreamBuilding (add upstreams)
fn transition_wants_to_upstream_building(
&mut self,
servicing_wants: &[crate::WantAttributedPartitions],
missing_deps: &[crate::MissingDeps],
partition_to_want_map: &std::collections::HashMap<String, String>,
) {
// For each want serviced by this job run, check if it was impacted by missing deps
for servicing_want in servicing_wants {
let want = self.wants.remove(&servicing_want.want_id).expect(&format!(
"BUG: Want {} must exist when serviced by job run",
servicing_want.want_id
));
// Collect the upstream want IDs that this want now depends on
let new_upstream_want_ids =
self.collect_upstream_want_ids(servicing_want, missing_deps, partition_to_want_map);
let transitioned = if !new_upstream_want_ids.is_empty() {
match want {
Want::Building(building) => {
// First dep miss for this want: Building → UpstreamBuilding
Want::UpstreamBuilding(building.detect_missing_deps(new_upstream_want_ids))
}
Want::UpstreamBuilding(upstream) => {
// Already in UpstreamBuilding, add more upstreams (self-transition)
// This can happen if multiple jobs report dep misses, or one job reports multiple dep misses
Want::UpstreamBuilding(upstream.add_upstreams(new_upstream_want_ids))
}
_ => {
panic!(
"BUG: Want {} in invalid state {:?} when job run had dep miss. Should be Building or UpstreamBuilding.",
servicing_want.want_id, want
);
}
}
} else {
// No new upstreams for this want (it wasn't impacted), keep current state
want
};
self.wants
.insert(servicing_want.want_id.clone(), transitioned);
}
}
/// Handles reacting to events, updating state, and erroring if it's an invalid state transition
/// Event handlers can return vecs of events that will then be appended to the BEL
pub fn handle_event(&mut self, event: &Event) -> Vec<Event> {
@ -604,6 +616,17 @@ impl BuildState {
self.add_want_to_partition(pref, &event.want_id);
}
// If this is a derivative want (triggered by a job's dep miss), transition impacted wants to UpstreamBuilding
if let Some(source) = &event.source {
if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source {
self.handle_derivative_want_creation(
&event.want_id,
&event.partitions,
&job_triggered.job_run_id,
);
}
}
vec![]
}
@ -878,32 +901,26 @@ impl BuildState {
let building_refs_to_reset = dep_miss.get_building_partitions_to_reset();
self.reset_partitions_to_missing(&building_refs_to_reset);
// Create wants from dep misses
// Generate WantCreateV1 events for the missing dependencies
// These events will be returned and appended to the BEL by BuildEventLog.append_event()
let want_events = missing_deps_to_want_events(
dep_miss.get_missing_deps().to_vec(),
&event.job_run_id,
want_timestamps,
);
// Building → UpstreamBuilding OR UpstreamBuilding → UpstreamBuilding (add upstreams)
// Store the job run in DepMiss state so we can access the missing_deps later
// When the derivative WantCreateV1 events get processed by handle_want_create(),
// they will look up this job run and use handle_derivative_want_creation() to
// transition impacted wants to UpstreamBuilding with the correct want IDs.
//
// When a job reports missing dependencies, we need to:
// 1. Create new wants for the missing partitions (done above via want_events)
// 2. Transition the current want to UpstreamBuilding, tracking which upstream wants it's waiting for
// Build a map: partition_ref -> want_id that will build it
let partition_to_want_map = self.build_partition_to_want_mapping(&want_events);
// Transition servicing wants to UpstreamBuilding when they have missing dependencies
self.transition_wants_to_upstream_building(
&dep_miss.info.servicing_wants,
dep_miss.get_missing_deps(),
&partition_to_want_map,
);
// KEY: We do NOT transition wants here because the want_events have randomly generated UUIDs
// that won't match during replay. Instead, we transition wants when processing the actual
// WantCreateV1 events that get written to and read from the BEL.
self.job_runs
.insert(event.job_run_id.clone(), JobRun::DepMiss(dep_miss));
// Return derivative want events to be appended to the BEL
want_events
}
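
The property this change restores can be stated as a small determinism check (an illustrative sketch, not code from this commit; `BuildState::default()` and `handle_event` come from the diff above, the helper name is hypothetical):
use crate::build_state::BuildState;
use crate::data_build_event::Event;

// Illustrative invariant: applying the same persisted event sequence to two fresh
// states must produce identical results. Before this fix, handling a dep miss baked
// freshly generated want UUIDs into state, so two replays of the same log (and
// likewise a replay versus the original run) could disagree.
fn assert_replay_deterministic(events: &[Event]) {
    let mut a = BuildState::default();
    let mut b = BuildState::default();
    for event in events {
        a.handle_event(event);
        b.handle_event(event);
    }
    assert_eq!(format!("{a:?}"), format!("{b:?}"));
}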
@ -1183,7 +1200,8 @@ mod tests {
mod sqlite_build_state {
mod want {
use crate::build_state::BuildState;
use crate::{WantCancelEventV1, WantCreateEventV1, WantDetail};
use crate::data_build_event::Event;
use crate::{MissingDeps, WantCancelEventV1, WantCreateEventV1, WantDetail};
#[test]
fn test_should_create_want() {
@ -1220,6 +1238,143 @@ mod tests {
Some(crate::WantStatusCode::WantCanceled.into())
);
}
#[test]
fn test_multihop_dependency_replay() {
use crate::data_build_event::Event;
use crate::{
JobRunBufferEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1,
JobRunSuccessEventV1, MissingDeps, PartitionRef, WantAttributedPartitions,
WantCreateEventV1,
};
let mut state = BuildState::default();
let mut events = vec![];
// 1. Create want for data/beta
let beta_want_id = "beta-want".to_string();
let mut create_beta = WantCreateEventV1::default();
create_beta.want_id = beta_want_id.clone();
create_beta.partitions = vec![PartitionRef {
r#ref: "data/beta".to_string(),
}];
events.push(Event::WantCreateV1(create_beta));
// 2. Queue beta job (first attempt)
let beta_job_1_id = "beta-job-1".to_string();
let mut buffer_beta_1 = JobRunBufferEventV1::default();
buffer_beta_1.job_run_id = beta_job_1_id.clone();
buffer_beta_1.job_label = "//job_beta".to_string();
buffer_beta_1.want_attributed_partitions = vec![WantAttributedPartitions {
want_id: beta_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
}];
buffer_beta_1.building_partitions = vec![PartitionRef {
r#ref: "data/beta".to_string(),
}];
events.push(Event::JobRunBufferV1(buffer_beta_1));
// 3. Beta job starts running
let mut heartbeat_beta_1 = JobRunHeartbeatEventV1::default();
heartbeat_beta_1.job_run_id = beta_job_1_id.clone();
events.push(Event::JobRunHeartbeatV1(heartbeat_beta_1));
// 4. Beta job reports missing dependency on data/alpha
let mut dep_miss_beta_1 = JobRunMissingDepsEventV1::default();
dep_miss_beta_1.job_run_id = beta_job_1_id.clone();
dep_miss_beta_1.missing_deps = vec![MissingDeps {
impacted: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
missing: vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}],
}];
events.push(Event::JobRunMissingDepsV1(dep_miss_beta_1));
// 5. Create derivative want for data/alpha
let alpha_want_id = "alpha-want".to_string();
let mut create_alpha = WantCreateEventV1::default();
create_alpha.want_id = alpha_want_id.clone();
create_alpha.partitions = vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}];
events.push(Event::WantCreateV1(create_alpha));
// 6. Queue alpha job
let alpha_job_id = "alpha-job".to_string();
let mut buffer_alpha = JobRunBufferEventV1::default();
buffer_alpha.job_run_id = alpha_job_id.clone();
buffer_alpha.job_label = "//job_alpha".to_string();
buffer_alpha.want_attributed_partitions = vec![WantAttributedPartitions {
want_id: alpha_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}],
}];
buffer_alpha.building_partitions = vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}];
events.push(Event::JobRunBufferV1(buffer_alpha));
// 7. Alpha job starts running
let mut heartbeat_alpha = JobRunHeartbeatEventV1::default();
heartbeat_alpha.job_run_id = alpha_job_id.clone();
events.push(Event::JobRunHeartbeatV1(heartbeat_alpha));
// 8. Alpha job succeeds
let mut success_alpha = JobRunSuccessEventV1::default();
success_alpha.job_run_id = alpha_job_id.clone();
events.push(Event::JobRunSuccessV1(success_alpha));
// 9. Queue beta job again (second attempt) - THIS IS THE CRITICAL MOMENT
let beta_job_2_id = "beta-job-2".to_string();
let mut buffer_beta_2 = JobRunBufferEventV1::default();
buffer_beta_2.job_run_id = beta_job_2_id.clone();
buffer_beta_2.job_label = "//job_beta".to_string();
buffer_beta_2.want_attributed_partitions = vec![WantAttributedPartitions {
want_id: beta_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
}];
buffer_beta_2.building_partitions = vec![PartitionRef {
r#ref: "data/beta".to_string(),
}];
events.push(Event::JobRunBufferV1(buffer_beta_2));
// 10. Beta job starts running
let mut heartbeat_beta_2 = JobRunHeartbeatEventV1::default();
heartbeat_beta_2.job_run_id = beta_job_2_id.clone();
events.push(Event::JobRunHeartbeatV1(heartbeat_beta_2));
// 11. Beta job succeeds
let mut success_beta_2 = JobRunSuccessEventV1::default();
success_beta_2.job_run_id = beta_job_2_id.clone();
events.push(Event::JobRunSuccessV1(success_beta_2));
// Process all events - this simulates replay
for event in &events {
state.handle_event(event);
}
// Verify final state
let beta_want = state.get_want(&beta_want_id).unwrap();
assert_eq!(
beta_want.status,
Some(crate::WantStatusCode::WantSuccessful.into()),
"Beta want should be successful after multi-hop dependency resolution"
);
let alpha_want = state.get_want(&alpha_want_id).unwrap();
assert_eq!(
alpha_want.status,
Some(crate::WantStatusCode::WantSuccessful.into()),
"Alpha want should be successful"
);
}
}
}
}


@ -429,11 +429,13 @@ mod tests {
label: "alpha".to_string(),
patterns: vec!["data/alpha".to_string()],
entry_point: MockJobRun::bin_path(),
environment: Default::default(),
},
JobConfiguration {
label: "beta".to_string(),
patterns: vec!["data/beta".to_string()],
entry_point: MockJobRun::bin_path(),
environment: Default::default(),
},
],
};
@ -796,11 +798,13 @@ echo 'Beta succeeded'
label: "alpha".to_string(),
patterns: vec!["data/alpha".to_string()],
entry_point: alpha_script.to_string(),
environment: Default::default(),
},
JobConfiguration {
label: "beta".to_string(),
patterns: vec!["data/beta".to_string()],
entry_point: beta_script.to_string(),
environment: Default::default(),
},
],
};
@ -978,6 +982,7 @@ echo 'Beta succeeded'
label: label.to_string(),
patterns: vec![pattern.to_string()],
entry_point: "test_entrypoint".to_string(),
environment: Default::default(),
}
}