From 17d5987517ff0da6459604eb6d27ed6331ea2f97 Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Fri, 28 Nov 2025 12:48:54 +0800 Subject: [PATCH] WIP --- databuild/build_state/event_handlers.rs | 182 ++++++-------- databuild/build_state/mod.rs | 14 ++ databuild/build_state/queries.rs | 223 +++++++++++++++++- databuild/databuild.proto | 51 +++- databuild/event_transforms.rs | 12 +- databuild/http_server.rs | 12 +- databuild/job_run.rs | 2 + databuild/job_run_state.rs | 190 ++++++++++++++- databuild/orchestrator.rs | 2 + databuild/partition_state.rs | 70 +++++- databuild/util.rs | 222 +++++++++++++++++ databuild/want_state.rs | 37 ++- databuild/web/templates.rs | 153 +++++++++++- databuild/web/templates/job_runs/detail.html | 39 +++ .../web/templates/partitions/detail.html | 27 ++- databuild/web/templates/wants/detail.html | 22 ++ examples/multihop/databuild.json | 4 +- 17 files changed, 1124 insertions(+), 138 deletions(-) diff --git a/databuild/build_state/event_handlers.rs b/databuild/build_state/event_handlers.rs index ead6527..908d2cd 100644 --- a/databuild/build_state/event_handlers.rs +++ b/databuild/build_state/event_handlers.rs @@ -340,18 +340,51 @@ impl BuildState { } pub(crate) fn handle_job_run_success(&mut self, event: &JobRunSuccessEventV1) -> Vec { + use std::collections::BTreeMap; + let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!( "BUG: Job run {} must exist when success event received", event.job_run_id )); + // Resolve read partition UUIDs from the read_deps in the event + let mut read_partition_uuids: BTreeMap = BTreeMap::new(); + for read_dep in &event.read_deps { + for read_ref in &read_dep.read { + if let Some(uuid) = self.get_canonical_partition_uuid(&read_ref.r#ref) { + read_partition_uuids.insert(read_ref.r#ref.clone(), uuid.to_string()); + } + } + } + + // Get the building partitions from the job run to resolve wrote_partition_uuids + let building_partitions = match &job_run { + JobRun::Running(running) => 
running.info.building_partitions.clone(), + _ => vec![], + }; + + // Resolve wrote partition UUIDs - these will be set when we transition partitions to Live + // For now, we compute them based on job_run_id (same logic as transition_partitions_to_live) + let mut wrote_partition_uuids: BTreeMap = BTreeMap::new(); + for pref in &building_partitions { + let uuid = + crate::partition_state::derive_partition_uuid(&event.job_run_id, &pref.r#ref); + wrote_partition_uuids.insert(pref.r#ref.clone(), uuid.to_string()); + } + let succeeded = match job_run { JobRun::Running(running) => { tracing::info!( job_run_id = %event.job_run_id, + read_deps_count = event.read_deps.len(), "JobRun: Running → Succeeded" ); - running.succeed(current_timestamp()) + running.succeed( + current_timestamp(), + event.read_deps.clone(), + read_partition_uuids.clone(), + wrote_partition_uuids.clone(), + ) } _ => { panic!( @@ -361,6 +394,34 @@ impl BuildState { } }; + // Populate the consumer index from read_deps (using UUIDs for historical lineage) + // For each read partition UUID, record that the written partition UUIDs consumed it + for read_dep in &event.read_deps { + for read_ref in &read_dep.read { + // Look up the read partition's UUID + if let Some(read_uuid) = read_partition_uuids.get(&read_ref.r#ref) { + if let Ok(read_uuid) = uuid::Uuid::parse_str(read_uuid) { + let consumers = self + .partition_consumers + .entry(read_uuid) + .or_insert_with(Vec::new); + for impacted_ref in &read_dep.impacted { + // Look up the impacted (output) partition's UUID + if let Some(wrote_uuid) = wrote_partition_uuids.get(&impacted_ref.r#ref) + { + if let Ok(wrote_uuid) = uuid::Uuid::parse_str(wrote_uuid) { + let entry = (wrote_uuid, event.job_run_id.clone()); + if !consumers.contains(&entry) { + consumers.push(entry); + } + } + } + } + } + } + } + } + // Job run success is SOURCE of truth that partitions are live let newly_live_partitions = succeeded.get_completed_partitions(); @@ -642,7 +703,6 @@ impl 
BuildState { #[cfg(test)] mod tests { use super::*; - use crate::{MissingDeps, WantAttributedPartitions}; mod want { use super::*; @@ -686,133 +746,25 @@ mod tests { #[test] fn test_multihop_dependency_replay() { - use crate::{ - JobRunBufferEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, - JobRunSuccessEventV1, MissingDeps, PartitionRef, WantAttributedPartitions, - WantCreateEventV1, - }; + use crate::util::test_scenarios::multihop_scenario; - let mut state = BuildState::default(); - let mut events = vec![]; - - // 1. Create want for data/beta - let beta_want_id = "beta-want".to_string(); - let mut create_beta = WantCreateEventV1::default(); - create_beta.want_id = beta_want_id.clone(); - create_beta.partitions = vec![PartitionRef { - r#ref: "data/beta".to_string(), - }]; - events.push(Event::WantCreateV1(create_beta)); - - // 2. Queue beta job (first attempt) - let beta_job_1_id = "beta-job-1".to_string(); - let mut buffer_beta_1 = JobRunBufferEventV1::default(); - buffer_beta_1.job_run_id = beta_job_1_id.clone(); - buffer_beta_1.job_label = "//job_beta".to_string(); - buffer_beta_1.want_attributed_partitions = vec![WantAttributedPartitions { - want_id: beta_want_id.clone(), - partitions: vec![PartitionRef { - r#ref: "data/beta".to_string(), - }], - }]; - buffer_beta_1.building_partitions = vec![PartitionRef { - r#ref: "data/beta".to_string(), - }]; - events.push(Event::JobRunBufferV1(buffer_beta_1)); - - // 3. Beta job starts running - let mut heartbeat_beta_1 = JobRunHeartbeatEventV1::default(); - heartbeat_beta_1.job_run_id = beta_job_1_id.clone(); - events.push(Event::JobRunHeartbeatV1(heartbeat_beta_1)); - - // 4. 
Beta job reports missing dependency on data/alpha - let mut dep_miss_beta_1 = JobRunMissingDepsEventV1::default(); - dep_miss_beta_1.job_run_id = beta_job_1_id.clone(); - dep_miss_beta_1.missing_deps = vec![MissingDeps { - impacted: vec![PartitionRef { - r#ref: "data/beta".to_string(), - }], - missing: vec![PartitionRef { - r#ref: "data/alpha".to_string(), - }], - }]; - events.push(Event::JobRunMissingDepsV1(dep_miss_beta_1)); - - // 5. Create derivative want for data/alpha - let alpha_want_id = "alpha-want".to_string(); - let mut create_alpha = WantCreateEventV1::default(); - create_alpha.want_id = alpha_want_id.clone(); - create_alpha.partitions = vec![PartitionRef { - r#ref: "data/alpha".to_string(), - }]; - events.push(Event::WantCreateV1(create_alpha)); - - // 6. Queue alpha job - let alpha_job_id = "alpha-job".to_string(); - let mut buffer_alpha = JobRunBufferEventV1::default(); - buffer_alpha.job_run_id = alpha_job_id.clone(); - buffer_alpha.job_label = "//job_alpha".to_string(); - buffer_alpha.want_attributed_partitions = vec![WantAttributedPartitions { - want_id: alpha_want_id.clone(), - partitions: vec![PartitionRef { - r#ref: "data/alpha".to_string(), - }], - }]; - buffer_alpha.building_partitions = vec![PartitionRef { - r#ref: "data/alpha".to_string(), - }]; - events.push(Event::JobRunBufferV1(buffer_alpha)); - - // 7. Alpha job starts running - let mut heartbeat_alpha = JobRunHeartbeatEventV1::default(); - heartbeat_alpha.job_run_id = alpha_job_id.clone(); - events.push(Event::JobRunHeartbeatV1(heartbeat_alpha)); - - // 8. Alpha job succeeds - let mut success_alpha = JobRunSuccessEventV1::default(); - success_alpha.job_run_id = alpha_job_id.clone(); - events.push(Event::JobRunSuccessV1(success_alpha)); - - // 9. 
Queue beta job again (second attempt) - THIS IS THE CRITICAL MOMENT - let beta_job_2_id = "beta-job-2".to_string(); - let mut buffer_beta_2 = JobRunBufferEventV1::default(); - buffer_beta_2.job_run_id = beta_job_2_id.clone(); - buffer_beta_2.job_label = "//job_beta".to_string(); - buffer_beta_2.want_attributed_partitions = vec![WantAttributedPartitions { - want_id: beta_want_id.clone(), - partitions: vec![PartitionRef { - r#ref: "data/beta".to_string(), - }], - }]; - buffer_beta_2.building_partitions = vec![PartitionRef { - r#ref: "data/beta".to_string(), - }]; - events.push(Event::JobRunBufferV1(buffer_beta_2)); - - // 10. Beta job starts running - let mut heartbeat_beta_2 = JobRunHeartbeatEventV1::default(); - heartbeat_beta_2.job_run_id = beta_job_2_id.clone(); - events.push(Event::JobRunHeartbeatV1(heartbeat_beta_2)); - - // 11. Beta job succeeds - let mut success_beta_2 = JobRunSuccessEventV1::default(); - success_beta_2.job_run_id = beta_job_2_id.clone(); - events.push(Event::JobRunSuccessV1(success_beta_2)); + let (events, ids) = multihop_scenario(); // Process all events - this simulates replay + let mut state = BuildState::default(); for event in &events { state.handle_event(event); } // Verify final state - let beta_want = state.get_want(&beta_want_id).unwrap(); + let beta_want = state.get_want(&ids.beta_want_id).unwrap(); assert_eq!( beta_want.status, Some(crate::WantStatusCode::WantSuccessful.into()), "Beta want should be successful after multi-hop dependency resolution" ); - let alpha_want = state.get_want(&alpha_want_id).unwrap(); + let alpha_want = state.get_want(&ids.alpha_want_id).unwrap(); assert_eq!( alpha_want.status, Some(crate::WantStatusCode::WantSuccessful.into()), diff --git a/databuild/build_state/mod.rs b/databuild/build_state/mod.rs index 2b6bc8a..a624f66 100644 --- a/databuild/build_state/mod.rs +++ b/databuild/build_state/mod.rs @@ -61,6 +61,11 @@ pub struct BuildState { // Inverted indexes pub(crate) wants_for_partition: BTreeMap>, // 
partition ref → want_ids pub(crate) downstream_waiting: BTreeMap>, // upstream ref → partition UUIDs waiting for it + + // Consumer index for lineage queries: input_uuid → list of (output_uuid, job_run_id) + // Uses UUIDs (not refs) to preserve historical lineage across partition rebuilds + // Populated from read_deps on job success + pub(crate) partition_consumers: BTreeMap<Uuid, Vec<(Uuid, String)>>, } impl BuildState { @@ -118,6 +123,15 @@ impl BuildState { .unwrap_or(&[]) } + /// Get consumers for a partition UUID (downstream partitions that read this one) + /// Returns list of (output_uuid, job_run_id) tuples + pub fn get_partition_consumers(&self, uuid: &Uuid) -> &[(Uuid, String)] { + self.partition_consumers + .get(uuid) + .map(|v| v.as_slice()) + .unwrap_or(&[]) + } + /// Register a want in the wants_for_partition inverted index pub(crate) fn register_want_for_partitions( &mut self, diff --git a/databuild/build_state/queries.rs b/databuild/build_state/queries.rs index a0a6872..63fb40f 100644 --- a/databuild/build_state/queries.rs +++ b/databuild/build_state/queries.rs @@ -2,10 +2,12 @@ //! //! Read-only methods for accessing state (get_*, list_*) used by the API layer. 
+use crate::util::{HasRelatedIds, RelatedIds}; use crate::{ - JobRunDetail, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, - ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, - ListWantsResponse, PartitionDetail, TaintDetail, WantDetail, + GetJobRunResponse, GetPartitionResponse, GetWantResponse, JobRunDetail, ListJobRunsRequest, + ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, + ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, RelatedEntities, + TaintDetail, WantDetail, }; use std::collections::BTreeMap; @@ -49,6 +51,7 @@ impl BuildState { match_count: self.wants.len() as u64, page, page_size, + index: None, } } @@ -60,6 +63,7 @@ impl BuildState { match_count: self.wants.len() as u64, page, page_size, + index: None, } } @@ -81,6 +85,7 @@ impl BuildState { match_count: self.canonical_partitions.len() as u64, page, page_size, + index: None, } } @@ -102,6 +107,7 @@ impl BuildState { match_count: self.job_runs.len() as u64, page, page_size, + index: None, } } } @@ -116,3 +122,214 @@ fn list_state_items(map: &BTreeMap, page: u64, page_size: u .cloned() .collect() } + +// ============================================================================ +// Response builders with RelatedEntities index +// ============================================================================ + +impl BuildState { + /// Resolve RelatedIds to a RelatedEntities index by looking up entities in BuildState. + /// This is the central method for building the index from collected IDs. 
+ pub fn resolve_related_ids(&self, ids: &RelatedIds) -> RelatedEntities { + let mut index = RelatedEntities::default(); + + // Resolve partition refs + for partition_ref in &ids.partition_refs { + if !index.partitions.contains_key(partition_ref) { + if let Some(p) = self.get_canonical_partition(partition_ref) { + index + .partitions + .insert(partition_ref.clone(), p.to_detail()); + } + } + } + + // Resolve partition UUIDs + for uuid in &ids.partition_uuids { + if let Some(p) = self.partitions_by_uuid.get(uuid) { + let detail = p.to_detail(); + if let Some(ref pref) = detail.r#ref { + if !index.partitions.contains_key(&pref.r#ref) { + index.partitions.insert(pref.r#ref.clone(), detail); + } + } + } + } + + // Resolve job run IDs + for job_run_id in &ids.job_run_ids { + if !index.job_runs.contains_key(job_run_id) { + if let Some(jr) = self.job_runs.get(job_run_id) { + index.job_runs.insert(job_run_id.clone(), jr.to_detail()); + } + } + } + + // Resolve want IDs + for want_id in &ids.want_ids { + if !index.wants.contains_key(want_id) { + if let Some(w) = self.wants.get(want_id) { + index.wants.insert(want_id.clone(), w.to_detail()); + } + } + } + + index + } + + /// Get a want with its related entities (job runs, partitions) + pub fn get_want_with_index(&self, want_id: &str) -> Option { + let want = self.wants.get(want_id)?; + let want_detail = want.to_detail(); + let ids = want.related_ids(); + let index = self.resolve_related_ids(&ids); + + Some(GetWantResponse { + data: Some(want_detail), + index: Some(index), + }) + } + + /// Get a partition with its related entities (builder job run, downstream consumers) + pub fn get_partition_with_index(&self, partition_ref: &str) -> Option { + let partition = self.get_canonical_partition(partition_ref)?; + let partition_detail = partition.to_detail(); + + let mut ids = partition.related_ids(); + + // Add downstream consumers from the consumer index (not stored on partition) + let uuid = partition.uuid(); + for (output_uuid, 
job_run_id) in self.get_partition_consumers(&uuid) { + if !ids.partition_uuids.contains(output_uuid) { + ids.partition_uuids.push(*output_uuid); + } + if !ids.job_run_ids.contains(job_run_id) { + ids.job_run_ids.push(job_run_id.clone()); + } + } + + // Add wants that reference this partition (from inverted index) + for want_id in self.get_wants_for_partition(partition_ref) { + if !ids.want_ids.contains(want_id) { + ids.want_ids.push(want_id.clone()); + } + } + + let index = self.resolve_related_ids(&ids); + + Some(GetPartitionResponse { + data: Some(partition_detail), + index: Some(index), + }) + } + + /// Get a job run with its related entities (read/wrote partitions, derivative wants) + pub fn get_job_run_with_index(&self, job_run_id: &str) -> Option { + let job_run = self.job_runs.get(job_run_id)?; + let job_run_detail = job_run.to_detail(); + let ids = job_run.related_ids(); + let index = self.resolve_related_ids(&ids); + + Some(GetJobRunResponse { + data: Some(job_run_detail), + index: Some(index), + }) + } + + /// List wants with related entities index + pub fn list_wants_with_index(&self, request: &ListWantsRequest) -> ListWantsResponse { + let page = request.page.unwrap_or(0); + let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE); + let start = page * page_size; + + let wants: Vec<_> = self + .wants + .values() + .skip(start as usize) + .take(page_size as usize) + .collect(); + + // Collect related IDs from all wants + let mut all_ids = RelatedIds::default(); + for want in &wants { + all_ids.merge(want.related_ids()); + } + + let data: Vec = wants.iter().map(|w| w.to_detail()).collect(); + let index = self.resolve_related_ids(&all_ids); + + ListWantsResponse { + data, + match_count: self.wants.len() as u64, + page, + page_size, + index: Some(index), + } + } + + /// List partitions with related entities index + pub fn list_partitions_with_index( + &self, + request: &ListPartitionsRequest, + ) -> ListPartitionsResponse { + let page = 
request.page.unwrap_or(0); + let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE); + let start = page * page_size; + + let partitions: Vec<_> = self + .canonical_partitions + .iter() + .skip(start as usize) + .take(page_size as usize) + .filter_map(|(_, uuid)| self.partitions_by_uuid.get(uuid)) + .collect(); + + // Collect related IDs from all partitions + let mut all_ids = RelatedIds::default(); + for partition in &partitions { + all_ids.merge(partition.related_ids()); + } + + let data: Vec = partitions.iter().map(|p| p.to_detail()).collect(); + let index = self.resolve_related_ids(&all_ids); + + ListPartitionsResponse { + data, + match_count: self.canonical_partitions.len() as u64, + page, + page_size, + index: Some(index), + } + } + + /// List job runs with related entities index + pub fn list_job_runs_with_index(&self, request: &ListJobRunsRequest) -> ListJobRunsResponse { + let page = request.page.unwrap_or(0); + let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE); + let start = page * page_size; + + let job_runs: Vec<_> = self + .job_runs + .values() + .skip(start as usize) + .take(page_size as usize) + .collect(); + + // Collect related IDs from all job runs + let mut all_ids = RelatedIds::default(); + for job_run in &job_runs { + all_ids.merge(job_run.related_ids()); + } + + let data: Vec = job_runs.iter().map(|jr| jr.to_detail()).collect(); + let index = self.resolve_related_ids(&all_ids); + + ListJobRunsResponse { + data, + match_count: self.job_runs.len() as u64, + page, + page_size, + index: Some(index), + } + } +} diff --git a/databuild/databuild.proto b/databuild/databuild.proto index f179195..1faddbd 100644 --- a/databuild/databuild.proto +++ b/databuild/databuild.proto @@ -85,9 +85,11 @@ message JobRunHeartbeatEventV1 { string job_run_id = 1; // TODO reentrance? } -// Simply indicates that the job has succeeded. +// Indicates that the job has succeeded, including what data was read. 
message JobRunSuccessEventV1 { string job_run_id = 1; + // The read dependencies for this job run, preserving impacted→read relationships + repeated ReadDeps read_deps = 2; } // Simply indicates that the job has failed. Depending on retry logic defined in job, it may retry. message JobRunFailureEventV1 { @@ -212,7 +214,10 @@ message WantDetail { optional string comment = 8; WantStatus status = 9; uint64 last_updated_timestamp = 10; - // TODO + // Lineage: all job runs that have serviced this want + repeated string job_run_ids = 11; + // Lineage: derivative wants spawned by this want's job dep-misses + repeated string derivative_want_ids = 12; } message PartitionDetail { @@ -230,6 +235,11 @@ message PartitionDetail { // The unique identifier for this partition instance (UUID as string) // Each time a partition is built, it gets a new UUID derived from the job_run_id string uuid = 7; + // Lineage: job run that built this partition (for Live/Tainted partitions) + // Upstream lineage is resolved via this job run's read_deps (job run is source of truth) + optional string built_by_job_run_id = 8; + // Lineage: downstream partition UUIDs that consumed this (from consumer index) + repeated string downstream_partition_uuids = 9; } message PartitionStatus { PartitionStatusCode code = 1; @@ -289,10 +299,26 @@ message JobRunDetail { optional uint64 last_heartbeat_at = 3; repeated PartitionRef building_partitions = 4; repeated WantAttributedPartitions servicing_wants = 5; + // Lineage: read dependencies with resolved UUIDs (for Succeeded jobs) + repeated ReadDeps read_deps = 6; + // Lineage: resolved UUIDs for read partitions (ref → UUID) + map read_partition_uuids = 7; + // Lineage: resolved UUIDs for written partitions (ref → UUID) + map wrote_partition_uuids = 8; + // Lineage: derivative wants spawned by this job's dep-miss (for DepMiss jobs) + repeated string derivative_want_ids = 9; } +// Related entities index - used in API responses for deduplication and O(1) lookup +// 
Each entity appears once in the index, even if referenced by multiple items in `data` +message RelatedEntities { + map partitions = 1; + map job_runs = 2; + map wants = 3; +} + message EventFilter { // IDs of wants to get relevant events for repeated string want_ids = 1; @@ -308,6 +334,7 @@ message ListWantsResponse { uint64 match_count = 2; uint64 page = 3; uint64 page_size = 4; + RelatedEntities index = 5; } message ListTaintsRequest { @@ -320,6 +347,7 @@ message ListTaintsResponse { uint64 match_count = 2; uint64 page = 3; uint64 page_size = 4; + RelatedEntities index = 5; } message ListPartitionsRequest { @@ -332,6 +360,7 @@ message ListPartitionsResponse { uint64 match_count = 2; uint64 page = 3; uint64 page_size = 4; + RelatedEntities index = 5; } message ListJobRunsRequest { @@ -344,6 +373,7 @@ message ListJobRunsResponse { uint64 match_count = 2; uint64 page = 3; uint64 page_size = 4; + RelatedEntities index = 5; } message CreateWantRequest { @@ -372,6 +402,23 @@ message GetWantRequest { } message GetWantResponse { WantDetail data = 1; + RelatedEntities index = 2; +} + +message GetPartitionRequest { + string partition_ref = 1; +} +message GetPartitionResponse { + PartitionDetail data = 1; + RelatedEntities index = 2; +} + +message GetJobRunRequest { + string job_run_id = 1; +} +message GetJobRunResponse { + JobRunDetail data = 1; + RelatedEntities index = 2; } message CreateTaintRequest { diff --git a/databuild/event_transforms.rs b/databuild/event_transforms.rs index db32b8a..fc65a2c 100644 --- a/databuild/event_transforms.rs +++ b/databuild/event_transforms.rs @@ -30,6 +30,8 @@ impl From for WantDetail { comment: e.comment, status: Some(WantStatusCode::WantIdle.into()), last_updated_timestamp: current_timestamp(), + job_run_ids: vec![], + derivative_want_ids: vec![], } } } @@ -74,12 +76,17 @@ impl From for WantStatus { impl From for JobRunDetail { fn from(value: JobRunBufferEventV1) -> Self { + use std::collections::HashMap; Self { id: value.job_run_id, 
status: Some(JobRunStatusCode::JobRunQueued.into()), last_heartbeat_at: None, building_partitions: value.building_partitions, servicing_wants: value.want_attributed_partitions, + read_deps: vec![], + read_partition_uuids: HashMap::new(), + wrote_partition_uuids: HashMap::new(), + derivative_want_ids: vec![], } } } @@ -210,7 +217,10 @@ impl Into for Option { impl Into<GetWantResponse> for Option<WantDetail> { fn into(self) -> GetWantResponse { - GetWantResponse { data: self } + GetWantResponse { + data: self, + index: None, + } } } diff --git a/databuild/http_server.rs b/databuild/http_server.rs index ba3ede9..c99d816 100644 --- a/databuild/http_server.rs +++ b/databuild/http_server.rs @@ -433,7 +433,7 @@ async fn list_wants_json( .into_response(); } }; - let response = build_state.list_wants(&params); + let response = build_state.list_wants_with_index(&params); (StatusCode::OK, Json(response)).into_response() } @@ -456,10 +456,8 @@ async fn get_want_json( .into_response(); } }; - let response = build_state.get_want(&want_id); - - match response { - Some(want) => (StatusCode::OK, Json(GetWantResponse { data: Some(want) })).into_response(), + match build_state.get_want_with_index(&want_id) { + Some(response) => (StatusCode::OK, Json(response)).into_response(), None => { tracing::debug!("Want not found: {}", want_id); ( @@ -608,7 +606,7 @@ async fn list_partitions_json( .into_response(); } }; - let response = build_state.list_partitions(&params); + let response = build_state.list_partitions_with_index(&params); (StatusCode::OK, Json(response)).into_response() } @@ -631,7 +629,7 @@ async fn list_job_runs_json( .into_response(); } }; - let response = build_state.list_job_runs(&params); + let response = build_state.list_job_runs_with_index(&params); (StatusCode::OK, Json(response)).into_response() } diff --git a/databuild/job_run.rs b/databuild/job_run.rs index ded201e..199600a 100644 --- a/databuild/job_run.rs +++ b/databuild/job_run.rs @@ -256,6 +256,7 @@ impl SubProcessCompleted { pub fn to_event(&self, job_run_id: 
&Uuid) -> Event { Event::JobRunSuccessV1(JobRunSuccessEventV1 { job_run_id: job_run_id.to_string(), + read_deps: self.read_deps.clone(), }) } } @@ -394,6 +395,7 @@ impl ToEvent for SubProcessCompleted { fn to_event(&self, job_run_id: &Uuid) -> Event { Event::JobRunSuccessV1(JobRunSuccessEventV1 { job_run_id: job_run_id.to_string(), + read_deps: self.read_deps.clone(), }) } } diff --git a/databuild/job_run_state.rs b/databuild/job_run_state.rs index c50b896..a4b3933 100644 --- a/databuild/job_run_state.rs +++ b/databuild/job_run_state.rs @@ -1,9 +1,11 @@ use crate::partition_state::{BuildingPartitionRef, FailedPartitionRef, LivePartitionRef}; -use crate::util::current_timestamp; +use crate::util::{HasRelatedIds, RelatedIds, current_timestamp}; use crate::{ EventSource, JobRunDetail, JobRunStatusCode, MissingDeps, PartitionRef, ReadDeps, WantAttributedPartitions, }; +use std::collections::BTreeMap; +use uuid::Uuid; /// State: Job has been queued but not yet started #[derive(Debug, Clone)] @@ -22,6 +24,12 @@ pub struct RunningState { #[derive(Debug, Clone)] pub struct SucceededState { pub completed_at: u64, + /// The read dependencies reported by the job, preserving impacted→read relationships + pub read_deps: Vec, + /// Resolved UUIDs for partitions that were read (ref → UUID at read time) + pub read_partition_uuids: BTreeMap, + /// Resolved UUIDs for partitions that were written (ref → UUID) + pub wrote_partition_uuids: BTreeMap, } /// State: Job failed during execution @@ -113,11 +121,20 @@ impl JobRunWithState { } /// Transition from Running to Succeeded - pub fn succeed(self, timestamp: u64) -> JobRunWithState { + pub fn succeed( + self, + timestamp: u64, + read_deps: Vec, + read_partition_uuids: BTreeMap, + wrote_partition_uuids: BTreeMap, + ) -> JobRunWithState { JobRunWithState { info: self.info, state: SucceededState { completed_at: timestamp, + read_deps, + read_partition_uuids, + wrote_partition_uuids, }, } } @@ -231,6 +248,21 @@ impl JobRunWithState { 
.map(|p| LivePartitionRef(p.clone())) .collect() } + + /// Get the read dependencies reported by the job + pub fn get_read_deps(&self) -> &[ReadDeps] { + &self.state.read_deps + } + + /// Get the resolved UUIDs for partitions that were read + pub fn get_read_partition_uuids(&self) -> &BTreeMap { + &self.state.read_partition_uuids + } + + /// Get the resolved UUIDs for partitions that were written + pub fn get_wrote_partition_uuids(&self) -> &BTreeMap { + &self.state.wrote_partition_uuids + } } impl JobRunWithState { @@ -290,10 +322,128 @@ impl JobRunWithState { } } +// ==================== HasRelatedIds trait implementation ==================== + +impl HasRelatedIds for JobRun { + /// Get the IDs of all entities this job run references. + /// Note: derivative_want_ids come from BuildState, not from JobRun itself. + fn related_ids(&self) -> RelatedIds { + // Partition refs from building_partitions (all states have this) + let partition_refs: Vec = match self { + JobRun::Queued(jr) => jr + .info + .building_partitions + .iter() + .map(|p| p.r#ref.clone()) + .collect(), + JobRun::Running(jr) => jr + .info + .building_partitions + .iter() + .map(|p| p.r#ref.clone()) + .collect(), + JobRun::Succeeded(jr) => jr + .info + .building_partitions + .iter() + .map(|p| p.r#ref.clone()) + .collect(), + JobRun::Failed(jr) => jr + .info + .building_partitions + .iter() + .map(|p| p.r#ref.clone()) + .collect(), + JobRun::DepMiss(jr) => jr + .info + .building_partitions + .iter() + .map(|p| p.r#ref.clone()) + .collect(), + JobRun::Canceled(jr) => jr + .info + .building_partitions + .iter() + .map(|p| p.r#ref.clone()) + .collect(), + }; + + // Partition UUIDs from read/write lineage (only Succeeded state has these) + let partition_uuids: Vec = match self { + JobRun::Succeeded(jr) => { + let mut uuids = Vec::new(); + for uuid_str in jr.state.read_partition_uuids.values() { + if let Ok(uuid) = Uuid::parse_str(uuid_str) { + uuids.push(uuid); + } + } + for uuid_str in 
jr.state.wrote_partition_uuids.values() { + if let Ok(uuid) = Uuid::parse_str(uuid_str) { + if !uuids.contains(&uuid) { + uuids.push(uuid); + } + } + } + uuids + } + _ => vec![], + }; + + // Want IDs from servicing_wants (all states have this) + let want_ids: Vec = match self { + JobRun::Queued(jr) => jr + .info + .servicing_wants + .iter() + .map(|w| w.want_id.clone()) + .collect(), + JobRun::Running(jr) => jr + .info + .servicing_wants + .iter() + .map(|w| w.want_id.clone()) + .collect(), + JobRun::Succeeded(jr) => jr + .info + .servicing_wants + .iter() + .map(|w| w.want_id.clone()) + .collect(), + JobRun::Failed(jr) => jr + .info + .servicing_wants + .iter() + .map(|w| w.want_id.clone()) + .collect(), + JobRun::DepMiss(jr) => jr + .info + .servicing_wants + .iter() + .map(|w| w.want_id.clone()) + .collect(), + JobRun::Canceled(jr) => jr + .info + .servicing_wants + .iter() + .map(|w| w.want_id.clone()) + .collect(), + }; + + RelatedIds { + partition_refs, + partition_uuids, + job_run_ids: vec![], + want_ids, + } + } +} + // ==================== Conversion to JobRunDetail for API ==================== impl JobRun { pub fn to_detail(&self) -> JobRunDetail { + use std::collections::HashMap; + match self { JobRun::Queued(queued) => JobRunDetail { id: queued.info.id.clone(), @@ -301,6 +451,10 @@ impl JobRun { last_heartbeat_at: None, building_partitions: queued.info.building_partitions.clone(), servicing_wants: queued.info.servicing_wants.clone(), + read_deps: vec![], + read_partition_uuids: HashMap::new(), + wrote_partition_uuids: HashMap::new(), + derivative_want_ids: vec![], }, JobRun::Running(running) => JobRunDetail { id: running.info.id.clone(), @@ -308,6 +462,10 @@ impl JobRun { last_heartbeat_at: Some(running.state.last_heartbeat_at), building_partitions: running.info.building_partitions.clone(), servicing_wants: running.info.servicing_wants.clone(), + read_deps: vec![], + read_partition_uuids: HashMap::new(), + wrote_partition_uuids: HashMap::new(), + 
derivative_want_ids: vec![], }, JobRun::Succeeded(succeeded) => JobRunDetail { id: succeeded.info.id.clone(), @@ -315,6 +473,20 @@ impl JobRun { last_heartbeat_at: None, building_partitions: succeeded.info.building_partitions.clone(), servicing_wants: succeeded.info.servicing_wants.clone(), + read_deps: succeeded.state.read_deps.clone(), + read_partition_uuids: succeeded + .state + .read_partition_uuids + .clone() + .into_iter() + .collect(), + wrote_partition_uuids: succeeded + .state + .wrote_partition_uuids + .clone() + .into_iter() + .collect(), + derivative_want_ids: vec![], }, JobRun::Failed(failed) => JobRunDetail { id: failed.info.id.clone(), @@ -322,6 +494,10 @@ impl JobRun { last_heartbeat_at: None, building_partitions: failed.info.building_partitions.clone(), servicing_wants: failed.info.servicing_wants.clone(), + read_deps: vec![], + read_partition_uuids: HashMap::new(), + wrote_partition_uuids: HashMap::new(), + derivative_want_ids: vec![], }, JobRun::DepMiss(dep_miss) => JobRunDetail { id: dep_miss.info.id.clone(), @@ -329,6 +505,12 @@ impl JobRun { last_heartbeat_at: None, building_partitions: dep_miss.info.building_partitions.clone(), servicing_wants: dep_miss.info.servicing_wants.clone(), + read_deps: dep_miss.state.read_deps.clone(), + read_partition_uuids: HashMap::new(), + wrote_partition_uuids: HashMap::new(), + // Note: derivative_want_ids would need to be populated from BuildState + // since the job doesn't track which wants it spawned (BEL does) + derivative_want_ids: vec![], }, JobRun::Canceled(canceled) => JobRunDetail { id: canceled.info.id.clone(), @@ -336,6 +518,10 @@ impl JobRun { last_heartbeat_at: None, building_partitions: canceled.info.building_partitions.clone(), servicing_wants: canceled.info.servicing_wants.clone(), + read_deps: vec![], + read_partition_uuids: HashMap::new(), + wrote_partition_uuids: HashMap::new(), + derivative_want_ids: vec![], }, } } diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs index 
a27822d..cff4f24 100644 --- a/databuild/orchestrator.rs +++ b/databuild/orchestrator.rs @@ -1073,6 +1073,8 @@ echo 'Beta succeeded' comment: None, status: None, last_updated_timestamp: 0, + job_run_ids: vec![], + derivative_want_ids: vec![], } } diff --git a/databuild/partition_state.rs b/databuild/partition_state.rs index b310344..6438efe 100644 --- a/databuild/partition_state.rs +++ b/databuild/partition_state.rs @@ -1,3 +1,4 @@ +use crate::util::{HasRelatedIds, RelatedIds}; use crate::{PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode}; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; @@ -58,6 +59,8 @@ pub struct UpstreamFailedState { pub struct TaintedState { pub tainted_at: u64, pub taint_ids: Vec, + /// Job run that originally built this partition (before it was tainted) + pub built_by: String, } /// Generic partition struct parameterized by state. @@ -250,6 +253,7 @@ impl PartitionWithState { state: TaintedState { tainted_at: timestamp, taint_ids: vec![taint_id], + built_by: self.state.built_by, }, } } @@ -337,9 +341,57 @@ impl Partition { pub fn is_tainted(&self) -> bool { matches!(self, Partition::Tainted(_)) } +} +// ==================== HasRelatedIds trait implementation ==================== + +impl HasRelatedIds for Partition { + /// Get the IDs of all entities this partition references. + /// Note: downstream_partition_uuids and want_ids come from BuildState indexes, + /// not from Partition itself. 
+ fn related_ids(&self) -> RelatedIds { + // Job run ID from the builder (for states that track it) + let job_run_ids: Vec = match self { + Partition::Building(p) => vec![p.state.job_run_id.clone()], + Partition::UpstreamBuilding(p) => vec![p.state.job_run_id.clone()], + Partition::UpForRetry(p) => vec![p.state.original_job_run_id.clone()], + Partition::Live(p) => vec![p.state.built_by.clone()], + Partition::Failed(p) => vec![p.state.failed_by.clone()], + Partition::UpstreamFailed(_) => vec![], + Partition::Tainted(p) => vec![p.state.built_by.clone()], + }; + + // Partition refs from missing deps (for UpstreamBuilding state) + let partition_refs: Vec = match self { + Partition::UpstreamBuilding(p) => p + .state + .missing_deps + .iter() + .map(|d| d.r#ref.clone()) + .collect(), + Partition::UpstreamFailed(p) => p + .state + .failed_upstream_refs + .iter() + .map(|d| d.r#ref.clone()) + .collect(), + _ => vec![], + }; + + RelatedIds { + partition_refs, + partition_uuids: vec![], + job_run_ids, + want_ids: vec![], + } + } +} + +impl Partition { /// Convert to PartitionDetail for API responses and queries. - /// Note: want_ids is now empty - this will be populated by BuildState from the inverted index. + /// Note: want_ids and downstream_partition_uuids are empty here and will be + /// populated by BuildState from its inverted indexes. + /// Upstream lineage is resolved via built_by_job_run_id → job run's read_deps. 
pub fn to_detail(&self) -> PartitionDetail { match self { Partition::Building(p) => PartitionDetail { @@ -353,6 +405,8 @@ impl Partition { taint_ids: vec![], last_updated_timestamp: None, uuid: p.uuid.to_string(), + built_by_job_run_id: None, + downstream_partition_uuids: vec![], // Populated by BuildState }, Partition::UpstreamBuilding(p) => PartitionDetail { r#ref: Some(p.partition_ref.clone()), @@ -365,6 +419,8 @@ impl Partition { taint_ids: vec![], last_updated_timestamp: None, uuid: p.uuid.to_string(), + built_by_job_run_id: None, + downstream_partition_uuids: vec![], // Populated by BuildState }, Partition::UpForRetry(p) => PartitionDetail { r#ref: Some(p.partition_ref.clone()), @@ -377,6 +433,8 @@ impl Partition { taint_ids: vec![], last_updated_timestamp: None, uuid: p.uuid.to_string(), + built_by_job_run_id: None, + downstream_partition_uuids: vec![], // Populated by BuildState }, Partition::Live(p) => PartitionDetail { r#ref: Some(p.partition_ref.clone()), @@ -389,6 +447,8 @@ impl Partition { taint_ids: vec![], last_updated_timestamp: Some(p.state.built_at), uuid: p.uuid.to_string(), + built_by_job_run_id: Some(p.state.built_by.clone()), + downstream_partition_uuids: vec![], // Populated by BuildState }, Partition::Failed(p) => PartitionDetail { r#ref: Some(p.partition_ref.clone()), @@ -401,6 +461,8 @@ impl Partition { taint_ids: vec![], last_updated_timestamp: Some(p.state.failed_at), uuid: p.uuid.to_string(), + built_by_job_run_id: None, + downstream_partition_uuids: vec![], // Populated by BuildState }, Partition::UpstreamFailed(p) => PartitionDetail { r#ref: Some(p.partition_ref.clone()), @@ -413,6 +475,8 @@ impl Partition { taint_ids: vec![], last_updated_timestamp: Some(p.state.failed_at), uuid: p.uuid.to_string(), + built_by_job_run_id: None, + downstream_partition_uuids: vec![], // Populated by BuildState }, Partition::Tainted(p) => PartitionDetail { r#ref: Some(p.partition_ref.clone()), @@ -421,10 +485,12 @@ impl Partition { name: 
"PartitionTainted".to_string(), }), want_ids: vec![], // Populated by BuildState - job_run_ids: vec![], + job_run_ids: vec![p.state.built_by.clone()], taint_ids: p.state.taint_ids.clone(), last_updated_timestamp: Some(p.state.tainted_at), uuid: p.uuid.to_string(), + built_by_job_run_id: Some(p.state.built_by.clone()), + downstream_partition_uuids: vec![], // Populated by BuildState }, } } diff --git a/databuild/util.rs b/databuild/util.rs index ab5e6c7..3b295a3 100644 --- a/databuild/util.rs +++ b/databuild/util.rs @@ -1,5 +1,57 @@ use std::backtrace::Backtrace; use std::time::{SystemTime, UNIX_EPOCH}; +use uuid::Uuid; + +// ============================================================================ +// Related IDs - for building RelatedEntities index in API responses +// ============================================================================ + +/// IDs of related entities that an object references. +/// Used by the query layer to build the RelatedEntities index for API responses. 
+#[derive(Debug, Clone, Default)] +pub struct RelatedIds { + /// Partition refs (e.g., from Want.partitions, JobRun.building_partitions) + pub partition_refs: Vec, + /// Partition UUIDs (e.g., from read/write lineage, consumer index) + pub partition_uuids: Vec, + /// Job run IDs (e.g., from built_by, consumer jobs) + pub job_run_ids: Vec, + /// Want IDs (e.g., derivative wants, upstream wants, servicing wants) + pub want_ids: Vec, +} + +impl RelatedIds { + /// Merge another RelatedIds into this one, deduplicating + pub fn merge(&mut self, other: RelatedIds) { + for r in other.partition_refs { + if !self.partition_refs.contains(&r) { + self.partition_refs.push(r); + } + } + for u in other.partition_uuids { + if !self.partition_uuids.contains(&u) { + self.partition_uuids.push(u); + } + } + for j in other.job_run_ids { + if !self.job_run_ids.contains(&j) { + self.job_run_ids.push(j); + } + } + for w in other.want_ids { + if !self.want_ids.contains(&w) { + self.want_ids.push(w); + } + } + } +} + +/// Trait for entities that can report their related entity IDs. +/// Used by the query layer to build RelatedEntities indexes for API responses. 
+/// Implementing types: Want, JobRun, Partition +pub trait HasRelatedIds { + fn related_ids(&self) -> RelatedIds; +} pub fn current_timestamp() -> u64 { let now = SystemTime::now(); @@ -89,3 +141,173 @@ impl std::fmt::Display for DatabuildError { write!(f, "{}", self.msg) } } + +// ============================================================================ +// Test Scenarios - reusable BEL event sequences for testing +// ============================================================================ + +#[cfg(test)] +pub mod test_scenarios { + use crate::data_build_event::Event; + use crate::{ + JobRunBufferEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, + JobRunSuccessEventV1, MissingDeps, PartitionRef, ReadDeps, WantAttributedPartitions, + WantCreateEventV1, + }; + + /// IDs used in the multihop scenario for easy reference in tests + pub struct MultihopIds { + pub beta_want_id: String, + pub alpha_want_id: String, + pub beta_job_1_id: String, + pub beta_job_2_id: String, + pub alpha_job_id: String, + } + + impl Default for MultihopIds { + fn default() -> Self { + Self { + beta_want_id: "beta-want".to_string(), + alpha_want_id: "alpha-want".to_string(), + beta_job_1_id: "beta-job-1".to_string(), + beta_job_2_id: "beta-job-2".to_string(), + alpha_job_id: "alpha-job".to_string(), + } + } + } + + /// Creates a multihop dependency scenario: + /// 1. Want for data/beta is created + /// 2. Job beta-job-1 starts, discovers missing dep on data/alpha + /// 3. Derivative want for data/alpha is created + /// 4. Job alpha-job builds data/alpha successfully + /// 5. Job beta-job-2 retries and succeeds + /// + /// This exercises: want creation, job buffering, dep-miss, derivative wants, + /// job success with read_deps, and retry logic. + pub fn multihop_scenario() -> (Vec, MultihopIds) { + let ids = MultihopIds::default(); + let mut events = vec![]; + + // 1. 
Create want for data/beta + events.push(Event::WantCreateV1(WantCreateEventV1 { + want_id: ids.beta_want_id.clone(), + partitions: vec![PartitionRef { + r#ref: "data/beta".to_string(), + }], + ..Default::default() + })); + + // 2. Queue beta job (first attempt) + events.push(Event::JobRunBufferV1(JobRunBufferEventV1 { + job_run_id: ids.beta_job_1_id.clone(), + job_label: "//job_beta".to_string(), + want_attributed_partitions: vec![WantAttributedPartitions { + want_id: ids.beta_want_id.clone(), + partitions: vec![PartitionRef { + r#ref: "data/beta".to_string(), + }], + }], + building_partitions: vec![PartitionRef { + r#ref: "data/beta".to_string(), + }], + ..Default::default() + })); + + // 3. Beta job starts running + events.push(Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 { + job_run_id: ids.beta_job_1_id.clone(), + ..Default::default() + })); + + // 4. Beta job reports missing dependency on data/alpha + events.push(Event::JobRunMissingDepsV1(JobRunMissingDepsEventV1 { + job_run_id: ids.beta_job_1_id.clone(), + missing_deps: vec![MissingDeps { + impacted: vec![PartitionRef { + r#ref: "data/beta".to_string(), + }], + missing: vec![PartitionRef { + r#ref: "data/alpha".to_string(), + }], + }], + ..Default::default() + })); + + // 5. Create derivative want for data/alpha + events.push(Event::WantCreateV1(WantCreateEventV1 { + want_id: ids.alpha_want_id.clone(), + partitions: vec![PartitionRef { + r#ref: "data/alpha".to_string(), + }], + ..Default::default() + })); + + // 6. Queue alpha job + events.push(Event::JobRunBufferV1(JobRunBufferEventV1 { + job_run_id: ids.alpha_job_id.clone(), + job_label: "//job_alpha".to_string(), + want_attributed_partitions: vec![WantAttributedPartitions { + want_id: ids.alpha_want_id.clone(), + partitions: vec![PartitionRef { + r#ref: "data/alpha".to_string(), + }], + }], + building_partitions: vec![PartitionRef { + r#ref: "data/alpha".to_string(), + }], + ..Default::default() + })); + + // 7. 
Alpha job starts running + events.push(Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 { + job_run_id: ids.alpha_job_id.clone(), + ..Default::default() + })); + + // 8. Alpha job succeeds (no read deps for leaf node) + events.push(Event::JobRunSuccessV1(JobRunSuccessEventV1 { + job_run_id: ids.alpha_job_id.clone(), + read_deps: vec![], + ..Default::default() + })); + + // 9. Queue beta job again (second attempt - retry) + events.push(Event::JobRunBufferV1(JobRunBufferEventV1 { + job_run_id: ids.beta_job_2_id.clone(), + job_label: "//job_beta".to_string(), + want_attributed_partitions: vec![WantAttributedPartitions { + want_id: ids.beta_want_id.clone(), + partitions: vec![PartitionRef { + r#ref: "data/beta".to_string(), + }], + }], + building_partitions: vec![PartitionRef { + r#ref: "data/beta".to_string(), + }], + ..Default::default() + })); + + // 10. Beta job 2 starts running + events.push(Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 { + job_run_id: ids.beta_job_2_id.clone(), + ..Default::default() + })); + + // 11. 
Beta job 2 succeeds with read_deps showing it read data/alpha + events.push(Event::JobRunSuccessV1(JobRunSuccessEventV1 { + job_run_id: ids.beta_job_2_id.clone(), + read_deps: vec![ReadDeps { + impacted: vec![PartitionRef { + r#ref: "data/beta".to_string(), + }], + read: vec![PartitionRef { + r#ref: "data/alpha".to_string(), + }], + }], + ..Default::default() + })); + + (events, ids) + } +} diff --git a/databuild/want_state.rs b/databuild/want_state.rs index e31fea0..05677b3 100644 --- a/databuild/want_state.rs +++ b/databuild/want_state.rs @@ -1,5 +1,5 @@ use crate::partition_state::FailedPartitionRef; -use crate::util::current_timestamp; +use crate::util::{HasRelatedIds, RelatedIds, current_timestamp}; use crate::{EventSource, PartitionRef, WantCreateEventV1, WantDetail, WantStatusCode}; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -464,6 +464,35 @@ impl WantWithState { } } +// ==================== HasRelatedIds trait implementation ==================== + +impl HasRelatedIds for Want { + /// Get the IDs of all entities this want references. + /// Note: job_run_ids come from inverted indexes in BuildState, not from Want itself. + fn related_ids(&self) -> RelatedIds { + let partition_refs = self + .want() + .partitions + .iter() + .map(|p| p.r#ref.clone()) + .collect(); + + // Collect want IDs from state-specific relationships + let want_ids = match self { + Want::UpstreamBuilding(w) => w.state.upstream_want_ids.clone(), + Want::UpstreamFailed(w) => w.state.failed_wants.clone(), + _ => vec![], + }; + + RelatedIds { + partition_refs, + partition_uuids: vec![], + job_run_ids: vec![], + want_ids, + } + } +} + // Helper methods on the Want enum impl Want { /// Create a new want in the Idle state @@ -522,7 +551,9 @@ impl Want { } } - /// Convert to WantDetail for API responses and queries + /// Convert to WantDetail for API responses and queries. 
+ /// Note: job_run_ids and derivative_want_ids are empty here and will be + /// populated by BuildState from its inverted indexes. pub fn to_detail(&self) -> WantDetail { WantDetail { want_id: self.want().want_id.clone(), @@ -544,6 +575,8 @@ impl Want { Want::UpstreamFailed(_) => Some(WantStatusCode::WantUpstreamFailed.into()), Want::Canceled(_) => Some(WantStatusCode::WantCanceled.into()), }, + job_run_ids: vec![], // Populated by BuildState + derivative_want_ids: vec![], // Populated by BuildState } } } diff --git a/databuild/web/templates.rs b/databuild/web/templates.rs index d99f472..5b95e96 100644 --- a/databuild/web/templates.rs +++ b/databuild/web/templates.rs @@ -3,7 +3,7 @@ use askama::Template; use crate::{ - JobRunDetail, JobRunStatus, PartitionDetail, PartitionRef, PartitionStatus, + JobRunDetail, JobRunStatus, PartitionDetail, PartitionRef, PartitionStatus, ReadDeps, WantAttributedPartitions, WantDetail, WantStatus, }; @@ -84,6 +84,9 @@ pub struct WantDetailView { pub comment_display: String, pub status: Option, pub last_updated_timestamp: u64, + // Lineage fields + pub job_run_ids: Vec, + pub derivative_want_ids: Vec, } impl From<&WantDetail> for WantDetailView { @@ -99,6 +102,8 @@ impl From<&WantDetail> for WantDetailView { comment_display: w.comment.as_deref().unwrap_or("-").to_string(), status: w.status.as_ref().map(WantStatusView::from), last_updated_timestamp: w.last_updated_timestamp, + job_run_ids: w.job_run_ids.clone(), + derivative_want_ids: w.derivative_want_ids.clone(), } } } @@ -119,6 +124,9 @@ pub struct PartitionDetailView { pub want_ids: Vec, pub taint_ids: Vec, pub uuid: String, + // Lineage fields + pub built_by_job_run_id: Option, + pub downstream_partition_uuids: Vec, } impl From<&PartitionDetail> for PartitionDetailView { @@ -141,6 +149,8 @@ impl From<&PartitionDetail> for PartitionDetailView { want_ids: p.want_ids.clone(), taint_ids: p.taint_ids.clone(), uuid: p.uuid.clone(), + built_by_job_run_id: p.built_by_job_run_id.clone(), 
+ downstream_partition_uuids: p.downstream_partition_uuids.clone(), } } } @@ -165,16 +175,65 @@ impl From<&WantAttributedPartitions> for WantAttributedPartitionsView { } } +/// View for read dependency entries (impacted → read relationships) +pub struct ReadDepsView { + pub impacted: Vec, + pub read: Vec, +} + +impl From<&ReadDeps> for ReadDepsView { + fn from(rd: &ReadDeps) -> Self { + Self { + impacted: rd.impacted.iter().map(PartitionRefView::from).collect(), + read: rd.read.iter().map(PartitionRefView::from).collect(), + } + } +} + +/// View for partition ref with its resolved UUID (for lineage display) +pub struct PartitionRefWithUuidView { + pub partition_ref: String, + pub partition_ref_encoded: String, + pub uuid: String, +} + pub struct JobRunDetailView { pub id: String, pub status: Option, pub last_heartbeat_at: Option, pub building_partitions: Vec, pub servicing_wants: Vec, + // Lineage fields (populated for Succeeded/DepMiss states) + pub read_deps: Vec, + pub read_partitions: Vec, + pub wrote_partitions: Vec, + pub derivative_want_ids: Vec, } impl From<&JobRunDetail> for JobRunDetailView { fn from(jr: &JobRunDetail) -> Self { + // Build read_partitions from read_partition_uuids map + let read_partitions: Vec = jr + .read_partition_uuids + .iter() + .map(|(partition_ref, uuid)| PartitionRefWithUuidView { + partition_ref: partition_ref.clone(), + partition_ref_encoded: urlencoding::encode(partition_ref).into_owned(), + uuid: uuid.clone(), + }) + .collect(); + + // Build wrote_partitions from wrote_partition_uuids map + let wrote_partitions: Vec = jr + .wrote_partition_uuids + .iter() + .map(|(partition_ref, uuid)| PartitionRefWithUuidView { + partition_ref: partition_ref.clone(), + partition_ref_encoded: urlencoding::encode(partition_ref).into_owned(), + uuid: uuid.clone(), + }) + .collect(); + Self { id: jr.id.clone(), status: jr.status.as_ref().map(JobRunStatusView::from), @@ -189,6 +248,10 @@ impl From<&JobRunDetail> for JobRunDetailView { .iter() 
.map(WantAttributedPartitionsView::from) .collect(), + read_deps: jr.read_deps.iter().map(ReadDepsView::from).collect(), + read_partitions, + wrote_partitions, + derivative_want_ids: jr.derivative_want_ids.clone(), } } } @@ -361,3 +424,91 @@ pub struct JobRunDetailPage { pub struct WantCreatePage { pub base: BaseContext, } + +// ============================================================================= +// Tests +// ============================================================================= + +#[cfg(test)] +mod tests { + use super::*; + use crate::build_state::BuildState; + use crate::util::test_scenarios::multihop_scenario; + use askama::Template; + + /// Helper to replay events into a fresh BuildState + fn build_state_from_events(events: &[crate::data_build_event::Event]) -> BuildState { + let mut state = BuildState::default(); + for event in events { + state.handle_event(event); + } + state + } + + mod want_detail_page { + use super::*; + + /// Tests that the want detail page shows the job runs that serviced this want. + /// This is the "Fulfillment - Job Runs" section in the UI. + /// + /// Given: The multihop scenario completes (beta job dep-misses, alpha built, beta retries) + /// When: We render the beta want detail page + /// Then: It should show both beta job runs (beta-job-1 and beta-job-2) + #[test] + fn test_shows_servicing_job_runs() { + let (events, ids) = multihop_scenario(); + let state = build_state_from_events(&events); + + // Get beta want and render its detail page + let want_detail = state + .get_want(&ids.beta_want_id) + .expect("beta want should exist"); + let template = WantDetailPage { + base: BaseContext::default(), + want: WantDetailView::from(want_detail), + }; + let html = template.render().expect("template should render"); + + // Verify the Fulfillment section exists and contains both job run IDs + assert!( + html.contains(&ids.beta_job_1_id), + "Should show beta-job-1 in fulfillment section. 
HTML:\n{}", + html + ); + assert!( + html.contains(&ids.beta_job_2_id), + "Should show beta-job-2 in fulfillment section. HTML:\n{}", + html + ); + } + + /// Tests that the want detail page shows derivative wants spawned by dep-miss. + /// This is the "Fulfillment - Derivative Wants" section in the UI. + /// + /// Given: The multihop scenario completes (beta job dep-misses, spawning alpha want) + /// When: We render the beta want detail page + /// Then: It should show the alpha want as a derivative + #[test] + fn test_shows_derivative_wants() { + let (events, ids) = multihop_scenario(); + let state = build_state_from_events(&events); + + // Get beta want and render its detail page + let want_detail = state + .get_want(&ids.beta_want_id) + .expect("beta want should exist"); + let template = WantDetailPage { + base: BaseContext::default(), + want: WantDetailView::from(want_detail), + }; + let html = template.render().expect("template should render"); + + // Verify the Fulfillment section exists and contains the derivative want + assert!( + html.contains(&ids.alpha_want_id), + "Should show alpha-want as derivative want. HTML:\n{}", + html + ); + } + } +} diff --git a/databuild/web/templates/job_runs/detail.html b/databuild/web/templates/job_runs/detail.html index a398114..b487db3 100644 --- a/databuild/web/templates/job_runs/detail.html +++ b/databuild/web/templates/job_runs/detail.html @@ -54,4 +54,43 @@ {% endif %} +{% if !job_run.read_partitions.is_empty() %} +
+

Read Partitions ({{ job_run.read_partitions.len() }})

+
    + {% for p in job_run.read_partitions %} +
  • + {{ p.partition_ref }} + uuid: {{ p.uuid }} +
  • + {% endfor %} +
+
+{% endif %} + +{% if !job_run.wrote_partitions.is_empty() %} +
+

Wrote Partitions ({{ job_run.wrote_partitions.len() }})

+
    + {% for p in job_run.wrote_partitions %} +
  • + {{ p.partition_ref }} + uuid: {{ p.uuid }} +
  • + {% endfor %} +
+
+{% endif %} + +{% if !job_run.derivative_want_ids.is_empty() %} +
+

Derivative Wants ({{ job_run.derivative_want_ids.len() }})

+
    + {% for id in job_run.derivative_want_ids %} +
  • {{ id }}
  • + {% endfor %} +
+
+{% endif %} + {% call base::footer() %} diff --git a/databuild/web/templates/partitions/detail.html b/databuild/web/templates/partitions/detail.html index 8c9112e..28689bf 100644 --- a/databuild/web/templates/partitions/detail.html +++ b/databuild/web/templates/partitions/detail.html @@ -32,9 +32,34 @@ +{% match partition.built_by_job_run_id %} +{% when Some with (job_run_id) %} +
+

Lineage - Built By

+

+ {{ job_run_id }} + (view job run for input partitions) +

+
+{% when None %} +{% endmatch %} + +{% if !partition.downstream_partition_uuids.is_empty() %} +
+

Lineage - Downstream Consumers ({{ partition.downstream_partition_uuids.len() }})

+
    + {% for uuid in partition.downstream_partition_uuids %} +
  • + {{ uuid }} +
  • + {% endfor %} +
+
+{% endif %} + {% if !partition.job_run_ids.is_empty() %}
-

Job Runs ({{ partition.job_run_ids.len() }})

+

Related Job Runs ({{ partition.job_run_ids.len() }})

    {% for id in partition.job_run_ids %}
  • {{ id }}
  • diff --git a/databuild/web/templates/wants/detail.html b/databuild/web/templates/wants/detail.html index 4eab6d3..9e38120 100644 --- a/databuild/web/templates/wants/detail.html +++ b/databuild/web/templates/wants/detail.html @@ -65,4 +65,26 @@
{% endif %} +{% if !want.job_run_ids.is_empty() %} +
+

Fulfillment - Job Runs ({{ want.job_run_ids.len() }})

+
    + {% for id in want.job_run_ids %} +
  • {{ id }}
  • + {% endfor %} +
+
+{% endif %} + +{% if !want.derivative_want_ids.is_empty() %} +
+

Fulfillment - Derivative Wants ({{ want.derivative_want_ids.len() }})

+
    + {% for id in want.derivative_want_ids %} +
  • {{ id }}
  • + {% endfor %} +
+
+{% endif %} + {% call base::footer() %} diff --git a/examples/multihop/databuild.json b/examples/multihop/databuild.json index 43fc22b..11772b2 100644 --- a/examples/multihop/databuild.json +++ b/examples/multihop/databuild.json @@ -3,7 +3,7 @@ "jobs": [ { "label": "//examples/multihop:job_alpha", - "entrypoint": "./examples/multihop/job_alpha.sh", + "entrypoint": "./job_alpha.sh", "environment": { "JOB_NAME": "alpha" }, @@ -11,7 +11,7 @@ }, { "label": "//examples/multihop:job_beta", - "entrypoint": "./examples/multihop/job_beta.sh", + "entrypoint": "./job_beta.sh", "environment": { "JOB_NAME": "beta" },