parent
f353660f97
commit
17d5987517
17 changed files with 1124 additions and 138 deletions
|
|
@ -340,18 +340,51 @@ impl BuildState {
|
|||
}
|
||||
|
||||
pub(crate) fn handle_job_run_success(&mut self, event: &JobRunSuccessEventV1) -> Vec<Event> {
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
|
||||
"BUG: Job run {} must exist when success event received",
|
||||
event.job_run_id
|
||||
));
|
||||
|
||||
// Resolve read partition UUIDs from the read_deps in the event
|
||||
let mut read_partition_uuids: BTreeMap<String, String> = BTreeMap::new();
|
||||
for read_dep in &event.read_deps {
|
||||
for read_ref in &read_dep.read {
|
||||
if let Some(uuid) = self.get_canonical_partition_uuid(&read_ref.r#ref) {
|
||||
read_partition_uuids.insert(read_ref.r#ref.clone(), uuid.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get the building partitions from the job run to resolve wrote_partition_uuids
|
||||
let building_partitions = match &job_run {
|
||||
JobRun::Running(running) => running.info.building_partitions.clone(),
|
||||
_ => vec![],
|
||||
};
|
||||
|
||||
// Resolve wrote partition UUIDs - these will be set when we transition partitions to Live
|
||||
// For now, we compute them based on job_run_id (same logic as transition_partitions_to_live)
|
||||
let mut wrote_partition_uuids: BTreeMap<String, String> = BTreeMap::new();
|
||||
for pref in &building_partitions {
|
||||
let uuid =
|
||||
crate::partition_state::derive_partition_uuid(&event.job_run_id, &pref.r#ref);
|
||||
wrote_partition_uuids.insert(pref.r#ref.clone(), uuid.to_string());
|
||||
}
|
||||
|
||||
let succeeded = match job_run {
|
||||
JobRun::Running(running) => {
|
||||
tracing::info!(
|
||||
job_run_id = %event.job_run_id,
|
||||
read_deps_count = event.read_deps.len(),
|
||||
"JobRun: Running → Succeeded"
|
||||
);
|
||||
running.succeed(current_timestamp())
|
||||
running.succeed(
|
||||
current_timestamp(),
|
||||
event.read_deps.clone(),
|
||||
read_partition_uuids.clone(),
|
||||
wrote_partition_uuids.clone(),
|
||||
)
|
||||
}
|
||||
_ => {
|
||||
panic!(
|
||||
|
|
@ -361,6 +394,34 @@ impl BuildState {
|
|||
}
|
||||
};
|
||||
|
||||
// Populate the consumer index from read_deps (using UUIDs for historical lineage)
|
||||
// For each read partition UUID, record that the written partition UUIDs consumed it
|
||||
for read_dep in &event.read_deps {
|
||||
for read_ref in &read_dep.read {
|
||||
// Look up the read partition's UUID
|
||||
if let Some(read_uuid) = read_partition_uuids.get(&read_ref.r#ref) {
|
||||
if let Ok(read_uuid) = uuid::Uuid::parse_str(read_uuid) {
|
||||
let consumers = self
|
||||
.partition_consumers
|
||||
.entry(read_uuid)
|
||||
.or_insert_with(Vec::new);
|
||||
for impacted_ref in &read_dep.impacted {
|
||||
// Look up the impacted (output) partition's UUID
|
||||
if let Some(wrote_uuid) = wrote_partition_uuids.get(&impacted_ref.r#ref)
|
||||
{
|
||||
if let Ok(wrote_uuid) = uuid::Uuid::parse_str(wrote_uuid) {
|
||||
let entry = (wrote_uuid, event.job_run_id.clone());
|
||||
if !consumers.contains(&entry) {
|
||||
consumers.push(entry);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Job run success is SOURCE of truth that partitions are live
|
||||
let newly_live_partitions = succeeded.get_completed_partitions();
|
||||
|
||||
|
|
@ -642,7 +703,6 @@ impl BuildState {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{MissingDeps, WantAttributedPartitions};
|
||||
|
||||
mod want {
|
||||
use super::*;
|
||||
|
|
@ -686,133 +746,25 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_multihop_dependency_replay() {
|
||||
use crate::{
|
||||
JobRunBufferEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1,
|
||||
JobRunSuccessEventV1, MissingDeps, PartitionRef, WantAttributedPartitions,
|
||||
WantCreateEventV1,
|
||||
};
|
||||
use crate::util::test_scenarios::multihop_scenario;
|
||||
|
||||
let mut state = BuildState::default();
|
||||
let mut events = vec![];
|
||||
|
||||
// 1. Create want for data/beta
|
||||
let beta_want_id = "beta-want".to_string();
|
||||
let mut create_beta = WantCreateEventV1::default();
|
||||
create_beta.want_id = beta_want_id.clone();
|
||||
create_beta.partitions = vec![PartitionRef {
|
||||
r#ref: "data/beta".to_string(),
|
||||
}];
|
||||
events.push(Event::WantCreateV1(create_beta));
|
||||
|
||||
// 2. Queue beta job (first attempt)
|
||||
let beta_job_1_id = "beta-job-1".to_string();
|
||||
let mut buffer_beta_1 = JobRunBufferEventV1::default();
|
||||
buffer_beta_1.job_run_id = beta_job_1_id.clone();
|
||||
buffer_beta_1.job_label = "//job_beta".to_string();
|
||||
buffer_beta_1.want_attributed_partitions = vec![WantAttributedPartitions {
|
||||
want_id: beta_want_id.clone(),
|
||||
partitions: vec![PartitionRef {
|
||||
r#ref: "data/beta".to_string(),
|
||||
}],
|
||||
}];
|
||||
buffer_beta_1.building_partitions = vec![PartitionRef {
|
||||
r#ref: "data/beta".to_string(),
|
||||
}];
|
||||
events.push(Event::JobRunBufferV1(buffer_beta_1));
|
||||
|
||||
// 3. Beta job starts running
|
||||
let mut heartbeat_beta_1 = JobRunHeartbeatEventV1::default();
|
||||
heartbeat_beta_1.job_run_id = beta_job_1_id.clone();
|
||||
events.push(Event::JobRunHeartbeatV1(heartbeat_beta_1));
|
||||
|
||||
// 4. Beta job reports missing dependency on data/alpha
|
||||
let mut dep_miss_beta_1 = JobRunMissingDepsEventV1::default();
|
||||
dep_miss_beta_1.job_run_id = beta_job_1_id.clone();
|
||||
dep_miss_beta_1.missing_deps = vec![MissingDeps {
|
||||
impacted: vec![PartitionRef {
|
||||
r#ref: "data/beta".to_string(),
|
||||
}],
|
||||
missing: vec![PartitionRef {
|
||||
r#ref: "data/alpha".to_string(),
|
||||
}],
|
||||
}];
|
||||
events.push(Event::JobRunMissingDepsV1(dep_miss_beta_1));
|
||||
|
||||
// 5. Create derivative want for data/alpha
|
||||
let alpha_want_id = "alpha-want".to_string();
|
||||
let mut create_alpha = WantCreateEventV1::default();
|
||||
create_alpha.want_id = alpha_want_id.clone();
|
||||
create_alpha.partitions = vec![PartitionRef {
|
||||
r#ref: "data/alpha".to_string(),
|
||||
}];
|
||||
events.push(Event::WantCreateV1(create_alpha));
|
||||
|
||||
// 6. Queue alpha job
|
||||
let alpha_job_id = "alpha-job".to_string();
|
||||
let mut buffer_alpha = JobRunBufferEventV1::default();
|
||||
buffer_alpha.job_run_id = alpha_job_id.clone();
|
||||
buffer_alpha.job_label = "//job_alpha".to_string();
|
||||
buffer_alpha.want_attributed_partitions = vec![WantAttributedPartitions {
|
||||
want_id: alpha_want_id.clone(),
|
||||
partitions: vec![PartitionRef {
|
||||
r#ref: "data/alpha".to_string(),
|
||||
}],
|
||||
}];
|
||||
buffer_alpha.building_partitions = vec![PartitionRef {
|
||||
r#ref: "data/alpha".to_string(),
|
||||
}];
|
||||
events.push(Event::JobRunBufferV1(buffer_alpha));
|
||||
|
||||
// 7. Alpha job starts running
|
||||
let mut heartbeat_alpha = JobRunHeartbeatEventV1::default();
|
||||
heartbeat_alpha.job_run_id = alpha_job_id.clone();
|
||||
events.push(Event::JobRunHeartbeatV1(heartbeat_alpha));
|
||||
|
||||
// 8. Alpha job succeeds
|
||||
let mut success_alpha = JobRunSuccessEventV1::default();
|
||||
success_alpha.job_run_id = alpha_job_id.clone();
|
||||
events.push(Event::JobRunSuccessV1(success_alpha));
|
||||
|
||||
// 9. Queue beta job again (second attempt) - THIS IS THE CRITICAL MOMENT
|
||||
let beta_job_2_id = "beta-job-2".to_string();
|
||||
let mut buffer_beta_2 = JobRunBufferEventV1::default();
|
||||
buffer_beta_2.job_run_id = beta_job_2_id.clone();
|
||||
buffer_beta_2.job_label = "//job_beta".to_string();
|
||||
buffer_beta_2.want_attributed_partitions = vec![WantAttributedPartitions {
|
||||
want_id: beta_want_id.clone(),
|
||||
partitions: vec![PartitionRef {
|
||||
r#ref: "data/beta".to_string(),
|
||||
}],
|
||||
}];
|
||||
buffer_beta_2.building_partitions = vec![PartitionRef {
|
||||
r#ref: "data/beta".to_string(),
|
||||
}];
|
||||
events.push(Event::JobRunBufferV1(buffer_beta_2));
|
||||
|
||||
// 10. Beta job starts running
|
||||
let mut heartbeat_beta_2 = JobRunHeartbeatEventV1::default();
|
||||
heartbeat_beta_2.job_run_id = beta_job_2_id.clone();
|
||||
events.push(Event::JobRunHeartbeatV1(heartbeat_beta_2));
|
||||
|
||||
// 11. Beta job succeeds
|
||||
let mut success_beta_2 = JobRunSuccessEventV1::default();
|
||||
success_beta_2.job_run_id = beta_job_2_id.clone();
|
||||
events.push(Event::JobRunSuccessV1(success_beta_2));
|
||||
let (events, ids) = multihop_scenario();
|
||||
|
||||
// Process all events - this simulates replay
|
||||
let mut state = BuildState::default();
|
||||
for event in &events {
|
||||
state.handle_event(event);
|
||||
}
|
||||
|
||||
// Verify final state
|
||||
let beta_want = state.get_want(&beta_want_id).unwrap();
|
||||
let beta_want = state.get_want(&ids.beta_want_id).unwrap();
|
||||
assert_eq!(
|
||||
beta_want.status,
|
||||
Some(crate::WantStatusCode::WantSuccessful.into()),
|
||||
"Beta want should be successful after multi-hop dependency resolution"
|
||||
);
|
||||
|
||||
let alpha_want = state.get_want(&alpha_want_id).unwrap();
|
||||
let alpha_want = state.get_want(&ids.alpha_want_id).unwrap();
|
||||
assert_eq!(
|
||||
alpha_want.status,
|
||||
Some(crate::WantStatusCode::WantSuccessful.into()),
|
||||
|
|
|
|||
|
|
@ -61,6 +61,11 @@ pub struct BuildState {
|
|||
// Inverted indexes
|
||||
pub(crate) wants_for_partition: BTreeMap<String, Vec<String>>, // partition ref → want_ids
|
||||
pub(crate) downstream_waiting: BTreeMap<String, Vec<Uuid>>, // upstream ref → partition UUIDs waiting for it
|
||||
|
||||
// Consumer index for lineage queries: input_uuid → list of (output_uuid, job_run_id)
|
||||
// Uses UUIDs (not refs) to preserve historical lineage across partition rebuilds
|
||||
// Populated from read_deps on job success
|
||||
pub(crate) partition_consumers: BTreeMap<Uuid, Vec<(Uuid, String)>>,
|
||||
}
|
||||
|
||||
impl BuildState {
|
||||
|
|
@ -118,6 +123,15 @@ impl BuildState {
|
|||
.unwrap_or(&[])
|
||||
}
|
||||
|
||||
/// Get consumers for a partition UUID (downstream partitions that read this one)
|
||||
/// Returns list of (output_uuid, job_run_id) tuples
|
||||
pub fn get_partition_consumers(&self, uuid: &Uuid) -> &[(Uuid, String)] {
|
||||
self.partition_consumers
|
||||
.get(uuid)
|
||||
.map(|v| v.as_slice())
|
||||
.unwrap_or(&[])
|
||||
}
|
||||
|
||||
/// Register a want in the wants_for_partition inverted index
|
||||
pub(crate) fn register_want_for_partitions(
|
||||
&mut self,
|
||||
|
|
|
|||
|
|
@ -2,10 +2,12 @@
|
|||
//!
|
||||
//! Read-only methods for accessing state (get_*, list_*) used by the API layer.
|
||||
|
||||
use crate::util::{HasRelatedIds, RelatedIds};
|
||||
use crate::{
|
||||
JobRunDetail, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest,
|
||||
ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest,
|
||||
ListWantsResponse, PartitionDetail, TaintDetail, WantDetail,
|
||||
GetJobRunResponse, GetPartitionResponse, GetWantResponse, JobRunDetail, ListJobRunsRequest,
|
||||
ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest,
|
||||
ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, RelatedEntities,
|
||||
TaintDetail, WantDetail,
|
||||
};
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
|
|
@ -49,6 +51,7 @@ impl BuildState {
|
|||
match_count: self.wants.len() as u64,
|
||||
page,
|
||||
page_size,
|
||||
index: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -60,6 +63,7 @@ impl BuildState {
|
|||
match_count: self.wants.len() as u64,
|
||||
page,
|
||||
page_size,
|
||||
index: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -81,6 +85,7 @@ impl BuildState {
|
|||
match_count: self.canonical_partitions.len() as u64,
|
||||
page,
|
||||
page_size,
|
||||
index: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -102,6 +107,7 @@ impl BuildState {
|
|||
match_count: self.job_runs.len() as u64,
|
||||
page,
|
||||
page_size,
|
||||
index: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -116,3 +122,214 @@ fn list_state_items<T: Clone>(map: &BTreeMap<String, T>, page: u64, page_size: u
|
|||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Response builders with RelatedEntities index
|
||||
// ============================================================================
|
||||
|
||||
impl BuildState {
|
||||
/// Resolve RelatedIds to a RelatedEntities index by looking up entities in BuildState.
|
||||
/// This is the central method for building the index from collected IDs.
|
||||
pub fn resolve_related_ids(&self, ids: &RelatedIds) -> RelatedEntities {
|
||||
let mut index = RelatedEntities::default();
|
||||
|
||||
// Resolve partition refs
|
||||
for partition_ref in &ids.partition_refs {
|
||||
if !index.partitions.contains_key(partition_ref) {
|
||||
if let Some(p) = self.get_canonical_partition(partition_ref) {
|
||||
index
|
||||
.partitions
|
||||
.insert(partition_ref.clone(), p.to_detail());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve partition UUIDs
|
||||
for uuid in &ids.partition_uuids {
|
||||
if let Some(p) = self.partitions_by_uuid.get(uuid) {
|
||||
let detail = p.to_detail();
|
||||
if let Some(ref pref) = detail.r#ref {
|
||||
if !index.partitions.contains_key(&pref.r#ref) {
|
||||
index.partitions.insert(pref.r#ref.clone(), detail);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve job run IDs
|
||||
for job_run_id in &ids.job_run_ids {
|
||||
if !index.job_runs.contains_key(job_run_id) {
|
||||
if let Some(jr) = self.job_runs.get(job_run_id) {
|
||||
index.job_runs.insert(job_run_id.clone(), jr.to_detail());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve want IDs
|
||||
for want_id in &ids.want_ids {
|
||||
if !index.wants.contains_key(want_id) {
|
||||
if let Some(w) = self.wants.get(want_id) {
|
||||
index.wants.insert(want_id.clone(), w.to_detail());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
index
|
||||
}
|
||||
|
||||
/// Get a want with its related entities (job runs, partitions)
|
||||
pub fn get_want_with_index(&self, want_id: &str) -> Option<GetWantResponse> {
|
||||
let want = self.wants.get(want_id)?;
|
||||
let want_detail = want.to_detail();
|
||||
let ids = want.related_ids();
|
||||
let index = self.resolve_related_ids(&ids);
|
||||
|
||||
Some(GetWantResponse {
|
||||
data: Some(want_detail),
|
||||
index: Some(index),
|
||||
})
|
||||
}
|
||||
|
||||
/// Get a partition with its related entities (builder job run, downstream consumers)
|
||||
pub fn get_partition_with_index(&self, partition_ref: &str) -> Option<GetPartitionResponse> {
|
||||
let partition = self.get_canonical_partition(partition_ref)?;
|
||||
let partition_detail = partition.to_detail();
|
||||
|
||||
let mut ids = partition.related_ids();
|
||||
|
||||
// Add downstream consumers from the consumer index (not stored on partition)
|
||||
let uuid = partition.uuid();
|
||||
for (output_uuid, job_run_id) in self.get_partition_consumers(&uuid) {
|
||||
if !ids.partition_uuids.contains(output_uuid) {
|
||||
ids.partition_uuids.push(*output_uuid);
|
||||
}
|
||||
if !ids.job_run_ids.contains(job_run_id) {
|
||||
ids.job_run_ids.push(job_run_id.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Add wants that reference this partition (from inverted index)
|
||||
for want_id in self.get_wants_for_partition(partition_ref) {
|
||||
if !ids.want_ids.contains(want_id) {
|
||||
ids.want_ids.push(want_id.clone());
|
||||
}
|
||||
}
|
||||
|
||||
let index = self.resolve_related_ids(&ids);
|
||||
|
||||
Some(GetPartitionResponse {
|
||||
data: Some(partition_detail),
|
||||
index: Some(index),
|
||||
})
|
||||
}
|
||||
|
||||
/// Get a job run with its related entities (read/wrote partitions, derivative wants)
|
||||
pub fn get_job_run_with_index(&self, job_run_id: &str) -> Option<GetJobRunResponse> {
|
||||
let job_run = self.job_runs.get(job_run_id)?;
|
||||
let job_run_detail = job_run.to_detail();
|
||||
let ids = job_run.related_ids();
|
||||
let index = self.resolve_related_ids(&ids);
|
||||
|
||||
Some(GetJobRunResponse {
|
||||
data: Some(job_run_detail),
|
||||
index: Some(index),
|
||||
})
|
||||
}
|
||||
|
||||
/// List wants with related entities index
|
||||
pub fn list_wants_with_index(&self, request: &ListWantsRequest) -> ListWantsResponse {
|
||||
let page = request.page.unwrap_or(0);
|
||||
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
|
||||
let start = page * page_size;
|
||||
|
||||
let wants: Vec<_> = self
|
||||
.wants
|
||||
.values()
|
||||
.skip(start as usize)
|
||||
.take(page_size as usize)
|
||||
.collect();
|
||||
|
||||
// Collect related IDs from all wants
|
||||
let mut all_ids = RelatedIds::default();
|
||||
for want in &wants {
|
||||
all_ids.merge(want.related_ids());
|
||||
}
|
||||
|
||||
let data: Vec<WantDetail> = wants.iter().map(|w| w.to_detail()).collect();
|
||||
let index = self.resolve_related_ids(&all_ids);
|
||||
|
||||
ListWantsResponse {
|
||||
data,
|
||||
match_count: self.wants.len() as u64,
|
||||
page,
|
||||
page_size,
|
||||
index: Some(index),
|
||||
}
|
||||
}
|
||||
|
||||
/// List partitions with related entities index
|
||||
pub fn list_partitions_with_index(
|
||||
&self,
|
||||
request: &ListPartitionsRequest,
|
||||
) -> ListPartitionsResponse {
|
||||
let page = request.page.unwrap_or(0);
|
||||
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
|
||||
let start = page * page_size;
|
||||
|
||||
let partitions: Vec<_> = self
|
||||
.canonical_partitions
|
||||
.iter()
|
||||
.skip(start as usize)
|
||||
.take(page_size as usize)
|
||||
.filter_map(|(_, uuid)| self.partitions_by_uuid.get(uuid))
|
||||
.collect();
|
||||
|
||||
// Collect related IDs from all partitions
|
||||
let mut all_ids = RelatedIds::default();
|
||||
for partition in &partitions {
|
||||
all_ids.merge(partition.related_ids());
|
||||
}
|
||||
|
||||
let data: Vec<PartitionDetail> = partitions.iter().map(|p| p.to_detail()).collect();
|
||||
let index = self.resolve_related_ids(&all_ids);
|
||||
|
||||
ListPartitionsResponse {
|
||||
data,
|
||||
match_count: self.canonical_partitions.len() as u64,
|
||||
page,
|
||||
page_size,
|
||||
index: Some(index),
|
||||
}
|
||||
}
|
||||
|
||||
/// List job runs with related entities index
|
||||
pub fn list_job_runs_with_index(&self, request: &ListJobRunsRequest) -> ListJobRunsResponse {
|
||||
let page = request.page.unwrap_or(0);
|
||||
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
|
||||
let start = page * page_size;
|
||||
|
||||
let job_runs: Vec<_> = self
|
||||
.job_runs
|
||||
.values()
|
||||
.skip(start as usize)
|
||||
.take(page_size as usize)
|
||||
.collect();
|
||||
|
||||
// Collect related IDs from all job runs
|
||||
let mut all_ids = RelatedIds::default();
|
||||
for job_run in &job_runs {
|
||||
all_ids.merge(job_run.related_ids());
|
||||
}
|
||||
|
||||
let data: Vec<JobRunDetail> = job_runs.iter().map(|jr| jr.to_detail()).collect();
|
||||
let index = self.resolve_related_ids(&all_ids);
|
||||
|
||||
ListJobRunsResponse {
|
||||
data,
|
||||
match_count: self.job_runs.len() as u64,
|
||||
page,
|
||||
page_size,
|
||||
index: Some(index),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -85,9 +85,11 @@ message JobRunHeartbeatEventV1 {
|
|||
string job_run_id = 1;
|
||||
// TODO reentrance?
|
||||
}
|
||||
// Simply indicates that the job has succeeded.
|
||||
// Indicates that the job has succeeded, including what data was read.
|
||||
message JobRunSuccessEventV1 {
|
||||
string job_run_id = 1;
|
||||
// The read dependencies for this job run, preserving impacted→read relationships
|
||||
repeated ReadDeps read_deps = 2;
|
||||
}
|
||||
// Simply indicates that the job has failed. Depending on retry logic defined in job, it may retry.
|
||||
message JobRunFailureEventV1 {
|
||||
|
|
@ -212,7 +214,10 @@ message WantDetail {
|
|||
optional string comment = 8;
|
||||
WantStatus status = 9;
|
||||
uint64 last_updated_timestamp = 10;
|
||||
// TODO
|
||||
// Lineage: all job runs that have serviced this want
|
||||
repeated string job_run_ids = 11;
|
||||
// Lineage: derivative wants spawned by this want's job dep-misses
|
||||
repeated string derivative_want_ids = 12;
|
||||
}
|
||||
|
||||
message PartitionDetail {
|
||||
|
|
@ -230,6 +235,11 @@ message PartitionDetail {
|
|||
// The unique identifier for this partition instance (UUID as string)
|
||||
// Each time a partition is built, it gets a new UUID derived from the job_run_id
|
||||
string uuid = 7;
|
||||
// Lineage: job run that built this partition (for Live/Tainted partitions)
|
||||
// Upstream lineage is resolved via this job run's read_deps (job run is source of truth)
|
||||
optional string built_by_job_run_id = 8;
|
||||
// Lineage: downstream partition UUIDs that consumed this (from consumer index)
|
||||
repeated string downstream_partition_uuids = 9;
|
||||
}
|
||||
message PartitionStatus {
|
||||
PartitionStatusCode code = 1;
|
||||
|
|
@ -289,10 +299,26 @@ message JobRunDetail {
|
|||
optional uint64 last_heartbeat_at = 3;
|
||||
repeated PartitionRef building_partitions = 4;
|
||||
repeated WantAttributedPartitions servicing_wants = 5;
|
||||
// Lineage: read dependencies with resolved UUIDs (for Succeeded jobs)
|
||||
repeated ReadDeps read_deps = 6;
|
||||
// Lineage: resolved UUIDs for read partitions (ref → UUID)
|
||||
map<string, string> read_partition_uuids = 7;
|
||||
// Lineage: resolved UUIDs for written partitions (ref → UUID)
|
||||
map<string, string> wrote_partition_uuids = 8;
|
||||
// Lineage: derivative wants spawned by this job's dep-miss (for DepMiss jobs)
|
||||
repeated string derivative_want_ids = 9;
|
||||
}
|
||||
|
||||
|
||||
|
||||
// Related entities index - used in API responses for deduplication and O(1) lookup
|
||||
// Each entity appears once in the index, even if referenced by multiple items in `data`
|
||||
message RelatedEntities {
|
||||
map<string, PartitionDetail> partitions = 1;
|
||||
map<string, JobRunDetail> job_runs = 2;
|
||||
map<string, WantDetail> wants = 3;
|
||||
}
|
||||
|
||||
message EventFilter {
|
||||
// IDs of wants to get relevant events for
|
||||
repeated string want_ids = 1;
|
||||
|
|
@ -308,6 +334,7 @@ message ListWantsResponse {
|
|||
uint64 match_count = 2;
|
||||
uint64 page = 3;
|
||||
uint64 page_size = 4;
|
||||
RelatedEntities index = 5;
|
||||
}
|
||||
|
||||
message ListTaintsRequest {
|
||||
|
|
@ -320,6 +347,7 @@ message ListTaintsResponse {
|
|||
uint64 match_count = 2;
|
||||
uint64 page = 3;
|
||||
uint64 page_size = 4;
|
||||
RelatedEntities index = 5;
|
||||
}
|
||||
|
||||
message ListPartitionsRequest {
|
||||
|
|
@ -332,6 +360,7 @@ message ListPartitionsResponse {
|
|||
uint64 match_count = 2;
|
||||
uint64 page = 3;
|
||||
uint64 page_size = 4;
|
||||
RelatedEntities index = 5;
|
||||
}
|
||||
|
||||
message ListJobRunsRequest {
|
||||
|
|
@ -344,6 +373,7 @@ message ListJobRunsResponse {
|
|||
uint64 match_count = 2;
|
||||
uint64 page = 3;
|
||||
uint64 page_size = 4;
|
||||
RelatedEntities index = 5;
|
||||
}
|
||||
|
||||
message CreateWantRequest {
|
||||
|
|
@ -372,6 +402,23 @@ message GetWantRequest {
|
|||
}
|
||||
message GetWantResponse {
|
||||
WantDetail data = 1;
|
||||
RelatedEntities index = 2;
|
||||
}
|
||||
|
||||
message GetPartitionRequest {
|
||||
string partition_ref = 1;
|
||||
}
|
||||
message GetPartitionResponse {
|
||||
PartitionDetail data = 1;
|
||||
RelatedEntities index = 2;
|
||||
}
|
||||
|
||||
message GetJobRunRequest {
|
||||
string job_run_id = 1;
|
||||
}
|
||||
message GetJobRunResponse {
|
||||
JobRunDetail data = 1;
|
||||
RelatedEntities index = 2;
|
||||
}
|
||||
|
||||
message CreateTaintRequest {
|
||||
|
|
|
|||
|
|
@ -30,6 +30,8 @@ impl From<WantCreateEventV1> for WantDetail {
|
|||
comment: e.comment,
|
||||
status: Some(WantStatusCode::WantIdle.into()),
|
||||
last_updated_timestamp: current_timestamp(),
|
||||
job_run_ids: vec![],
|
||||
derivative_want_ids: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -74,12 +76,17 @@ impl From<WantStatusCode> for WantStatus {
|
|||
|
||||
impl From<JobRunBufferEventV1> for JobRunDetail {
|
||||
fn from(value: JobRunBufferEventV1) -> Self {
|
||||
use std::collections::HashMap;
|
||||
Self {
|
||||
id: value.job_run_id,
|
||||
status: Some(JobRunStatusCode::JobRunQueued.into()),
|
||||
last_heartbeat_at: None,
|
||||
building_partitions: value.building_partitions,
|
||||
servicing_wants: value.want_attributed_partitions,
|
||||
read_deps: vec![],
|
||||
read_partition_uuids: HashMap::new(),
|
||||
wrote_partition_uuids: HashMap::new(),
|
||||
derivative_want_ids: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -210,7 +217,10 @@ impl Into<CreateWantResponse> for Option<WantDetail> {
|
|||
|
||||
impl Into<GetWantResponse> for Option<WantDetail> {
|
||||
fn into(self) -> GetWantResponse {
|
||||
GetWantResponse { data: self }
|
||||
GetWantResponse {
|
||||
data: self,
|
||||
index: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -433,7 +433,7 @@ async fn list_wants_json(
|
|||
.into_response();
|
||||
}
|
||||
};
|
||||
let response = build_state.list_wants(¶ms);
|
||||
let response = build_state.list_wants_with_index(¶ms);
|
||||
|
||||
(StatusCode::OK, Json(response)).into_response()
|
||||
}
|
||||
|
|
@ -456,10 +456,8 @@ async fn get_want_json(
|
|||
.into_response();
|
||||
}
|
||||
};
|
||||
let response = build_state.get_want(&want_id);
|
||||
|
||||
match response {
|
||||
Some(want) => (StatusCode::OK, Json(GetWantResponse { data: Some(want) })).into_response(),
|
||||
match build_state.get_want_with_index(&want_id) {
|
||||
Some(response) => (StatusCode::OK, Json(response)).into_response(),
|
||||
None => {
|
||||
tracing::debug!("Want not found: {}", want_id);
|
||||
(
|
||||
|
|
@ -608,7 +606,7 @@ async fn list_partitions_json(
|
|||
.into_response();
|
||||
}
|
||||
};
|
||||
let response = build_state.list_partitions(¶ms);
|
||||
let response = build_state.list_partitions_with_index(¶ms);
|
||||
|
||||
(StatusCode::OK, Json(response)).into_response()
|
||||
}
|
||||
|
|
@ -631,7 +629,7 @@ async fn list_job_runs_json(
|
|||
.into_response();
|
||||
}
|
||||
};
|
||||
let response = build_state.list_job_runs(¶ms);
|
||||
let response = build_state.list_job_runs_with_index(¶ms);
|
||||
|
||||
(StatusCode::OK, Json(response)).into_response()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -256,6 +256,7 @@ impl SubProcessCompleted {
|
|||
pub fn to_event(&self, job_run_id: &Uuid) -> Event {
|
||||
Event::JobRunSuccessV1(JobRunSuccessEventV1 {
|
||||
job_run_id: job_run_id.to_string(),
|
||||
read_deps: self.read_deps.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -394,6 +395,7 @@ impl ToEvent for SubProcessCompleted {
|
|||
fn to_event(&self, job_run_id: &Uuid) -> Event {
|
||||
Event::JobRunSuccessV1(JobRunSuccessEventV1 {
|
||||
job_run_id: job_run_id.to_string(),
|
||||
read_deps: self.read_deps.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,9 +1,11 @@
|
|||
use crate::partition_state::{BuildingPartitionRef, FailedPartitionRef, LivePartitionRef};
|
||||
use crate::util::current_timestamp;
|
||||
use crate::util::{HasRelatedIds, RelatedIds, current_timestamp};
|
||||
use crate::{
|
||||
EventSource, JobRunDetail, JobRunStatusCode, MissingDeps, PartitionRef, ReadDeps,
|
||||
WantAttributedPartitions,
|
||||
};
|
||||
use std::collections::BTreeMap;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// State: Job has been queued but not yet started
|
||||
#[derive(Debug, Clone)]
|
||||
|
|
@ -22,6 +24,12 @@ pub struct RunningState {
|
|||
#[derive(Debug, Clone)]
|
||||
pub struct SucceededState {
|
||||
pub completed_at: u64,
|
||||
/// The read dependencies reported by the job, preserving impacted→read relationships
|
||||
pub read_deps: Vec<ReadDeps>,
|
||||
/// Resolved UUIDs for partitions that were read (ref → UUID at read time)
|
||||
pub read_partition_uuids: BTreeMap<String, String>,
|
||||
/// Resolved UUIDs for partitions that were written (ref → UUID)
|
||||
pub wrote_partition_uuids: BTreeMap<String, String>,
|
||||
}
|
||||
|
||||
/// State: Job failed during execution
|
||||
|
|
@ -113,11 +121,20 @@ impl JobRunWithState<RunningState> {
|
|||
}
|
||||
|
||||
/// Transition from Running to Succeeded
|
||||
pub fn succeed(self, timestamp: u64) -> JobRunWithState<SucceededState> {
|
||||
pub fn succeed(
|
||||
self,
|
||||
timestamp: u64,
|
||||
read_deps: Vec<ReadDeps>,
|
||||
read_partition_uuids: BTreeMap<String, String>,
|
||||
wrote_partition_uuids: BTreeMap<String, String>,
|
||||
) -> JobRunWithState<SucceededState> {
|
||||
JobRunWithState {
|
||||
info: self.info,
|
||||
state: SucceededState {
|
||||
completed_at: timestamp,
|
||||
read_deps,
|
||||
read_partition_uuids,
|
||||
wrote_partition_uuids,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
@ -231,6 +248,21 @@ impl JobRunWithState<SucceededState> {
|
|||
.map(|p| LivePartitionRef(p.clone()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Get the read dependencies reported by the job
|
||||
pub fn get_read_deps(&self) -> &[ReadDeps] {
|
||||
&self.state.read_deps
|
||||
}
|
||||
|
||||
/// Get the resolved UUIDs for partitions that were read
|
||||
pub fn get_read_partition_uuids(&self) -> &BTreeMap<String, String> {
|
||||
&self.state.read_partition_uuids
|
||||
}
|
||||
|
||||
/// Get the resolved UUIDs for partitions that were written
|
||||
pub fn get_wrote_partition_uuids(&self) -> &BTreeMap<String, String> {
|
||||
&self.state.wrote_partition_uuids
|
||||
}
|
||||
}
|
||||
|
||||
impl JobRunWithState<FailedState> {
|
||||
|
|
@ -290,10 +322,128 @@ impl JobRunWithState<CanceledState> {
|
|||
}
|
||||
}
|
||||
|
||||
// ==================== HasRelatedIds trait implementation ====================
|
||||
|
||||
impl HasRelatedIds for JobRun {
|
||||
/// Get the IDs of all entities this job run references.
|
||||
/// Note: derivative_want_ids come from BuildState, not from JobRun itself.
|
||||
fn related_ids(&self) -> RelatedIds {
|
||||
// Partition refs from building_partitions (all states have this)
|
||||
let partition_refs: Vec<String> = match self {
|
||||
JobRun::Queued(jr) => jr
|
||||
.info
|
||||
.building_partitions
|
||||
.iter()
|
||||
.map(|p| p.r#ref.clone())
|
||||
.collect(),
|
||||
JobRun::Running(jr) => jr
|
||||
.info
|
||||
.building_partitions
|
||||
.iter()
|
||||
.map(|p| p.r#ref.clone())
|
||||
.collect(),
|
||||
JobRun::Succeeded(jr) => jr
|
||||
.info
|
||||
.building_partitions
|
||||
.iter()
|
||||
.map(|p| p.r#ref.clone())
|
||||
.collect(),
|
||||
JobRun::Failed(jr) => jr
|
||||
.info
|
||||
.building_partitions
|
||||
.iter()
|
||||
.map(|p| p.r#ref.clone())
|
||||
.collect(),
|
||||
JobRun::DepMiss(jr) => jr
|
||||
.info
|
||||
.building_partitions
|
||||
.iter()
|
||||
.map(|p| p.r#ref.clone())
|
||||
.collect(),
|
||||
JobRun::Canceled(jr) => jr
|
||||
.info
|
||||
.building_partitions
|
||||
.iter()
|
||||
.map(|p| p.r#ref.clone())
|
||||
.collect(),
|
||||
};
|
||||
|
||||
// Partition UUIDs from read/write lineage (only Succeeded state has these)
|
||||
let partition_uuids: Vec<Uuid> = match self {
|
||||
JobRun::Succeeded(jr) => {
|
||||
let mut uuids = Vec::new();
|
||||
for uuid_str in jr.state.read_partition_uuids.values() {
|
||||
if let Ok(uuid) = Uuid::parse_str(uuid_str) {
|
||||
uuids.push(uuid);
|
||||
}
|
||||
}
|
||||
for uuid_str in jr.state.wrote_partition_uuids.values() {
|
||||
if let Ok(uuid) = Uuid::parse_str(uuid_str) {
|
||||
if !uuids.contains(&uuid) {
|
||||
uuids.push(uuid);
|
||||
}
|
||||
}
|
||||
}
|
||||
uuids
|
||||
}
|
||||
_ => vec![],
|
||||
};
|
||||
|
||||
// Want IDs from servicing_wants (all states have this)
|
||||
let want_ids: Vec<String> = match self {
|
||||
JobRun::Queued(jr) => jr
|
||||
.info
|
||||
.servicing_wants
|
||||
.iter()
|
||||
.map(|w| w.want_id.clone())
|
||||
.collect(),
|
||||
JobRun::Running(jr) => jr
|
||||
.info
|
||||
.servicing_wants
|
||||
.iter()
|
||||
.map(|w| w.want_id.clone())
|
||||
.collect(),
|
||||
JobRun::Succeeded(jr) => jr
|
||||
.info
|
||||
.servicing_wants
|
||||
.iter()
|
||||
.map(|w| w.want_id.clone())
|
||||
.collect(),
|
||||
JobRun::Failed(jr) => jr
|
||||
.info
|
||||
.servicing_wants
|
||||
.iter()
|
||||
.map(|w| w.want_id.clone())
|
||||
.collect(),
|
||||
JobRun::DepMiss(jr) => jr
|
||||
.info
|
||||
.servicing_wants
|
||||
.iter()
|
||||
.map(|w| w.want_id.clone())
|
||||
.collect(),
|
||||
JobRun::Canceled(jr) => jr
|
||||
.info
|
||||
.servicing_wants
|
||||
.iter()
|
||||
.map(|w| w.want_id.clone())
|
||||
.collect(),
|
||||
};
|
||||
|
||||
RelatedIds {
|
||||
partition_refs,
|
||||
partition_uuids,
|
||||
job_run_ids: vec![],
|
||||
want_ids,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== Conversion to JobRunDetail for API ====================
|
||||
|
||||
impl JobRun {
|
||||
pub fn to_detail(&self) -> JobRunDetail {
|
||||
use std::collections::HashMap;
|
||||
|
||||
match self {
|
||||
JobRun::Queued(queued) => JobRunDetail {
|
||||
id: queued.info.id.clone(),
|
||||
|
|
@ -301,6 +451,10 @@ impl JobRun {
|
|||
last_heartbeat_at: None,
|
||||
building_partitions: queued.info.building_partitions.clone(),
|
||||
servicing_wants: queued.info.servicing_wants.clone(),
|
||||
read_deps: vec![],
|
||||
read_partition_uuids: HashMap::new(),
|
||||
wrote_partition_uuids: HashMap::new(),
|
||||
derivative_want_ids: vec![],
|
||||
},
|
||||
JobRun::Running(running) => JobRunDetail {
|
||||
id: running.info.id.clone(),
|
||||
|
|
@ -308,6 +462,10 @@ impl JobRun {
|
|||
last_heartbeat_at: Some(running.state.last_heartbeat_at),
|
||||
building_partitions: running.info.building_partitions.clone(),
|
||||
servicing_wants: running.info.servicing_wants.clone(),
|
||||
read_deps: vec![],
|
||||
read_partition_uuids: HashMap::new(),
|
||||
wrote_partition_uuids: HashMap::new(),
|
||||
derivative_want_ids: vec![],
|
||||
},
|
||||
JobRun::Succeeded(succeeded) => JobRunDetail {
|
||||
id: succeeded.info.id.clone(),
|
||||
|
|
@ -315,6 +473,20 @@ impl JobRun {
|
|||
last_heartbeat_at: None,
|
||||
building_partitions: succeeded.info.building_partitions.clone(),
|
||||
servicing_wants: succeeded.info.servicing_wants.clone(),
|
||||
read_deps: succeeded.state.read_deps.clone(),
|
||||
read_partition_uuids: succeeded
|
||||
.state
|
||||
.read_partition_uuids
|
||||
.clone()
|
||||
.into_iter()
|
||||
.collect(),
|
||||
wrote_partition_uuids: succeeded
|
||||
.state
|
||||
.wrote_partition_uuids
|
||||
.clone()
|
||||
.into_iter()
|
||||
.collect(),
|
||||
derivative_want_ids: vec![],
|
||||
},
|
||||
JobRun::Failed(failed) => JobRunDetail {
|
||||
id: failed.info.id.clone(),
|
||||
|
|
@ -322,6 +494,10 @@ impl JobRun {
|
|||
last_heartbeat_at: None,
|
||||
building_partitions: failed.info.building_partitions.clone(),
|
||||
servicing_wants: failed.info.servicing_wants.clone(),
|
||||
read_deps: vec![],
|
||||
read_partition_uuids: HashMap::new(),
|
||||
wrote_partition_uuids: HashMap::new(),
|
||||
derivative_want_ids: vec![],
|
||||
},
|
||||
JobRun::DepMiss(dep_miss) => JobRunDetail {
|
||||
id: dep_miss.info.id.clone(),
|
||||
|
|
@ -329,6 +505,12 @@ impl JobRun {
|
|||
last_heartbeat_at: None,
|
||||
building_partitions: dep_miss.info.building_partitions.clone(),
|
||||
servicing_wants: dep_miss.info.servicing_wants.clone(),
|
||||
read_deps: dep_miss.state.read_deps.clone(),
|
||||
read_partition_uuids: HashMap::new(),
|
||||
wrote_partition_uuids: HashMap::new(),
|
||||
// Note: derivative_want_ids would need to be populated from BuildState
|
||||
// since the job doesn't track which wants it spawned (BEL does)
|
||||
derivative_want_ids: vec![],
|
||||
},
|
||||
JobRun::Canceled(canceled) => JobRunDetail {
|
||||
id: canceled.info.id.clone(),
|
||||
|
|
@ -336,6 +518,10 @@ impl JobRun {
|
|||
last_heartbeat_at: None,
|
||||
building_partitions: canceled.info.building_partitions.clone(),
|
||||
servicing_wants: canceled.info.servicing_wants.clone(),
|
||||
read_deps: vec![],
|
||||
read_partition_uuids: HashMap::new(),
|
||||
wrote_partition_uuids: HashMap::new(),
|
||||
derivative_want_ids: vec![],
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1073,6 +1073,8 @@ echo 'Beta succeeded'
|
|||
comment: None,
|
||||
status: None,
|
||||
last_updated_timestamp: 0,
|
||||
job_run_ids: vec![],
|
||||
derivative_want_ids: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
use crate::util::{HasRelatedIds, RelatedIds};
|
||||
use crate::{PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sha2::{Digest, Sha256};
|
||||
|
|
@ -58,6 +59,8 @@ pub struct UpstreamFailedState {
|
|||
pub struct TaintedState {
|
||||
pub tainted_at: u64,
|
||||
pub taint_ids: Vec<String>,
|
||||
/// Job run that originally built this partition (before it was tainted)
|
||||
pub built_by: String,
|
||||
}
|
||||
|
||||
/// Generic partition struct parameterized by state.
|
||||
|
|
@ -250,6 +253,7 @@ impl PartitionWithState<LiveState> {
|
|||
state: TaintedState {
|
||||
tainted_at: timestamp,
|
||||
taint_ids: vec![taint_id],
|
||||
built_by: self.state.built_by,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
@ -337,9 +341,57 @@ impl Partition {
|
|||
pub fn is_tainted(&self) -> bool {
|
||||
matches!(self, Partition::Tainted(_))
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== HasRelatedIds trait implementation ====================
|
||||
|
||||
impl HasRelatedIds for Partition {
|
||||
/// Get the IDs of all entities this partition references.
|
||||
/// Note: downstream_partition_uuids and want_ids come from BuildState indexes,
|
||||
/// not from Partition itself.
|
||||
fn related_ids(&self) -> RelatedIds {
|
||||
// Job run ID from the builder (for states that track it)
|
||||
let job_run_ids: Vec<String> = match self {
|
||||
Partition::Building(p) => vec![p.state.job_run_id.clone()],
|
||||
Partition::UpstreamBuilding(p) => vec![p.state.job_run_id.clone()],
|
||||
Partition::UpForRetry(p) => vec![p.state.original_job_run_id.clone()],
|
||||
Partition::Live(p) => vec![p.state.built_by.clone()],
|
||||
Partition::Failed(p) => vec![p.state.failed_by.clone()],
|
||||
Partition::UpstreamFailed(_) => vec![],
|
||||
Partition::Tainted(p) => vec![p.state.built_by.clone()],
|
||||
};
|
||||
|
||||
// Partition refs from missing deps (for UpstreamBuilding state)
|
||||
let partition_refs: Vec<String> = match self {
|
||||
Partition::UpstreamBuilding(p) => p
|
||||
.state
|
||||
.missing_deps
|
||||
.iter()
|
||||
.map(|d| d.r#ref.clone())
|
||||
.collect(),
|
||||
Partition::UpstreamFailed(p) => p
|
||||
.state
|
||||
.failed_upstream_refs
|
||||
.iter()
|
||||
.map(|d| d.r#ref.clone())
|
||||
.collect(),
|
||||
_ => vec![],
|
||||
};
|
||||
|
||||
RelatedIds {
|
||||
partition_refs,
|
||||
partition_uuids: vec![],
|
||||
job_run_ids,
|
||||
want_ids: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Partition {
|
||||
/// Convert to PartitionDetail for API responses and queries.
|
||||
/// Note: want_ids is now empty - this will be populated by BuildState from the inverted index.
|
||||
/// Note: want_ids and downstream_partition_uuids are empty here and will be
|
||||
/// populated by BuildState from its inverted indexes.
|
||||
/// Upstream lineage is resolved via built_by_job_run_id → job run's read_deps.
|
||||
pub fn to_detail(&self) -> PartitionDetail {
|
||||
match self {
|
||||
Partition::Building(p) => PartitionDetail {
|
||||
|
|
@ -353,6 +405,8 @@ impl Partition {
|
|||
taint_ids: vec![],
|
||||
last_updated_timestamp: None,
|
||||
uuid: p.uuid.to_string(),
|
||||
built_by_job_run_id: None,
|
||||
downstream_partition_uuids: vec![], // Populated by BuildState
|
||||
},
|
||||
Partition::UpstreamBuilding(p) => PartitionDetail {
|
||||
r#ref: Some(p.partition_ref.clone()),
|
||||
|
|
@ -365,6 +419,8 @@ impl Partition {
|
|||
taint_ids: vec![],
|
||||
last_updated_timestamp: None,
|
||||
uuid: p.uuid.to_string(),
|
||||
built_by_job_run_id: None,
|
||||
downstream_partition_uuids: vec![], // Populated by BuildState
|
||||
},
|
||||
Partition::UpForRetry(p) => PartitionDetail {
|
||||
r#ref: Some(p.partition_ref.clone()),
|
||||
|
|
@ -377,6 +433,8 @@ impl Partition {
|
|||
taint_ids: vec![],
|
||||
last_updated_timestamp: None,
|
||||
uuid: p.uuid.to_string(),
|
||||
built_by_job_run_id: None,
|
||||
downstream_partition_uuids: vec![], // Populated by BuildState
|
||||
},
|
||||
Partition::Live(p) => PartitionDetail {
|
||||
r#ref: Some(p.partition_ref.clone()),
|
||||
|
|
@ -389,6 +447,8 @@ impl Partition {
|
|||
taint_ids: vec![],
|
||||
last_updated_timestamp: Some(p.state.built_at),
|
||||
uuid: p.uuid.to_string(),
|
||||
built_by_job_run_id: Some(p.state.built_by.clone()),
|
||||
downstream_partition_uuids: vec![], // Populated by BuildState
|
||||
},
|
||||
Partition::Failed(p) => PartitionDetail {
|
||||
r#ref: Some(p.partition_ref.clone()),
|
||||
|
|
@ -401,6 +461,8 @@ impl Partition {
|
|||
taint_ids: vec![],
|
||||
last_updated_timestamp: Some(p.state.failed_at),
|
||||
uuid: p.uuid.to_string(),
|
||||
built_by_job_run_id: None,
|
||||
downstream_partition_uuids: vec![], // Populated by BuildState
|
||||
},
|
||||
Partition::UpstreamFailed(p) => PartitionDetail {
|
||||
r#ref: Some(p.partition_ref.clone()),
|
||||
|
|
@ -413,6 +475,8 @@ impl Partition {
|
|||
taint_ids: vec![],
|
||||
last_updated_timestamp: Some(p.state.failed_at),
|
||||
uuid: p.uuid.to_string(),
|
||||
built_by_job_run_id: None,
|
||||
downstream_partition_uuids: vec![], // Populated by BuildState
|
||||
},
|
||||
Partition::Tainted(p) => PartitionDetail {
|
||||
r#ref: Some(p.partition_ref.clone()),
|
||||
|
|
@ -421,10 +485,12 @@ impl Partition {
|
|||
name: "PartitionTainted".to_string(),
|
||||
}),
|
||||
want_ids: vec![], // Populated by BuildState
|
||||
job_run_ids: vec![],
|
||||
job_run_ids: vec![p.state.built_by.clone()],
|
||||
taint_ids: p.state.taint_ids.clone(),
|
||||
last_updated_timestamp: Some(p.state.tainted_at),
|
||||
uuid: p.uuid.to_string(),
|
||||
built_by_job_run_id: Some(p.state.built_by.clone()),
|
||||
downstream_partition_uuids: vec![], // Populated by BuildState
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,57 @@
|
|||
use std::backtrace::Backtrace;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use uuid::Uuid;
|
||||
|
||||
// ============================================================================
|
||||
// Related IDs - for building RelatedEntities index in API responses
|
||||
// ============================================================================
|
||||
|
||||
/// IDs of related entities that an object references.
|
||||
/// Used by the query layer to build the RelatedEntities index for API responses.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct RelatedIds {
|
||||
/// Partition refs (e.g., from Want.partitions, JobRun.building_partitions)
|
||||
pub partition_refs: Vec<String>,
|
||||
/// Partition UUIDs (e.g., from read/write lineage, consumer index)
|
||||
pub partition_uuids: Vec<Uuid>,
|
||||
/// Job run IDs (e.g., from built_by, consumer jobs)
|
||||
pub job_run_ids: Vec<String>,
|
||||
/// Want IDs (e.g., derivative wants, upstream wants, servicing wants)
|
||||
pub want_ids: Vec<String>,
|
||||
}
|
||||
|
||||
impl RelatedIds {
|
||||
/// Merge another RelatedIds into this one, deduplicating
|
||||
pub fn merge(&mut self, other: RelatedIds) {
|
||||
for r in other.partition_refs {
|
||||
if !self.partition_refs.contains(&r) {
|
||||
self.partition_refs.push(r);
|
||||
}
|
||||
}
|
||||
for u in other.partition_uuids {
|
||||
if !self.partition_uuids.contains(&u) {
|
||||
self.partition_uuids.push(u);
|
||||
}
|
||||
}
|
||||
for j in other.job_run_ids {
|
||||
if !self.job_run_ids.contains(&j) {
|
||||
self.job_run_ids.push(j);
|
||||
}
|
||||
}
|
||||
for w in other.want_ids {
|
||||
if !self.want_ids.contains(&w) {
|
||||
self.want_ids.push(w);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Trait for entities that can report their related entity IDs.
|
||||
/// Used by the query layer to build RelatedEntities indexes for API responses.
|
||||
/// Implementing types: Want, JobRun, Partition
|
||||
pub trait HasRelatedIds {
|
||||
fn related_ids(&self) -> RelatedIds;
|
||||
}
|
||||
|
||||
pub fn current_timestamp() -> u64 {
|
||||
let now = SystemTime::now();
|
||||
|
|
@ -89,3 +141,173 @@ impl std::fmt::Display for DatabuildError {
|
|||
write!(f, "{}", self.msg)
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Test Scenarios - reusable BEL event sequences for testing
|
||||
// ============================================================================
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod test_scenarios {
|
||||
use crate::data_build_event::Event;
|
||||
use crate::{
|
||||
JobRunBufferEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1,
|
||||
JobRunSuccessEventV1, MissingDeps, PartitionRef, ReadDeps, WantAttributedPartitions,
|
||||
WantCreateEventV1,
|
||||
};
|
||||
|
||||
/// IDs used in the multihop scenario for easy reference in tests
|
||||
pub struct MultihopIds {
|
||||
pub beta_want_id: String,
|
||||
pub alpha_want_id: String,
|
||||
pub beta_job_1_id: String,
|
||||
pub beta_job_2_id: String,
|
||||
pub alpha_job_id: String,
|
||||
}
|
||||
|
||||
impl Default for MultihopIds {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
beta_want_id: "beta-want".to_string(),
|
||||
alpha_want_id: "alpha-want".to_string(),
|
||||
beta_job_1_id: "beta-job-1".to_string(),
|
||||
beta_job_2_id: "beta-job-2".to_string(),
|
||||
alpha_job_id: "alpha-job".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a multihop dependency scenario:
|
||||
/// 1. Want for data/beta is created
|
||||
/// 2. Job beta-job-1 starts, discovers missing dep on data/alpha
|
||||
/// 3. Derivative want for data/alpha is created
|
||||
/// 4. Job alpha-job builds data/alpha successfully
|
||||
/// 5. Job beta-job-2 retries and succeeds
|
||||
///
|
||||
/// This exercises: want creation, job buffering, dep-miss, derivative wants,
|
||||
/// job success with read_deps, and retry logic.
|
||||
pub fn multihop_scenario() -> (Vec<Event>, MultihopIds) {
|
||||
let ids = MultihopIds::default();
|
||||
let mut events = vec![];
|
||||
|
||||
// 1. Create want for data/beta
|
||||
events.push(Event::WantCreateV1(WantCreateEventV1 {
|
||||
want_id: ids.beta_want_id.clone(),
|
||||
partitions: vec![PartitionRef {
|
||||
r#ref: "data/beta".to_string(),
|
||||
}],
|
||||
..Default::default()
|
||||
}));
|
||||
|
||||
// 2. Queue beta job (first attempt)
|
||||
events.push(Event::JobRunBufferV1(JobRunBufferEventV1 {
|
||||
job_run_id: ids.beta_job_1_id.clone(),
|
||||
job_label: "//job_beta".to_string(),
|
||||
want_attributed_partitions: vec![WantAttributedPartitions {
|
||||
want_id: ids.beta_want_id.clone(),
|
||||
partitions: vec![PartitionRef {
|
||||
r#ref: "data/beta".to_string(),
|
||||
}],
|
||||
}],
|
||||
building_partitions: vec![PartitionRef {
|
||||
r#ref: "data/beta".to_string(),
|
||||
}],
|
||||
..Default::default()
|
||||
}));
|
||||
|
||||
// 3. Beta job starts running
|
||||
events.push(Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 {
|
||||
job_run_id: ids.beta_job_1_id.clone(),
|
||||
..Default::default()
|
||||
}));
|
||||
|
||||
// 4. Beta job reports missing dependency on data/alpha
|
||||
events.push(Event::JobRunMissingDepsV1(JobRunMissingDepsEventV1 {
|
||||
job_run_id: ids.beta_job_1_id.clone(),
|
||||
missing_deps: vec![MissingDeps {
|
||||
impacted: vec![PartitionRef {
|
||||
r#ref: "data/beta".to_string(),
|
||||
}],
|
||||
missing: vec![PartitionRef {
|
||||
r#ref: "data/alpha".to_string(),
|
||||
}],
|
||||
}],
|
||||
..Default::default()
|
||||
}));
|
||||
|
||||
// 5. Create derivative want for data/alpha
|
||||
events.push(Event::WantCreateV1(WantCreateEventV1 {
|
||||
want_id: ids.alpha_want_id.clone(),
|
||||
partitions: vec![PartitionRef {
|
||||
r#ref: "data/alpha".to_string(),
|
||||
}],
|
||||
..Default::default()
|
||||
}));
|
||||
|
||||
// 6. Queue alpha job
|
||||
events.push(Event::JobRunBufferV1(JobRunBufferEventV1 {
|
||||
job_run_id: ids.alpha_job_id.clone(),
|
||||
job_label: "//job_alpha".to_string(),
|
||||
want_attributed_partitions: vec![WantAttributedPartitions {
|
||||
want_id: ids.alpha_want_id.clone(),
|
||||
partitions: vec![PartitionRef {
|
||||
r#ref: "data/alpha".to_string(),
|
||||
}],
|
||||
}],
|
||||
building_partitions: vec![PartitionRef {
|
||||
r#ref: "data/alpha".to_string(),
|
||||
}],
|
||||
..Default::default()
|
||||
}));
|
||||
|
||||
// 7. Alpha job starts running
|
||||
events.push(Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 {
|
||||
job_run_id: ids.alpha_job_id.clone(),
|
||||
..Default::default()
|
||||
}));
|
||||
|
||||
// 8. Alpha job succeeds (no read deps for leaf node)
|
||||
events.push(Event::JobRunSuccessV1(JobRunSuccessEventV1 {
|
||||
job_run_id: ids.alpha_job_id.clone(),
|
||||
read_deps: vec![],
|
||||
..Default::default()
|
||||
}));
|
||||
|
||||
// 9. Queue beta job again (second attempt - retry)
|
||||
events.push(Event::JobRunBufferV1(JobRunBufferEventV1 {
|
||||
job_run_id: ids.beta_job_2_id.clone(),
|
||||
job_label: "//job_beta".to_string(),
|
||||
want_attributed_partitions: vec![WantAttributedPartitions {
|
||||
want_id: ids.beta_want_id.clone(),
|
||||
partitions: vec![PartitionRef {
|
||||
r#ref: "data/beta".to_string(),
|
||||
}],
|
||||
}],
|
||||
building_partitions: vec![PartitionRef {
|
||||
r#ref: "data/beta".to_string(),
|
||||
}],
|
||||
..Default::default()
|
||||
}));
|
||||
|
||||
// 10. Beta job 2 starts running
|
||||
events.push(Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 {
|
||||
job_run_id: ids.beta_job_2_id.clone(),
|
||||
..Default::default()
|
||||
}));
|
||||
|
||||
// 11. Beta job 2 succeeds with read_deps showing it read data/alpha
|
||||
events.push(Event::JobRunSuccessV1(JobRunSuccessEventV1 {
|
||||
job_run_id: ids.beta_job_2_id.clone(),
|
||||
read_deps: vec![ReadDeps {
|
||||
impacted: vec![PartitionRef {
|
||||
r#ref: "data/beta".to_string(),
|
||||
}],
|
||||
read: vec![PartitionRef {
|
||||
r#ref: "data/alpha".to_string(),
|
||||
}],
|
||||
}],
|
||||
..Default::default()
|
||||
}));
|
||||
|
||||
(events, ids)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
use crate::partition_state::FailedPartitionRef;
|
||||
use crate::util::current_timestamp;
|
||||
use crate::util::{HasRelatedIds, RelatedIds, current_timestamp};
|
||||
use crate::{EventSource, PartitionRef, WantCreateEventV1, WantDetail, WantStatusCode};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
|
@ -464,6 +464,35 @@ impl WantWithState<UpstreamBuildingState> {
|
|||
}
|
||||
}
|
||||
|
||||
// ==================== HasRelatedIds trait implementation ====================
|
||||
|
||||
impl HasRelatedIds for Want {
|
||||
/// Get the IDs of all entities this want references.
|
||||
/// Note: job_run_ids come from inverted indexes in BuildState, not from Want itself.
|
||||
fn related_ids(&self) -> RelatedIds {
|
||||
let partition_refs = self
|
||||
.want()
|
||||
.partitions
|
||||
.iter()
|
||||
.map(|p| p.r#ref.clone())
|
||||
.collect();
|
||||
|
||||
// Collect want IDs from state-specific relationships
|
||||
let want_ids = match self {
|
||||
Want::UpstreamBuilding(w) => w.state.upstream_want_ids.clone(),
|
||||
Want::UpstreamFailed(w) => w.state.failed_wants.clone(),
|
||||
_ => vec![],
|
||||
};
|
||||
|
||||
RelatedIds {
|
||||
partition_refs,
|
||||
partition_uuids: vec![],
|
||||
job_run_ids: vec![],
|
||||
want_ids,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Helper methods on the Want enum
|
||||
impl Want {
|
||||
/// Create a new want in the Idle state
|
||||
|
|
@ -522,7 +551,9 @@ impl Want {
|
|||
}
|
||||
}
|
||||
|
||||
/// Convert to WantDetail for API responses and queries
|
||||
/// Convert to WantDetail for API responses and queries.
|
||||
/// Note: job_run_ids and derivative_want_ids are empty here and will be
|
||||
/// populated by BuildState from its inverted indexes.
|
||||
pub fn to_detail(&self) -> WantDetail {
|
||||
WantDetail {
|
||||
want_id: self.want().want_id.clone(),
|
||||
|
|
@ -544,6 +575,8 @@ impl Want {
|
|||
Want::UpstreamFailed(_) => Some(WantStatusCode::WantUpstreamFailed.into()),
|
||||
Want::Canceled(_) => Some(WantStatusCode::WantCanceled.into()),
|
||||
},
|
||||
job_run_ids: vec![], // Populated by BuildState
|
||||
derivative_want_ids: vec![], // Populated by BuildState
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@
|
|||
use askama::Template;
|
||||
|
||||
use crate::{
|
||||
JobRunDetail, JobRunStatus, PartitionDetail, PartitionRef, PartitionStatus,
|
||||
JobRunDetail, JobRunStatus, PartitionDetail, PartitionRef, PartitionStatus, ReadDeps,
|
||||
WantAttributedPartitions, WantDetail, WantStatus,
|
||||
};
|
||||
|
||||
|
|
@ -84,6 +84,9 @@ pub struct WantDetailView {
|
|||
pub comment_display: String,
|
||||
pub status: Option<WantStatusView>,
|
||||
pub last_updated_timestamp: u64,
|
||||
// Lineage fields
|
||||
pub job_run_ids: Vec<String>,
|
||||
pub derivative_want_ids: Vec<String>,
|
||||
}
|
||||
|
||||
impl From<&WantDetail> for WantDetailView {
|
||||
|
|
@ -99,6 +102,8 @@ impl From<&WantDetail> for WantDetailView {
|
|||
comment_display: w.comment.as_deref().unwrap_or("-").to_string(),
|
||||
status: w.status.as_ref().map(WantStatusView::from),
|
||||
last_updated_timestamp: w.last_updated_timestamp,
|
||||
job_run_ids: w.job_run_ids.clone(),
|
||||
derivative_want_ids: w.derivative_want_ids.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -119,6 +124,9 @@ pub struct PartitionDetailView {
|
|||
pub want_ids: Vec<String>,
|
||||
pub taint_ids: Vec<String>,
|
||||
pub uuid: String,
|
||||
// Lineage fields
|
||||
pub built_by_job_run_id: Option<String>,
|
||||
pub downstream_partition_uuids: Vec<String>,
|
||||
}
|
||||
|
||||
impl From<&PartitionDetail> for PartitionDetailView {
|
||||
|
|
@ -141,6 +149,8 @@ impl From<&PartitionDetail> for PartitionDetailView {
|
|||
want_ids: p.want_ids.clone(),
|
||||
taint_ids: p.taint_ids.clone(),
|
||||
uuid: p.uuid.clone(),
|
||||
built_by_job_run_id: p.built_by_job_run_id.clone(),
|
||||
downstream_partition_uuids: p.downstream_partition_uuids.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -165,16 +175,65 @@ impl From<&WantAttributedPartitions> for WantAttributedPartitionsView {
|
|||
}
|
||||
}
|
||||
|
||||
/// View for read dependency entries (impacted → read relationships)
|
||||
pub struct ReadDepsView {
|
||||
pub impacted: Vec<PartitionRefView>,
|
||||
pub read: Vec<PartitionRefView>,
|
||||
}
|
||||
|
||||
impl From<&ReadDeps> for ReadDepsView {
|
||||
fn from(rd: &ReadDeps) -> Self {
|
||||
Self {
|
||||
impacted: rd.impacted.iter().map(PartitionRefView::from).collect(),
|
||||
read: rd.read.iter().map(PartitionRefView::from).collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// View for partition ref with its resolved UUID (for lineage display)
|
||||
pub struct PartitionRefWithUuidView {
|
||||
pub partition_ref: String,
|
||||
pub partition_ref_encoded: String,
|
||||
pub uuid: String,
|
||||
}
|
||||
|
||||
pub struct JobRunDetailView {
|
||||
pub id: String,
|
||||
pub status: Option<JobRunStatusView>,
|
||||
pub last_heartbeat_at: Option<u64>,
|
||||
pub building_partitions: Vec<PartitionRefView>,
|
||||
pub servicing_wants: Vec<WantAttributedPartitionsView>,
|
||||
// Lineage fields (populated for Succeeded/DepMiss states)
|
||||
pub read_deps: Vec<ReadDepsView>,
|
||||
pub read_partitions: Vec<PartitionRefWithUuidView>,
|
||||
pub wrote_partitions: Vec<PartitionRefWithUuidView>,
|
||||
pub derivative_want_ids: Vec<String>,
|
||||
}
|
||||
|
||||
impl From<&JobRunDetail> for JobRunDetailView {
|
||||
fn from(jr: &JobRunDetail) -> Self {
|
||||
// Build read_partitions from read_partition_uuids map
|
||||
let read_partitions: Vec<PartitionRefWithUuidView> = jr
|
||||
.read_partition_uuids
|
||||
.iter()
|
||||
.map(|(partition_ref, uuid)| PartitionRefWithUuidView {
|
||||
partition_ref: partition_ref.clone(),
|
||||
partition_ref_encoded: urlencoding::encode(partition_ref).into_owned(),
|
||||
uuid: uuid.clone(),
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Build wrote_partitions from wrote_partition_uuids map
|
||||
let wrote_partitions: Vec<PartitionRefWithUuidView> = jr
|
||||
.wrote_partition_uuids
|
||||
.iter()
|
||||
.map(|(partition_ref, uuid)| PartitionRefWithUuidView {
|
||||
partition_ref: partition_ref.clone(),
|
||||
partition_ref_encoded: urlencoding::encode(partition_ref).into_owned(),
|
||||
uuid: uuid.clone(),
|
||||
})
|
||||
.collect();
|
||||
|
||||
Self {
|
||||
id: jr.id.clone(),
|
||||
status: jr.status.as_ref().map(JobRunStatusView::from),
|
||||
|
|
@ -189,6 +248,10 @@ impl From<&JobRunDetail> for JobRunDetailView {
|
|||
.iter()
|
||||
.map(WantAttributedPartitionsView::from)
|
||||
.collect(),
|
||||
read_deps: jr.read_deps.iter().map(ReadDepsView::from).collect(),
|
||||
read_partitions,
|
||||
wrote_partitions,
|
||||
derivative_want_ids: jr.derivative_want_ids.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -361,3 +424,91 @@ pub struct JobRunDetailPage {
|
|||
pub struct WantCreatePage {
|
||||
pub base: BaseContext,
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Tests
|
||||
// =============================================================================
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::build_state::BuildState;
|
||||
use crate::util::test_scenarios::multihop_scenario;
|
||||
use askama::Template;
|
||||
|
||||
/// Helper to replay events into a fresh BuildState
|
||||
fn build_state_from_events(events: &[crate::data_build_event::Event]) -> BuildState {
|
||||
let mut state = BuildState::default();
|
||||
for event in events {
|
||||
state.handle_event(event);
|
||||
}
|
||||
state
|
||||
}
|
||||
|
||||
mod want_detail_page {
|
||||
use super::*;
|
||||
|
||||
/// Tests that the want detail page shows the job runs that serviced this want.
|
||||
/// This is the "Fulfillment - Job Runs" section in the UI.
|
||||
///
|
||||
/// Given: The multihop scenario completes (beta job dep-misses, alpha built, beta retries)
|
||||
/// When: We render the beta want detail page
|
||||
/// Then: It should show both beta job runs (beta-job-1 and beta-job-2)
|
||||
#[test]
|
||||
fn test_shows_servicing_job_runs() {
|
||||
let (events, ids) = multihop_scenario();
|
||||
let state = build_state_from_events(&events);
|
||||
|
||||
// Get beta want and render its detail page
|
||||
let want_detail = state
|
||||
.get_want(&ids.beta_want_id)
|
||||
.expect("beta want should exist");
|
||||
let template = WantDetailPage {
|
||||
base: BaseContext::default(),
|
||||
want: WantDetailView::from(want_detail),
|
||||
};
|
||||
let html = template.render().expect("template should render");
|
||||
|
||||
// Verify the Fulfillment section exists and contains both job run IDs
|
||||
assert!(
|
||||
html.contains(&ids.beta_job_1_id),
|
||||
"Should show beta-job-1 in fulfillment section. HTML:\n{}",
|
||||
html
|
||||
);
|
||||
assert!(
|
||||
html.contains(&ids.beta_job_2_id),
|
||||
"Should show beta-job-2 in fulfillment section. HTML:\n{}",
|
||||
html
|
||||
);
|
||||
}
|
||||
|
||||
/// Tests that the want detail page shows derivative wants spawned by dep-miss.
|
||||
/// This is the "Fulfillment - Derivative Wants" section in the UI.
|
||||
///
|
||||
/// Given: The multihop scenario completes (beta job dep-misses, spawning alpha want)
|
||||
/// When: We render the beta want detail page
|
||||
/// Then: It should show the alpha want as a derivative
|
||||
#[test]
|
||||
fn test_shows_derivative_wants() {
|
||||
let (events, ids) = multihop_scenario();
|
||||
let state = build_state_from_events(&events);
|
||||
|
||||
// Get beta want and render its detail page
|
||||
let want_detail = state
|
||||
.get_want(&ids.beta_want_id)
|
||||
.expect("beta want should exist");
|
||||
let template = WantDetailPage {
|
||||
base: BaseContext::default(),
|
||||
want: WantDetailView::from(want_detail),
|
||||
};
|
||||
let html = template.render().expect("template should render");
|
||||
|
||||
// Verify the Fulfillment section exists and contains the derivative want
|
||||
assert!(
|
||||
html.contains(&ids.alpha_want_id),
|
||||
"Should show alpha-want as derivative want. HTML:\n{}",
|
||||
html
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -54,4 +54,43 @@
|
|||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if !job_run.read_partitions.is_empty() %}
|
||||
<div class="detail-section">
|
||||
<h2>Read Partitions ({{ job_run.read_partitions.len() }})</h2>
|
||||
<ul class="partition-list">
|
||||
{% for p in job_run.read_partitions %}
|
||||
<li>
|
||||
<a href="/partitions/{{ p.partition_ref_encoded }}" class="partition-ref">{{ p.partition_ref }}</a>
|
||||
<span style="color:var(--color-text-muted);font-size:.75rem;font-family:monospace;margin-left:.5rem">uuid: {{ p.uuid }}</span>
|
||||
</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if !job_run.wrote_partitions.is_empty() %}
|
||||
<div class="detail-section">
|
||||
<h2>Wrote Partitions ({{ job_run.wrote_partitions.len() }})</h2>
|
||||
<ul class="partition-list">
|
||||
{% for p in job_run.wrote_partitions %}
|
||||
<li>
|
||||
<a href="/partitions/{{ p.partition_ref_encoded }}" class="partition-ref">{{ p.partition_ref }}</a>
|
||||
<span style="color:var(--color-text-muted);font-size:.75rem;font-family:monospace;margin-left:.5rem">uuid: {{ p.uuid }}</span>
|
||||
</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if !job_run.derivative_want_ids.is_empty() %}
|
||||
<div class="detail-section">
|
||||
<h2>Derivative Wants ({{ job_run.derivative_want_ids.len() }})</h2>
|
||||
<ul class="partition-list">
|
||||
{% for id in job_run.derivative_want_ids %}
|
||||
<li><a href="/wants/{{ id }}">{{ id }}</a></li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% call base::footer() %}
|
||||
|
|
|
|||
|
|
@ -32,9 +32,34 @@
|
|||
</div>
|
||||
</div>
|
||||
|
||||
{% match partition.built_by_job_run_id %}
|
||||
{% when Some with (job_run_id) %}
|
||||
<div class="detail-section">
|
||||
<h2>Lineage - Built By</h2>
|
||||
<p>
|
||||
<a href="/job_runs/{{ job_run_id }}">{{ job_run_id }}</a>
|
||||
<span style="color:var(--color-text-muted);font-size:.75rem;margin-left:.5rem">(view job run for input partitions)</span>
|
||||
</p>
|
||||
</div>
|
||||
{% when None %}
|
||||
{% endmatch %}
|
||||
|
||||
{% if !partition.downstream_partition_uuids.is_empty() %}
|
||||
<div class="detail-section">
|
||||
<h2>Lineage - Downstream Consumers ({{ partition.downstream_partition_uuids.len() }})</h2>
|
||||
<ul class="partition-list">
|
||||
{% for uuid in partition.downstream_partition_uuids %}
|
||||
<li>
|
||||
<span style="font-family:monospace;font-size:.8125rem">{{ uuid }}</span>
|
||||
</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if !partition.job_run_ids.is_empty() %}
|
||||
<div class="detail-section">
|
||||
<h2>Job Runs ({{ partition.job_run_ids.len() }})</h2>
|
||||
<h2>Related Job Runs ({{ partition.job_run_ids.len() }})</h2>
|
||||
<ul class="partition-list">
|
||||
{% for id in partition.job_run_ids %}
|
||||
<li><a href="/job_runs/{{ id }}">{{ id }}</a></li>
|
||||
|
|
|
|||
|
|
@ -65,4 +65,26 @@
|
|||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if !want.job_run_ids.is_empty() %}
|
||||
<div class="detail-section">
|
||||
<h2>Fulfillment - Job Runs ({{ want.job_run_ids.len() }})</h2>
|
||||
<ul class="partition-list">
|
||||
{% for id in want.job_run_ids %}
|
||||
<li><a href="/job_runs/{{ id }}">{{ id }}</a></li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if !want.derivative_want_ids.is_empty() %}
|
||||
<div class="detail-section">
|
||||
<h2>Fulfillment - Derivative Wants ({{ want.derivative_want_ids.len() }})</h2>
|
||||
<ul class="partition-list">
|
||||
{% for id in want.derivative_want_ids %}
|
||||
<li><a href="/wants/{{ id }}">{{ id }}</a></li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% call base::footer() %}
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@
|
|||
"jobs": [
|
||||
{
|
||||
"label": "//examples/multihop:job_alpha",
|
||||
"entrypoint": "./examples/multihop/job_alpha.sh",
|
||||
"entrypoint": "./job_alpha.sh",
|
||||
"environment": {
|
||||
"JOB_NAME": "alpha"
|
||||
},
|
||||
|
|
@ -11,7 +11,7 @@
|
|||
},
|
||||
{
|
||||
"label": "//examples/multihop:job_beta",
|
||||
"entrypoint": "./examples/multihop/job_beta.sh",
|
||||
"entrypoint": "./job_beta.sh",
|
||||
"environment": {
|
||||
"JOB_NAME": "beta"
|
||||
},
|
||||
|
|
|
|||
Loading…
Reference in a new issue