Compare commits

...

4 commits

Author SHA1 Message Date
17d5987517 WIP
Some checks failed
/ setup (push) Has been cancelled
2025-11-28 12:48:54 +08:00
f353660f97 update docs 2025-11-28 12:41:11 +08:00
6cb11af642 update detail lineage plan 2025-11-27 15:59:38 +08:00
368558d9d8 add prd 2025-11-27 15:45:36 +08:00
19 changed files with 1361 additions and 138 deletions

View file

@ -340,18 +340,51 @@ impl BuildState {
}
pub(crate) fn handle_job_run_success(&mut self, event: &JobRunSuccessEventV1) -> Vec<Event> {
use std::collections::BTreeMap;
let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
"BUG: Job run {} must exist when success event received",
event.job_run_id
));
// Resolve read partition UUIDs from the read_deps in the event
let mut read_partition_uuids: BTreeMap<String, String> = BTreeMap::new();
for read_dep in &event.read_deps {
for read_ref in &read_dep.read {
if let Some(uuid) = self.get_canonical_partition_uuid(&read_ref.r#ref) {
read_partition_uuids.insert(read_ref.r#ref.clone(), uuid.to_string());
}
}
}
// Get the building partitions from the job run to resolve wrote_partition_uuids
let building_partitions = match &job_run {
JobRun::Running(running) => running.info.building_partitions.clone(),
_ => vec![],
};
// Resolve wrote partition UUIDs - these will be set when we transition partitions to Live
// For now, we compute them based on job_run_id (same logic as transition_partitions_to_live)
let mut wrote_partition_uuids: BTreeMap<String, String> = BTreeMap::new();
for pref in &building_partitions {
let uuid =
crate::partition_state::derive_partition_uuid(&event.job_run_id, &pref.r#ref);
wrote_partition_uuids.insert(pref.r#ref.clone(), uuid.to_string());
}
let succeeded = match job_run {
JobRun::Running(running) => {
tracing::info!(
job_run_id = %event.job_run_id,
read_deps_count = event.read_deps.len(),
"JobRun: Running → Succeeded"
);
running.succeed(current_timestamp())
running.succeed(
current_timestamp(),
event.read_deps.clone(),
read_partition_uuids.clone(),
wrote_partition_uuids.clone(),
)
}
_ => {
panic!(
@ -361,6 +394,34 @@ impl BuildState {
}
};
// Populate the consumer index from read_deps (using UUIDs for historical lineage)
// For each read partition UUID, record that the written partition UUIDs consumed it
for read_dep in &event.read_deps {
for read_ref in &read_dep.read {
// Look up the read partition's UUID
if let Some(read_uuid) = read_partition_uuids.get(&read_ref.r#ref) {
if let Ok(read_uuid) = uuid::Uuid::parse_str(read_uuid) {
let consumers = self
.partition_consumers
.entry(read_uuid)
.or_insert_with(Vec::new);
for impacted_ref in &read_dep.impacted {
// Look up the impacted (output) partition's UUID
if let Some(wrote_uuid) = wrote_partition_uuids.get(&impacted_ref.r#ref)
{
if let Ok(wrote_uuid) = uuid::Uuid::parse_str(wrote_uuid) {
let entry = (wrote_uuid, event.job_run_id.clone());
if !consumers.contains(&entry) {
consumers.push(entry);
}
}
}
}
}
}
}
}
// Job run success is SOURCE of truth that partitions are live
let newly_live_partitions = succeeded.get_completed_partitions();
@ -642,7 +703,6 @@ impl BuildState {
#[cfg(test)]
mod tests {
use super::*;
use crate::{MissingDeps, WantAttributedPartitions};
mod want {
use super::*;
@ -686,133 +746,25 @@ mod tests {
#[test]
fn test_multihop_dependency_replay() {
use crate::{
JobRunBufferEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1,
JobRunSuccessEventV1, MissingDeps, PartitionRef, WantAttributedPartitions,
WantCreateEventV1,
};
use crate::util::test_scenarios::multihop_scenario;
let mut state = BuildState::default();
let mut events = vec![];
// 1. Create want for data/beta
let beta_want_id = "beta-want".to_string();
let mut create_beta = WantCreateEventV1::default();
create_beta.want_id = beta_want_id.clone();
create_beta.partitions = vec![PartitionRef {
r#ref: "data/beta".to_string(),
}];
events.push(Event::WantCreateV1(create_beta));
// 2. Queue beta job (first attempt)
let beta_job_1_id = "beta-job-1".to_string();
let mut buffer_beta_1 = JobRunBufferEventV1::default();
buffer_beta_1.job_run_id = beta_job_1_id.clone();
buffer_beta_1.job_label = "//job_beta".to_string();
buffer_beta_1.want_attributed_partitions = vec![WantAttributedPartitions {
want_id: beta_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
}];
buffer_beta_1.building_partitions = vec![PartitionRef {
r#ref: "data/beta".to_string(),
}];
events.push(Event::JobRunBufferV1(buffer_beta_1));
// 3. Beta job starts running
let mut heartbeat_beta_1 = JobRunHeartbeatEventV1::default();
heartbeat_beta_1.job_run_id = beta_job_1_id.clone();
events.push(Event::JobRunHeartbeatV1(heartbeat_beta_1));
// 4. Beta job reports missing dependency on data/alpha
let mut dep_miss_beta_1 = JobRunMissingDepsEventV1::default();
dep_miss_beta_1.job_run_id = beta_job_1_id.clone();
dep_miss_beta_1.missing_deps = vec![MissingDeps {
impacted: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
missing: vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}],
}];
events.push(Event::JobRunMissingDepsV1(dep_miss_beta_1));
// 5. Create derivative want for data/alpha
let alpha_want_id = "alpha-want".to_string();
let mut create_alpha = WantCreateEventV1::default();
create_alpha.want_id = alpha_want_id.clone();
create_alpha.partitions = vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}];
events.push(Event::WantCreateV1(create_alpha));
// 6. Queue alpha job
let alpha_job_id = "alpha-job".to_string();
let mut buffer_alpha = JobRunBufferEventV1::default();
buffer_alpha.job_run_id = alpha_job_id.clone();
buffer_alpha.job_label = "//job_alpha".to_string();
buffer_alpha.want_attributed_partitions = vec![WantAttributedPartitions {
want_id: alpha_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}],
}];
buffer_alpha.building_partitions = vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}];
events.push(Event::JobRunBufferV1(buffer_alpha));
// 7. Alpha job starts running
let mut heartbeat_alpha = JobRunHeartbeatEventV1::default();
heartbeat_alpha.job_run_id = alpha_job_id.clone();
events.push(Event::JobRunHeartbeatV1(heartbeat_alpha));
// 8. Alpha job succeeds
let mut success_alpha = JobRunSuccessEventV1::default();
success_alpha.job_run_id = alpha_job_id.clone();
events.push(Event::JobRunSuccessV1(success_alpha));
// 9. Queue beta job again (second attempt) - THIS IS THE CRITICAL MOMENT
let beta_job_2_id = "beta-job-2".to_string();
let mut buffer_beta_2 = JobRunBufferEventV1::default();
buffer_beta_2.job_run_id = beta_job_2_id.clone();
buffer_beta_2.job_label = "//job_beta".to_string();
buffer_beta_2.want_attributed_partitions = vec![WantAttributedPartitions {
want_id: beta_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
}];
buffer_beta_2.building_partitions = vec![PartitionRef {
r#ref: "data/beta".to_string(),
}];
events.push(Event::JobRunBufferV1(buffer_beta_2));
// 10. Beta job starts running
let mut heartbeat_beta_2 = JobRunHeartbeatEventV1::default();
heartbeat_beta_2.job_run_id = beta_job_2_id.clone();
events.push(Event::JobRunHeartbeatV1(heartbeat_beta_2));
// 11. Beta job succeeds
let mut success_beta_2 = JobRunSuccessEventV1::default();
success_beta_2.job_run_id = beta_job_2_id.clone();
events.push(Event::JobRunSuccessV1(success_beta_2));
let (events, ids) = multihop_scenario();
// Process all events - this simulates replay
let mut state = BuildState::default();
for event in &events {
state.handle_event(event);
}
// Verify final state
let beta_want = state.get_want(&beta_want_id).unwrap();
let beta_want = state.get_want(&ids.beta_want_id).unwrap();
assert_eq!(
beta_want.status,
Some(crate::WantStatusCode::WantSuccessful.into()),
"Beta want should be successful after multi-hop dependency resolution"
);
let alpha_want = state.get_want(&alpha_want_id).unwrap();
let alpha_want = state.get_want(&ids.alpha_want_id).unwrap();
assert_eq!(
alpha_want.status,
Some(crate::WantStatusCode::WantSuccessful.into()),

View file

@ -61,6 +61,11 @@ pub struct BuildState {
// Inverted indexes
pub(crate) wants_for_partition: BTreeMap<String, Vec<String>>, // partition ref → want_ids
pub(crate) downstream_waiting: BTreeMap<String, Vec<Uuid>>, // upstream ref → partition UUIDs waiting for it
// Consumer index for lineage queries: input_uuid → list of (output_uuid, job_run_id)
// Uses UUIDs (not refs) to preserve historical lineage across partition rebuilds
// Populated from read_deps on job success
pub(crate) partition_consumers: BTreeMap<Uuid, Vec<(Uuid, String)>>,
}
impl BuildState {
@ -118,6 +123,15 @@ impl BuildState {
.unwrap_or(&[])
}
/// Get consumers for a partition UUID (downstream partitions that read this one).
/// Returns list of (output_uuid, job_run_id) tuples; empty slice when the UUID
/// has no recorded consumers.
pub fn get_partition_consumers(&self, uuid: &Uuid) -> &[(Uuid, String)] {
    match self.partition_consumers.get(uuid) {
        Some(consumers) => consumers.as_slice(),
        None => &[],
    }
}
/// Register a want in the wants_for_partition inverted index
pub(crate) fn register_want_for_partitions(
&mut self,

View file

@ -2,10 +2,12 @@
//!
//! Read-only methods for accessing state (get_*, list_*) used by the API layer.
use crate::util::{HasRelatedIds, RelatedIds};
use crate::{
JobRunDetail, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest,
ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest,
ListWantsResponse, PartitionDetail, TaintDetail, WantDetail,
GetJobRunResponse, GetPartitionResponse, GetWantResponse, JobRunDetail, ListJobRunsRequest,
ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest,
ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, RelatedEntities,
TaintDetail, WantDetail,
};
use std::collections::BTreeMap;
@ -49,6 +51,7 @@ impl BuildState {
match_count: self.wants.len() as u64,
page,
page_size,
index: None,
}
}
@ -60,6 +63,7 @@ impl BuildState {
match_count: self.wants.len() as u64,
page,
page_size,
index: None,
}
}
@ -81,6 +85,7 @@ impl BuildState {
match_count: self.canonical_partitions.len() as u64,
page,
page_size,
index: None,
}
}
@ -102,6 +107,7 @@ impl BuildState {
match_count: self.job_runs.len() as u64,
page,
page_size,
index: None,
}
}
}
@ -116,3 +122,214 @@ fn list_state_items<T: Clone>(map: &BTreeMap<String, T>, page: u64, page_size: u
.cloned()
.collect()
}
// ============================================================================
// Response builders with RelatedEntities index
// ============================================================================
impl BuildState {
/// Resolve RelatedIds to a RelatedEntities index by looking up entities in BuildState.
/// This is the central method for building the index from collected IDs.
/// First writer wins: an entity already present in the index is never overwritten.
pub fn resolve_related_ids(&self, ids: &RelatedIds) -> RelatedEntities {
    let mut index = RelatedEntities::default();

    // Partition refs → canonical partition details.
    for pref in &ids.partition_refs {
        if index.partitions.contains_key(pref) {
            continue;
        }
        if let Some(partition) = self.get_canonical_partition(pref) {
            index.partitions.insert(pref.clone(), partition.to_detail());
        }
    }

    // Partition UUIDs → details, keyed by the partition's ref. A detail with
    // no ref is skipped, since it has no key to index under.
    for uuid in &ids.partition_uuids {
        if let Some(partition) = self.partitions_by_uuid.get(uuid) {
            let detail = partition.to_detail();
            let key = match detail.r#ref.as_ref() {
                Some(pref) => pref.r#ref.clone(),
                None => continue,
            };
            if !index.partitions.contains_key(&key) {
                index.partitions.insert(key, detail);
            }
        }
    }

    // Job run IDs → job run details.
    for id in &ids.job_run_ids {
        if !index.job_runs.contains_key(id) {
            if let Some(job_run) = self.job_runs.get(id) {
                index.job_runs.insert(id.clone(), job_run.to_detail());
            }
        }
    }

    // Want IDs → want details.
    for id in &ids.want_ids {
        if !index.wants.contains_key(id) {
            if let Some(want) = self.wants.get(id) {
                index.wants.insert(id.clone(), want.to_detail());
            }
        }
    }

    index
}
/// Get a want with its related entities (job runs, partitions).
/// Returns None when the want_id is unknown.
pub fn get_want_with_index(&self, want_id: &str) -> Option<GetWantResponse> {
    let want = self.wants.get(want_id)?;
    // The want's own related_ids are sufficient here; no extra indexes consulted.
    let related = self.resolve_related_ids(&want.related_ids());
    Some(GetWantResponse {
        data: Some(want.to_detail()),
        index: Some(related),
    })
}
/// Get a partition with its related entities (builder job run, downstream consumers).
/// Returns None when no canonical partition exists for the ref.
pub fn get_partition_with_index(&self, partition_ref: &str) -> Option<GetPartitionResponse> {
    let partition = self.get_canonical_partition(partition_ref)?;
    let mut ids = partition.related_ids();

    // Downstream consumers are kept in BuildState's consumer index,
    // not on the partition itself; fold them in here (deduplicated).
    for (output_uuid, job_run_id) in self.get_partition_consumers(&partition.uuid()) {
        if !ids.partition_uuids.contains(output_uuid) {
            ids.partition_uuids.push(*output_uuid);
        }
        if !ids.job_run_ids.contains(job_run_id) {
            ids.job_run_ids.push(job_run_id.clone());
        }
    }

    // Wants referencing this partition come from the inverted index.
    for want_id in self.get_wants_for_partition(partition_ref) {
        if !ids.want_ids.contains(want_id) {
            ids.want_ids.push(want_id.clone());
        }
    }

    Some(GetPartitionResponse {
        data: Some(partition.to_detail()),
        index: Some(self.resolve_related_ids(&ids)),
    })
}
/// Get a job run with its related entities (read/wrote partitions, derivative wants).
/// Returns None when the job_run_id is unknown.
pub fn get_job_run_with_index(&self, job_run_id: &str) -> Option<GetJobRunResponse> {
    let job_run = self.job_runs.get(job_run_id)?;
    let related = self.resolve_related_ids(&job_run.related_ids());
    Some(GetJobRunResponse {
        data: Some(job_run.to_detail()),
        index: Some(related),
    })
}
/// List wants with related entities index.
/// Pagination is offset-based: `page * page_size` items are skipped.
pub fn list_wants_with_index(&self, request: &ListWantsRequest) -> ListWantsResponse {
    let page = request.page.unwrap_or(0);
    let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
    let offset = (page * page_size) as usize;

    // Single pass over the page: collect related IDs and details together.
    let mut all_ids = RelatedIds::default();
    let mut data: Vec<WantDetail> = Vec::new();
    for want in self.wants.values().skip(offset).take(page_size as usize) {
        all_ids.merge(want.related_ids());
        data.push(want.to_detail());
    }

    ListWantsResponse {
        data,
        // match_count reflects the full (unpaged) want set.
        match_count: self.wants.len() as u64,
        page,
        page_size,
        index: Some(self.resolve_related_ids(&all_ids)),
    }
}
/// List partitions with related entities index.
/// Pages over the canonical index, then resolves each UUID to its record;
/// canonical entries whose UUID has no record are dropped from the page.
pub fn list_partitions_with_index(
    &self,
    request: &ListPartitionsRequest,
) -> ListPartitionsResponse {
    let page = request.page.unwrap_or(0);
    let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
    let offset = (page * page_size) as usize;

    // Single pass: gather related IDs and details for the page together.
    let mut all_ids = RelatedIds::default();
    let mut data: Vec<PartitionDetail> = Vec::new();
    for partition in self
        .canonical_partitions
        .iter()
        .skip(offset)
        .take(page_size as usize)
        .filter_map(|(_, uuid)| self.partitions_by_uuid.get(uuid))
    {
        all_ids.merge(partition.related_ids());
        data.push(partition.to_detail());
    }

    ListPartitionsResponse {
        data,
        // match_count reflects the full (unpaged) canonical set.
        match_count: self.canonical_partitions.len() as u64,
        page,
        page_size,
        index: Some(self.resolve_related_ids(&all_ids)),
    }
}
/// List job runs with related entities index.
/// Pagination is offset-based: `page * page_size` items are skipped.
pub fn list_job_runs_with_index(&self, request: &ListJobRunsRequest) -> ListJobRunsResponse {
    let page = request.page.unwrap_or(0);
    let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
    let offset = (page * page_size) as usize;

    // Single pass over the page: collect related IDs and details together.
    let mut all_ids = RelatedIds::default();
    let mut data: Vec<JobRunDetail> = Vec::new();
    for job_run in self.job_runs.values().skip(offset).take(page_size as usize) {
        all_ids.merge(job_run.related_ids());
        data.push(job_run.to_detail());
    }

    ListJobRunsResponse {
        data,
        // match_count reflects the full (unpaged) job run set.
        match_count: self.job_runs.len() as u64,
        page,
        page_size,
        index: Some(self.resolve_related_ids(&all_ids)),
    }
}
}

View file

@ -85,9 +85,11 @@ message JobRunHeartbeatEventV1 {
string job_run_id = 1;
// TODO reentrance?
}
// Simply indicates that the job has succeeded.
// Indicates that the job has succeeded, including what data was read.
message JobRunSuccessEventV1 {
  string job_run_id = 1;
  // The read dependencies for this job run, preserving impacted → read relationships
  repeated ReadDeps read_deps = 2;
}
// Simply indicates that the job has failed. Depending on retry logic defined in job, it may retry.
message JobRunFailureEventV1 {
@ -212,7 +214,10 @@ message WantDetail {
optional string comment = 8;
WantStatus status = 9;
uint64 last_updated_timestamp = 10;
// TODO
// Lineage: all job runs that have serviced this want
repeated string job_run_ids = 11;
// Lineage: derivative wants spawned by this want's job dep-misses
repeated string derivative_want_ids = 12;
}
message PartitionDetail {
@ -230,6 +235,11 @@ message PartitionDetail {
// The unique identifier for this partition instance (UUID as string)
// Each time a partition is built, it gets a new UUID derived from the job_run_id
string uuid = 7;
// Lineage: job run that built this partition (for Live/Tainted partitions)
// Upstream lineage is resolved via this job run's read_deps (job run is source of truth)
optional string built_by_job_run_id = 8;
// Lineage: downstream partition UUIDs that consumed this (from consumer index)
repeated string downstream_partition_uuids = 9;
}
message PartitionStatus {
PartitionStatusCode code = 1;
@ -289,10 +299,26 @@ message JobRunDetail {
optional uint64 last_heartbeat_at = 3;
repeated PartitionRef building_partitions = 4;
repeated WantAttributedPartitions servicing_wants = 5;
// Lineage: read dependencies with resolved UUIDs (for Succeeded jobs)
repeated ReadDeps read_deps = 6;
// Lineage: resolved UUIDs for read partitions (ref → UUID)
map<string, string> read_partition_uuids = 7;
// Lineage: resolved UUIDs for written partitions (ref → UUID)
map<string, string> wrote_partition_uuids = 8;
// Lineage: derivative wants spawned by this job's dep-miss (for DepMiss jobs)
repeated string derivative_want_ids = 9;
}
// Related entities index - used in API responses for deduplication and O(1) lookup
// Each entity appears once in the index, even if referenced by multiple items in `data`
message RelatedEntities {
map<string, PartitionDetail> partitions = 1;
map<string, JobRunDetail> job_runs = 2;
map<string, WantDetail> wants = 3;
}
message EventFilter {
// IDs of wants to get relevant events for
repeated string want_ids = 1;
@ -308,6 +334,7 @@ message ListWantsResponse {
uint64 match_count = 2;
uint64 page = 3;
uint64 page_size = 4;
RelatedEntities index = 5;
}
message ListTaintsRequest {
@ -320,6 +347,7 @@ message ListTaintsResponse {
uint64 match_count = 2;
uint64 page = 3;
uint64 page_size = 4;
RelatedEntities index = 5;
}
message ListPartitionsRequest {
@ -332,6 +360,7 @@ message ListPartitionsResponse {
uint64 match_count = 2;
uint64 page = 3;
uint64 page_size = 4;
RelatedEntities index = 5;
}
message ListJobRunsRequest {
@ -344,6 +373,7 @@ message ListJobRunsResponse {
uint64 match_count = 2;
uint64 page = 3;
uint64 page_size = 4;
RelatedEntities index = 5;
}
message CreateWantRequest {
@ -372,6 +402,23 @@ message GetWantRequest {
}
message GetWantResponse {
WantDetail data = 1;
RelatedEntities index = 2;
}
message GetPartitionRequest {
string partition_ref = 1;
}
message GetPartitionResponse {
PartitionDetail data = 1;
RelatedEntities index = 2;
}
message GetJobRunRequest {
string job_run_id = 1;
}
message GetJobRunResponse {
JobRunDetail data = 1;
RelatedEntities index = 2;
}
message CreateTaintRequest {

View file

@ -30,6 +30,8 @@ impl From<WantCreateEventV1> for WantDetail {
comment: e.comment,
status: Some(WantStatusCode::WantIdle.into()),
last_updated_timestamp: current_timestamp(),
job_run_ids: vec![],
derivative_want_ids: vec![],
}
}
}
@ -74,12 +76,17 @@ impl From<WantStatusCode> for WantStatus {
impl From<JobRunBufferEventV1> for JobRunDetail {
fn from(value: JobRunBufferEventV1) -> Self {
use std::collections::HashMap;
Self {
id: value.job_run_id,
status: Some(JobRunStatusCode::JobRunQueued.into()),
last_heartbeat_at: None,
building_partitions: value.building_partitions,
servicing_wants: value.want_attributed_partitions,
read_deps: vec![],
read_partition_uuids: HashMap::new(),
wrote_partition_uuids: HashMap::new(),
derivative_want_ids: vec![],
}
}
}
@ -210,7 +217,10 @@ impl Into<CreateWantResponse> for Option<WantDetail> {
impl Into<GetWantResponse> for Option<WantDetail> {
fn into(self) -> GetWantResponse {
GetWantResponse { data: self }
GetWantResponse {
data: self,
index: None,
}
}
}

View file

@ -433,7 +433,7 @@ async fn list_wants_json(
.into_response();
}
};
let response = build_state.list_wants(&params);
let response = build_state.list_wants_with_index(&params);
(StatusCode::OK, Json(response)).into_response()
}
@ -456,10 +456,8 @@ async fn get_want_json(
.into_response();
}
};
let response = build_state.get_want(&want_id);
match response {
Some(want) => (StatusCode::OK, Json(GetWantResponse { data: Some(want) })).into_response(),
match build_state.get_want_with_index(&want_id) {
Some(response) => (StatusCode::OK, Json(response)).into_response(),
None => {
tracing::debug!("Want not found: {}", want_id);
(
@ -608,7 +606,7 @@ async fn list_partitions_json(
.into_response();
}
};
let response = build_state.list_partitions(&params);
let response = build_state.list_partitions_with_index(&params);
(StatusCode::OK, Json(response)).into_response()
}
@ -631,7 +629,7 @@ async fn list_job_runs_json(
.into_response();
}
};
let response = build_state.list_job_runs(&params);
let response = build_state.list_job_runs_with_index(&params);
(StatusCode::OK, Json(response)).into_response()
}

View file

@ -256,6 +256,7 @@ impl SubProcessCompleted {
/// Build the JobRunSuccessV1 event for this completed subprocess, forwarding
/// the read deps it collected so lineage can be recorded on success.
pub fn to_event(&self, job_run_id: &Uuid) -> Event {
    Event::JobRunSuccessV1(JobRunSuccessEventV1 {
        job_run_id: job_run_id.to_string(),
        read_deps: self.read_deps.clone(),
    })
}
}
@ -394,6 +395,7 @@ impl ToEvent for SubProcessCompleted {
// Trait form of the success-event builder: same payload as the inherent
// method, forwarding the collected read deps alongside the job run id.
fn to_event(&self, job_run_id: &Uuid) -> Event {
    Event::JobRunSuccessV1(JobRunSuccessEventV1 {
        job_run_id: job_run_id.to_string(),
        read_deps: self.read_deps.clone(),
    })
}
}

View file

@ -1,9 +1,11 @@
use crate::partition_state::{BuildingPartitionRef, FailedPartitionRef, LivePartitionRef};
use crate::util::current_timestamp;
use crate::util::{HasRelatedIds, RelatedIds, current_timestamp};
use crate::{
EventSource, JobRunDetail, JobRunStatusCode, MissingDeps, PartitionRef, ReadDeps,
WantAttributedPartitions,
};
use std::collections::BTreeMap;
use uuid::Uuid;
/// State: Job has been queued but not yet started
#[derive(Debug, Clone)]
@ -22,6 +24,12 @@ pub struct RunningState {
#[derive(Debug, Clone)]
pub struct SucceededState {
pub completed_at: u64,
/// The read dependencies reported by the job, preserving impacted→read relationships
pub read_deps: Vec<ReadDeps>,
/// Resolved UUIDs for partitions that were read (ref → UUID at read time)
pub read_partition_uuids: BTreeMap<String, String>,
/// Resolved UUIDs for partitions that were written (ref → UUID)
pub wrote_partition_uuids: BTreeMap<String, String>,
}
/// State: Job failed during execution
@ -113,11 +121,20 @@ impl JobRunWithState<RunningState> {
}
/// Transition from Running to Succeeded
pub fn succeed(self, timestamp: u64) -> JobRunWithState<SucceededState> {
pub fn succeed(
self,
timestamp: u64,
read_deps: Vec<ReadDeps>,
read_partition_uuids: BTreeMap<String, String>,
wrote_partition_uuids: BTreeMap<String, String>,
) -> JobRunWithState<SucceededState> {
JobRunWithState {
info: self.info,
state: SucceededState {
completed_at: timestamp,
read_deps,
read_partition_uuids,
wrote_partition_uuids,
},
}
}
@ -231,6 +248,21 @@ impl JobRunWithState<SucceededState> {
.map(|p| LivePartitionRef(p.clone()))
.collect()
}
/// Get the read dependencies reported by the job
pub fn get_read_deps(&self) -> &[ReadDeps] {
&self.state.read_deps
}
/// Get the resolved UUIDs for partitions that were read
pub fn get_read_partition_uuids(&self) -> &BTreeMap<String, String> {
&self.state.read_partition_uuids
}
/// Get the resolved UUIDs for partitions that were written
pub fn get_wrote_partition_uuids(&self) -> &BTreeMap<String, String> {
&self.state.wrote_partition_uuids
}
}
impl JobRunWithState<FailedState> {
@ -290,10 +322,128 @@ impl JobRunWithState<CanceledState> {
}
}
// ==================== HasRelatedIds trait implementation ====================

impl HasRelatedIds for JobRun {
    /// Get the IDs of all entities this job run references.
    /// Note: derivative_want_ids come from BuildState, not from JobRun itself.
    fn related_ids(&self) -> RelatedIds {
        // Every state variant wraps the same JobRunInfo (it is carried through
        // state transitions unchanged), so extract it once instead of
        // duplicating the building_partitions/servicing_wants projection in
        // six identical match arms.
        let info = match self {
            JobRun::Queued(jr) => &jr.info,
            JobRun::Running(jr) => &jr.info,
            JobRun::Succeeded(jr) => &jr.info,
            JobRun::Failed(jr) => &jr.info,
            JobRun::DepMiss(jr) => &jr.info,
            JobRun::Canceled(jr) => &jr.info,
        };

        // Partition refs from building_partitions (present in all states).
        let partition_refs: Vec<String> = info
            .building_partitions
            .iter()
            .map(|p| p.r#ref.clone())
            .collect();

        // Want IDs from servicing_wants (present in all states).
        let want_ids: Vec<String> = info
            .servicing_wants
            .iter()
            .map(|w| w.want_id.clone())
            .collect();

        // Partition UUIDs from read/write lineage (only Succeeded has these).
        // Unparseable UUID strings are silently skipped, matching prior behavior.
        let partition_uuids: Vec<Uuid> = match self {
            JobRun::Succeeded(jr) => {
                let mut uuids = Vec::new();
                for uuid_str in jr.state.read_partition_uuids.values() {
                    if let Ok(uuid) = Uuid::parse_str(uuid_str) {
                        uuids.push(uuid);
                    }
                }
                // Wrote UUIDs are added only if not already present (a written
                // partition may share a UUID already collected from the reads).
                for uuid_str in jr.state.wrote_partition_uuids.values() {
                    if let Ok(uuid) = Uuid::parse_str(uuid_str) {
                        if !uuids.contains(&uuid) {
                            uuids.push(uuid);
                        }
                    }
                }
                uuids
            }
            _ => vec![],
        };

        RelatedIds {
            partition_refs,
            partition_uuids,
            job_run_ids: vec![],
            want_ids,
        }
    }
}
// ==================== Conversion to JobRunDetail for API ====================
impl JobRun {
pub fn to_detail(&self) -> JobRunDetail {
use std::collections::HashMap;
match self {
JobRun::Queued(queued) => JobRunDetail {
id: queued.info.id.clone(),
@ -301,6 +451,10 @@ impl JobRun {
last_heartbeat_at: None,
building_partitions: queued.info.building_partitions.clone(),
servicing_wants: queued.info.servicing_wants.clone(),
read_deps: vec![],
read_partition_uuids: HashMap::new(),
wrote_partition_uuids: HashMap::new(),
derivative_want_ids: vec![],
},
JobRun::Running(running) => JobRunDetail {
id: running.info.id.clone(),
@ -308,6 +462,10 @@ impl JobRun {
last_heartbeat_at: Some(running.state.last_heartbeat_at),
building_partitions: running.info.building_partitions.clone(),
servicing_wants: running.info.servicing_wants.clone(),
read_deps: vec![],
read_partition_uuids: HashMap::new(),
wrote_partition_uuids: HashMap::new(),
derivative_want_ids: vec![],
},
JobRun::Succeeded(succeeded) => JobRunDetail {
id: succeeded.info.id.clone(),
@ -315,6 +473,20 @@ impl JobRun {
last_heartbeat_at: None,
building_partitions: succeeded.info.building_partitions.clone(),
servicing_wants: succeeded.info.servicing_wants.clone(),
read_deps: succeeded.state.read_deps.clone(),
read_partition_uuids: succeeded
.state
.read_partition_uuids
.clone()
.into_iter()
.collect(),
wrote_partition_uuids: succeeded
.state
.wrote_partition_uuids
.clone()
.into_iter()
.collect(),
derivative_want_ids: vec![],
},
JobRun::Failed(failed) => JobRunDetail {
id: failed.info.id.clone(),
@ -322,6 +494,10 @@ impl JobRun {
last_heartbeat_at: None,
building_partitions: failed.info.building_partitions.clone(),
servicing_wants: failed.info.servicing_wants.clone(),
read_deps: vec![],
read_partition_uuids: HashMap::new(),
wrote_partition_uuids: HashMap::new(),
derivative_want_ids: vec![],
},
JobRun::DepMiss(dep_miss) => JobRunDetail {
id: dep_miss.info.id.clone(),
@ -329,6 +505,12 @@ impl JobRun {
last_heartbeat_at: None,
building_partitions: dep_miss.info.building_partitions.clone(),
servicing_wants: dep_miss.info.servicing_wants.clone(),
read_deps: dep_miss.state.read_deps.clone(),
read_partition_uuids: HashMap::new(),
wrote_partition_uuids: HashMap::new(),
// Note: derivative_want_ids would need to be populated from BuildState
// since the job doesn't track which wants it spawned (BEL does)
derivative_want_ids: vec![],
},
JobRun::Canceled(canceled) => JobRunDetail {
id: canceled.info.id.clone(),
@ -336,6 +518,10 @@ impl JobRun {
last_heartbeat_at: None,
building_partitions: canceled.info.building_partitions.clone(),
servicing_wants: canceled.info.servicing_wants.clone(),
read_deps: vec![],
read_partition_uuids: HashMap::new(),
wrote_partition_uuids: HashMap::new(),
derivative_want_ids: vec![],
},
}
}

View file

@ -1073,6 +1073,8 @@ echo 'Beta succeeded'
comment: None,
status: None,
last_updated_timestamp: 0,
job_run_ids: vec![],
derivative_want_ids: vec![],
}
}

View file

@ -1,3 +1,4 @@
use crate::util::{HasRelatedIds, RelatedIds};
use crate::{PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
@ -58,6 +59,8 @@ pub struct UpstreamFailedState {
pub struct TaintedState {
pub tainted_at: u64,
pub taint_ids: Vec<String>,
/// Job run that originally built this partition (before it was tainted)
pub built_by: String,
}
/// Generic partition struct parameterized by state.
@ -250,6 +253,7 @@ impl PartitionWithState<LiveState> {
state: TaintedState {
tainted_at: timestamp,
taint_ids: vec![taint_id],
built_by: self.state.built_by,
},
}
}
@ -337,9 +341,57 @@ impl Partition {
pub fn is_tainted(&self) -> bool {
matches!(self, Partition::Tainted(_))
}
}
// ==================== HasRelatedIds trait implementation ====================

impl HasRelatedIds for Partition {
    /// Get the IDs of all entities this partition references.
    /// Note: downstream_partition_uuids and want_ids come from BuildState indexes,
    /// not from Partition itself.
    fn related_ids(&self) -> RelatedIds {
        // One exhaustive match yields both projections per state:
        // (builder/failer job run IDs, upstream partition refs).
        let (job_run_ids, partition_refs): (Vec<String>, Vec<String>) = match self {
            Partition::Building(p) => (vec![p.state.job_run_id.clone()], vec![]),
            Partition::UpstreamBuilding(p) => (
                vec![p.state.job_run_id.clone()],
                p.state
                    .missing_deps
                    .iter()
                    .map(|d| d.r#ref.clone())
                    .collect(),
            ),
            Partition::UpForRetry(p) => (vec![p.state.original_job_run_id.clone()], vec![]),
            Partition::Live(p) => (vec![p.state.built_by.clone()], vec![]),
            Partition::Failed(p) => (vec![p.state.failed_by.clone()], vec![]),
            Partition::UpstreamFailed(p) => (
                vec![],
                p.state
                    .failed_upstream_refs
                    .iter()
                    .map(|d| d.r#ref.clone())
                    .collect(),
            ),
            Partition::Tainted(p) => (vec![p.state.built_by.clone()], vec![]),
        };

        RelatedIds {
            partition_refs,
            partition_uuids: vec![],
            job_run_ids,
            want_ids: vec![],
        }
    }
}
impl Partition {
/// Convert to PartitionDetail for API responses and queries.
/// Note: want_ids is now empty - this will be populated by BuildState from the inverted index.
/// Note: want_ids and downstream_partition_uuids are empty here and will be
/// populated by BuildState from its inverted indexes.
/// Upstream lineage is resolved via built_by_job_run_id → job run's read_deps.
pub fn to_detail(&self) -> PartitionDetail {
match self {
Partition::Building(p) => PartitionDetail {
@ -353,6 +405,8 @@ impl Partition {
taint_ids: vec![],
last_updated_timestamp: None,
uuid: p.uuid.to_string(),
built_by_job_run_id: None,
downstream_partition_uuids: vec![], // Populated by BuildState
},
Partition::UpstreamBuilding(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
@ -365,6 +419,8 @@ impl Partition {
taint_ids: vec![],
last_updated_timestamp: None,
uuid: p.uuid.to_string(),
built_by_job_run_id: None,
downstream_partition_uuids: vec![], // Populated by BuildState
},
Partition::UpForRetry(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
@ -377,6 +433,8 @@ impl Partition {
taint_ids: vec![],
last_updated_timestamp: None,
uuid: p.uuid.to_string(),
built_by_job_run_id: None,
downstream_partition_uuids: vec![], // Populated by BuildState
},
Partition::Live(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
@ -389,6 +447,8 @@ impl Partition {
taint_ids: vec![],
last_updated_timestamp: Some(p.state.built_at),
uuid: p.uuid.to_string(),
built_by_job_run_id: Some(p.state.built_by.clone()),
downstream_partition_uuids: vec![], // Populated by BuildState
},
Partition::Failed(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
@ -401,6 +461,8 @@ impl Partition {
taint_ids: vec![],
last_updated_timestamp: Some(p.state.failed_at),
uuid: p.uuid.to_string(),
built_by_job_run_id: None,
downstream_partition_uuids: vec![], // Populated by BuildState
},
Partition::UpstreamFailed(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
@ -413,6 +475,8 @@ impl Partition {
taint_ids: vec![],
last_updated_timestamp: Some(p.state.failed_at),
uuid: p.uuid.to_string(),
built_by_job_run_id: None,
downstream_partition_uuids: vec![], // Populated by BuildState
},
Partition::Tainted(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
@ -421,10 +485,12 @@ impl Partition {
name: "PartitionTainted".to_string(),
}),
want_ids: vec![], // Populated by BuildState
job_run_ids: vec![],
job_run_ids: vec![p.state.built_by.clone()],
taint_ids: p.state.taint_ids.clone(),
last_updated_timestamp: Some(p.state.tainted_at),
uuid: p.uuid.to_string(),
built_by_job_run_id: Some(p.state.built_by.clone()),
downstream_partition_uuids: vec![], // Populated by BuildState
},
}
}

View file

@ -1,5 +1,57 @@
use std::backtrace::Backtrace;
use std::time::{SystemTime, UNIX_EPOCH};
use uuid::Uuid;
// ============================================================================
// Related IDs - for building RelatedEntities index in API responses
// ============================================================================
/// IDs of related entities that an object references.
/// Used by the query layer to build the RelatedEntities index for API responses.
///
/// Produced by [`HasRelatedIds::related_ids`]; multiple instances can be
/// combined (deduplicated) via `merge`.
#[derive(Debug, Clone, Default)]
pub struct RelatedIds {
    /// Partition refs (e.g., from Want.partitions, JobRun.building_partitions)
    pub partition_refs: Vec<String>,
    /// Partition UUIDs (e.g., from read/write lineage, consumer index)
    pub partition_uuids: Vec<Uuid>,
    /// Job run IDs (e.g., from built_by, consumer jobs)
    pub job_run_ids: Vec<String>,
    /// Want IDs (e.g., derivative wants, upstream wants, servicing wants)
    pub want_ids: Vec<String>,
}
impl RelatedIds {
    /// Absorb `other` into `self`, skipping IDs already present.
    ///
    /// First-occurrence order is preserved; duplicates are detected with a
    /// linear membership scan (lists here are expected to be small).
    pub fn merge(&mut self, other: RelatedIds) {
        // Append each element of `src` to `dst` unless it is already there.
        fn push_unique<T: PartialEq>(dst: &mut Vec<T>, src: Vec<T>) {
            for item in src {
                if !dst.contains(&item) {
                    dst.push(item);
                }
            }
        }

        push_unique(&mut self.partition_refs, other.partition_refs);
        push_unique(&mut self.partition_uuids, other.partition_uuids);
        push_unique(&mut self.job_run_ids, other.job_run_ids);
        push_unique(&mut self.want_ids, other.want_ids);
    }
}
/// Trait for entities that can report their related entity IDs.
/// Used by the query layer to build RelatedEntities indexes for API responses.
/// Implementing types: Want, JobRun, Partition
pub trait HasRelatedIds {
    /// Return the IDs of entities this object references directly.
    fn related_ids(&self) -> RelatedIds;
}
pub fn current_timestamp() -> u64 {
let now = SystemTime::now();
@ -89,3 +141,173 @@ impl std::fmt::Display for DatabuildError {
write!(f, "{}", self.msg)
}
}
// ============================================================================
// Test Scenarios - reusable BEL event sequences for testing
// ============================================================================
#[cfg(test)]
pub mod test_scenarios {
    use crate::data_build_event::Event;
    use crate::{
        JobRunBufferEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1,
        JobRunSuccessEventV1, MissingDeps, PartitionRef, ReadDeps, WantAttributedPartitions,
        WantCreateEventV1,
    };

    /// IDs used in the multihop scenario for easy reference in tests
    pub struct MultihopIds {
        pub beta_want_id: String,
        pub alpha_want_id: String,
        pub beta_job_1_id: String,
        pub beta_job_2_id: String,
        pub alpha_job_id: String,
    }

    impl Default for MultihopIds {
        fn default() -> Self {
            Self {
                beta_want_id: "beta-want".to_string(),
                alpha_want_id: "alpha-want".to_string(),
                beta_job_1_id: "beta-job-1".to_string(),
                beta_job_2_id: "beta-job-2".to_string(),
                alpha_job_id: "alpha-job".to_string(),
            }
        }
    }

    // ---- private event-construction helpers (reduce per-event boilerplate) ----

    /// Build a `PartitionRef` from a ref string.
    fn pref(r: &str) -> PartitionRef {
        PartitionRef {
            r#ref: r.to_string(),
        }
    }

    /// WantCreate event for a single-partition want.
    fn want_create(want_id: &str, partition: &str) -> Event {
        Event::WantCreateV1(WantCreateEventV1 {
            want_id: want_id.to_string(),
            partitions: vec![pref(partition)],
            ..Default::default()
        })
    }

    /// JobRunBuffer event for one want building one partition.
    fn job_buffer(job_run_id: &str, job_label: &str, want_id: &str, partition: &str) -> Event {
        Event::JobRunBufferV1(JobRunBufferEventV1 {
            job_run_id: job_run_id.to_string(),
            job_label: job_label.to_string(),
            want_attributed_partitions: vec![WantAttributedPartitions {
                want_id: want_id.to_string(),
                partitions: vec![pref(partition)],
            }],
            building_partitions: vec![pref(partition)],
            ..Default::default()
        })
    }

    /// Heartbeat event marking a job run as running.
    fn heartbeat(job_run_id: &str) -> Event {
        Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 {
            job_run_id: job_run_id.to_string(),
            ..Default::default()
        })
    }

    /// Creates a multihop dependency scenario:
    /// 1. Want for data/beta is created
    /// 2. Job beta-job-1 starts, discovers missing dep on data/alpha
    /// 3. Derivative want for data/alpha is created
    /// 4. Job alpha-job builds data/alpha successfully
    /// 5. Job beta-job-2 retries and succeeds
    ///
    /// This exercises: want creation, job buffering, dep-miss, derivative wants,
    /// job success with read_deps, and retry logic.
    pub fn multihop_scenario() -> (Vec<Event>, MultihopIds) {
        let ids = MultihopIds::default();

        let events = vec![
            // 1. Create want for data/beta
            want_create(&ids.beta_want_id, "data/beta"),
            // 2. Queue beta job (first attempt)
            job_buffer(&ids.beta_job_1_id, "//job_beta", &ids.beta_want_id, "data/beta"),
            // 3. Beta job starts running
            heartbeat(&ids.beta_job_1_id),
            // 4. Beta job reports missing dependency on data/alpha
            Event::JobRunMissingDepsV1(JobRunMissingDepsEventV1 {
                job_run_id: ids.beta_job_1_id.clone(),
                missing_deps: vec![MissingDeps {
                    impacted: vec![pref("data/beta")],
                    missing: vec![pref("data/alpha")],
                }],
                ..Default::default()
            }),
            // 5. Create derivative want for data/alpha
            want_create(&ids.alpha_want_id, "data/alpha"),
            // 6. Queue alpha job
            job_buffer(&ids.alpha_job_id, "//job_alpha", &ids.alpha_want_id, "data/alpha"),
            // 7. Alpha job starts running
            heartbeat(&ids.alpha_job_id),
            // 8. Alpha job succeeds (no read deps for leaf node)
            Event::JobRunSuccessV1(JobRunSuccessEventV1 {
                job_run_id: ids.alpha_job_id.clone(),
                read_deps: vec![],
                ..Default::default()
            }),
            // 9. Queue beta job again (second attempt - retry)
            job_buffer(&ids.beta_job_2_id, "//job_beta", &ids.beta_want_id, "data/beta"),
            // 10. Beta job 2 starts running
            heartbeat(&ids.beta_job_2_id),
            // 11. Beta job 2 succeeds with read_deps showing it read data/alpha
            Event::JobRunSuccessV1(JobRunSuccessEventV1 {
                job_run_id: ids.beta_job_2_id.clone(),
                read_deps: vec![ReadDeps {
                    impacted: vec![pref("data/beta")],
                    read: vec![pref("data/alpha")],
                }],
                ..Default::default()
            }),
        ];

        (events, ids)
    }
}

View file

@ -1,5 +1,5 @@
use crate::partition_state::FailedPartitionRef;
use crate::util::current_timestamp;
use crate::util::{HasRelatedIds, RelatedIds, current_timestamp};
use crate::{EventSource, PartitionRef, WantCreateEventV1, WantDetail, WantStatusCode};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
@ -464,6 +464,35 @@ impl WantWithState<UpstreamBuildingState> {
}
}
// ==================== HasRelatedIds trait implementation ====================
impl HasRelatedIds for Want {
/// Get the IDs of all entities this want references.
/// Note: job_run_ids come from inverted indexes in BuildState, not from Want itself.
fn related_ids(&self) -> RelatedIds {
let partition_refs = self
.want()
.partitions
.iter()
.map(|p| p.r#ref.clone())
.collect();
// Collect want IDs from state-specific relationships
let want_ids = match self {
Want::UpstreamBuilding(w) => w.state.upstream_want_ids.clone(),
Want::UpstreamFailed(w) => w.state.failed_wants.clone(),
_ => vec![],
};
RelatedIds {
partition_refs,
partition_uuids: vec![],
job_run_ids: vec![],
want_ids,
}
}
}
// Helper methods on the Want enum
impl Want {
/// Create a new want in the Idle state
@ -522,7 +551,9 @@ impl Want {
}
}
/// Convert to WantDetail for API responses and queries
/// Convert to WantDetail for API responses and queries.
/// Note: job_run_ids and derivative_want_ids are empty here and will be
/// populated by BuildState from its inverted indexes.
pub fn to_detail(&self) -> WantDetail {
WantDetail {
want_id: self.want().want_id.clone(),
@ -544,6 +575,8 @@ impl Want {
Want::UpstreamFailed(_) => Some(WantStatusCode::WantUpstreamFailed.into()),
Want::Canceled(_) => Some(WantStatusCode::WantCanceled.into()),
},
job_run_ids: vec![], // Populated by BuildState
derivative_want_ids: vec![], // Populated by BuildState
}
}
}

View file

@ -3,7 +3,7 @@
use askama::Template;
use crate::{
JobRunDetail, JobRunStatus, PartitionDetail, PartitionRef, PartitionStatus,
JobRunDetail, JobRunStatus, PartitionDetail, PartitionRef, PartitionStatus, ReadDeps,
WantAttributedPartitions, WantDetail, WantStatus,
};
@ -84,6 +84,9 @@ pub struct WantDetailView {
pub comment_display: String,
pub status: Option<WantStatusView>,
pub last_updated_timestamp: u64,
// Lineage fields
pub job_run_ids: Vec<String>,
pub derivative_want_ids: Vec<String>,
}
impl From<&WantDetail> for WantDetailView {
@ -99,6 +102,8 @@ impl From<&WantDetail> for WantDetailView {
comment_display: w.comment.as_deref().unwrap_or("-").to_string(),
status: w.status.as_ref().map(WantStatusView::from),
last_updated_timestamp: w.last_updated_timestamp,
job_run_ids: w.job_run_ids.clone(),
derivative_want_ids: w.derivative_want_ids.clone(),
}
}
}
@ -119,6 +124,9 @@ pub struct PartitionDetailView {
pub want_ids: Vec<String>,
pub taint_ids: Vec<String>,
pub uuid: String,
// Lineage fields
pub built_by_job_run_id: Option<String>,
pub downstream_partition_uuids: Vec<String>,
}
impl From<&PartitionDetail> for PartitionDetailView {
@ -141,6 +149,8 @@ impl From<&PartitionDetail> for PartitionDetailView {
want_ids: p.want_ids.clone(),
taint_ids: p.taint_ids.clone(),
uuid: p.uuid.clone(),
built_by_job_run_id: p.built_by_job_run_id.clone(),
downstream_partition_uuids: p.downstream_partition_uuids.clone(),
}
}
}
@ -165,16 +175,65 @@ impl From<&WantAttributedPartitions> for WantAttributedPartitionsView {
}
}
/// View for read dependency entries (impacted → read relationships)
pub struct ReadDepsView {
    /// Output partitions affected by this dependency group.
    pub impacted: Vec<PartitionRefView>,
    /// Input partitions that were read to produce the impacted outputs.
    pub read: Vec<PartitionRefView>,
}
impl From<&ReadDeps> for ReadDepsView {
    // Project a proto-level ReadDeps into its template view form.
    fn from(rd: &ReadDeps) -> Self {
        let impacted = rd.impacted.iter().map(PartitionRefView::from).collect();
        let read = rd.read.iter().map(PartitionRefView::from).collect();
        Self { impacted, read }
    }
}
/// View for partition ref with its resolved UUID (for lineage display)
pub struct PartitionRefWithUuidView {
    /// Raw partition ref, shown as link text.
    pub partition_ref: String,
    /// URL-encoded form of the ref, used in the partition detail link href.
    pub partition_ref_encoded: String,
    /// UUID of the specific partition instance that was read/written.
    pub uuid: String,
}
/// Template view model for a single job run's detail page.
pub struct JobRunDetailView {
    pub id: String,
    pub status: Option<JobRunStatusView>,
    pub last_heartbeat_at: Option<u64>,
    /// Partitions this job run is responsible for building.
    pub building_partitions: Vec<PartitionRefView>,
    /// Wants (with their attributed partitions) that this run services.
    pub servicing_wants: Vec<WantAttributedPartitionsView>,
    // Lineage fields (populated for Succeeded/DepMiss states)
    /// Impacted → read dependency groups reported by the job.
    pub read_deps: Vec<ReadDepsView>,
    /// Input partitions with resolved UUIDs (from read_partition_uuids).
    pub read_partitions: Vec<PartitionRefWithUuidView>,
    /// Output partitions with resolved UUIDs (from wrote_partition_uuids).
    pub wrote_partitions: Vec<PartitionRefWithUuidView>,
    /// Derivative wants spawned by a dep-miss, if any.
    pub derivative_want_ids: Vec<String>,
}
impl From<&JobRunDetail> for JobRunDetailView {
fn from(jr: &JobRunDetail) -> Self {
// Build read_partitions from read_partition_uuids map
let read_partitions: Vec<PartitionRefWithUuidView> = jr
.read_partition_uuids
.iter()
.map(|(partition_ref, uuid)| PartitionRefWithUuidView {
partition_ref: partition_ref.clone(),
partition_ref_encoded: urlencoding::encode(partition_ref).into_owned(),
uuid: uuid.clone(),
})
.collect();
// Build wrote_partitions from wrote_partition_uuids map
let wrote_partitions: Vec<PartitionRefWithUuidView> = jr
.wrote_partition_uuids
.iter()
.map(|(partition_ref, uuid)| PartitionRefWithUuidView {
partition_ref: partition_ref.clone(),
partition_ref_encoded: urlencoding::encode(partition_ref).into_owned(),
uuid: uuid.clone(),
})
.collect();
Self {
id: jr.id.clone(),
status: jr.status.as_ref().map(JobRunStatusView::from),
@ -189,6 +248,10 @@ impl From<&JobRunDetail> for JobRunDetailView {
.iter()
.map(WantAttributedPartitionsView::from)
.collect(),
read_deps: jr.read_deps.iter().map(ReadDepsView::from).collect(),
read_partitions,
wrote_partitions,
derivative_want_ids: jr.derivative_want_ids.clone(),
}
}
}
@ -361,3 +424,91 @@ pub struct JobRunDetailPage {
pub struct WantCreatePage {
pub base: BaseContext,
}
// =============================================================================
// Tests
// =============================================================================
#[cfg(test)]
mod tests {
    use super::*;
    use crate::build_state::BuildState;
    use crate::util::test_scenarios::{MultihopIds, multihop_scenario};
    use askama::Template;

    /// Helper to replay events into a fresh BuildState
    fn build_state_from_events(events: &[crate::data_build_event::Event]) -> BuildState {
        let mut state = BuildState::default();
        for event in events {
            state.handle_event(event);
        }
        state
    }

    mod want_detail_page {
        use super::*;

        /// Run the multihop scenario and render the beta want's detail page.
        /// Returns the rendered HTML plus the scenario IDs for assertions.
        /// (Shared setup for all want-detail-page tests.)
        fn render_beta_want_page() -> (String, MultihopIds) {
            let (events, ids) = multihop_scenario();
            let state = build_state_from_events(&events);
            let want_detail = state
                .get_want(&ids.beta_want_id)
                .expect("beta want should exist");
            let template = WantDetailPage {
                base: BaseContext::default(),
                want: WantDetailView::from(want_detail),
            };
            let html = template.render().expect("template should render");
            (html, ids)
        }

        /// Tests that the want detail page shows the job runs that serviced this want.
        /// This is the "Fulfillment - Job Runs" section in the UI.
        ///
        /// Given: The multihop scenario completes (beta job dep-misses, alpha built, beta retries)
        /// When: We render the beta want detail page
        /// Then: It should show both beta job runs (beta-job-1 and beta-job-2)
        #[test]
        fn test_shows_servicing_job_runs() {
            let (html, ids) = render_beta_want_page();

            // Verify the Fulfillment section exists and contains both job run IDs
            assert!(
                html.contains(&ids.beta_job_1_id),
                "Should show beta-job-1 in fulfillment section. HTML:\n{}",
                html
            );
            assert!(
                html.contains(&ids.beta_job_2_id),
                "Should show beta-job-2 in fulfillment section. HTML:\n{}",
                html
            );
        }

        /// Tests that the want detail page shows derivative wants spawned by dep-miss.
        /// This is the "Fulfillment - Derivative Wants" section in the UI.
        ///
        /// Given: The multihop scenario completes (beta job dep-misses, spawning alpha want)
        /// When: We render the beta want detail page
        /// Then: It should show the alpha want as a derivative
        #[test]
        fn test_shows_derivative_wants() {
            let (html, ids) = render_beta_want_page();

            // Verify the Fulfillment section exists and contains the derivative want
            assert!(
                html.contains(&ids.alpha_want_id),
                "Should show alpha-want as derivative want. HTML:\n{}",
                html
            );
        }
    }
}

View file

@ -54,4 +54,43 @@
</div>
{% endif %}
{% if !job_run.read_partitions.is_empty() %}
<div class="detail-section">
<h2>Read Partitions ({{ job_run.read_partitions.len() }})</h2>
<ul class="partition-list">
{% for p in job_run.read_partitions %}
<li>
<a href="/partitions/{{ p.partition_ref_encoded }}" class="partition-ref">{{ p.partition_ref }}</a>
<span style="color:var(--color-text-muted);font-size:.75rem;font-family:monospace;margin-left:.5rem">uuid: {{ p.uuid }}</span>
</li>
{% endfor %}
</ul>
</div>
{% endif %}
{% if !job_run.wrote_partitions.is_empty() %}
<div class="detail-section">
<h2>Wrote Partitions ({{ job_run.wrote_partitions.len() }})</h2>
<ul class="partition-list">
{% for p in job_run.wrote_partitions %}
<li>
<a href="/partitions/{{ p.partition_ref_encoded }}" class="partition-ref">{{ p.partition_ref }}</a>
<span style="color:var(--color-text-muted);font-size:.75rem;font-family:monospace;margin-left:.5rem">uuid: {{ p.uuid }}</span>
</li>
{% endfor %}
</ul>
</div>
{% endif %}
{% if !job_run.derivative_want_ids.is_empty() %}
<div class="detail-section">
<h2>Derivative Wants ({{ job_run.derivative_want_ids.len() }})</h2>
<ul class="partition-list">
{% for id in job_run.derivative_want_ids %}
<li><a href="/wants/{{ id }}">{{ id }}</a></li>
{% endfor %}
</ul>
</div>
{% endif %}
{% call base::footer() %}

View file

@ -32,9 +32,34 @@
</div>
</div>
{% match partition.built_by_job_run_id %}
{% when Some with (job_run_id) %}
<div class="detail-section">
<h2>Lineage - Built By</h2>
<p>
<a href="/job_runs/{{ job_run_id }}">{{ job_run_id }}</a>
<span style="color:var(--color-text-muted);font-size:.75rem;margin-left:.5rem">(view job run for input partitions)</span>
</p>
</div>
{% when None %}
{% endmatch %}
{% if !partition.downstream_partition_uuids.is_empty() %}
<div class="detail-section">
<h2>Lineage - Downstream Consumers ({{ partition.downstream_partition_uuids.len() }})</h2>
<ul class="partition-list">
{% for uuid in partition.downstream_partition_uuids %}
<li>
<span style="font-family:monospace;font-size:.8125rem">{{ uuid }}</span>
</li>
{% endfor %}
</ul>
</div>
{% endif %}
{% if !partition.job_run_ids.is_empty() %}
<div class="detail-section">
<h2>Job Runs ({{ partition.job_run_ids.len() }})</h2>
<h2>Related Job Runs ({{ partition.job_run_ids.len() }})</h2>
<ul class="partition-list">
{% for id in partition.job_run_ids %}
<li><a href="/job_runs/{{ id }}">{{ id }}</a></li>

View file

@ -65,4 +65,26 @@
</div>
{% endif %}
{% if !want.job_run_ids.is_empty() %}
<div class="detail-section">
<h2>Fulfillment - Job Runs ({{ want.job_run_ids.len() }})</h2>
<ul class="partition-list">
{% for id in want.job_run_ids %}
<li><a href="/job_runs/{{ id }}">{{ id }}</a></li>
{% endfor %}
</ul>
</div>
{% endif %}
{% if !want.derivative_want_ids.is_empty() %}
<div class="detail-section">
<h2>Fulfillment - Derivative Wants ({{ want.derivative_want_ids.len() }})</h2>
<ul class="partition-list">
{% for id in want.derivative_want_ids %}
<li><a href="/wants/{{ id }}">{{ id }}</a></li>
{% endfor %}
</ul>
</div>
{% endif %}
{% call base::footer() %}

View file

@ -0,0 +1,7 @@
# What LLMs Don't Do
- Create and cultivate technical strategy
- They lack a specific vision of how to formalize and organize the problem, and of the technical solution that follows from it
- Adhere to technical strategy
- Please please please just read the relevant docs!

View file

@ -0,0 +1,230 @@
# Detail & Lineage Views
## Vision
Provide rich, navigable views into databuild's execution history that answer operational questions:
- **"What work was done to fulfill this want?"** - The full DAG of partitions built and jobs run
- **"Where did this data come from?"** - Trace a partition's lineage back through its inputs
- **"What downstream data uses this?"** - Understand impact before tainting or debugging staleness
## Three Distinct Views
### 1. Want Fulfillment View
Shows the work tree rooted at a want: all partitions built, jobs run, and derivative wants spawned to fulfill it.
```
W-001 "data/gamma" [Successful]
├── data/gamma [Live, uuid:abc]
│   └── JR-789 [Succeeded]
│       ├── read: data/beta [Live, uuid:def]
│       └── read: data/alpha [Live, uuid:ghi]
└── derivative: W-002 "data/beta" [Successful]
    ├── triggered by: JR-456 dep-miss
    └── data/beta [Live, uuid:def]
        └── JR-456 [DepMiss → retry → Succeeded]
            └── read: data/alpha [Live, uuid:ghi]
```
Key insight: This shows **specific partition instances** (by UUID), not just refs. A want's fulfillment is a concrete snapshot of what was built.
### 2. Partition Lineage View
The data flow graph: partition ↔ job_run alternating. Navigable upstream (inputs) and downstream (consumers).
```
UPSTREAM
┌────────────┼────────────┐
▼ ▼ ▼
[data/a] [data/b] [data/c]
│ │ │
└────────────┼────────────┘
JR-xyz [Succeeded]
══════════════════
║ data/beta ║ ← FOCUS
║ [Live] ║
══════════════════
JR-abc [Running]
┌────────────┼────────────┐
▼ ▼ ▼
[data/x] [data/y] [data/z]
DOWNSTREAM
```
This view answers: "What data flows into/out of this partition?" Click to navigate.
### 3. JobRun Detail View
Not a graph - just the immediate context of a single job execution:
- **Scheduled for**: Which want(s) triggered this job
- **Read**: Input partitions (with UUIDs - the specific versions read)
- **Wrote**: Output partitions (with UUIDs)
- **Status history**: Queued → Running → Succeeded/Failed/DepMiss
- **If DepMiss**: Which derivative wants were spawned
## Data Requirements
### Track read_deps on success
Currently only captured on dep-miss. Need to extend `JobRunSuccessEventV1`:
```protobuf
message JobRunSuccessEventV1 {
string job_run_id = 1;
repeated ReadDeps read_deps = 2; // NEW
}
```
### Inverted consumer index
To answer "what reads this partition", need:
```rust
partition_consumers: BTreeMap<Uuid, Vec<(Uuid, String)>> // input_uuid → (output_uuid, job_run_id)
```
Indexed by UUID (not ref) because partition refs get reused across rebuilds, but UUIDs are immutable per instance. This preserves historical lineage correctly.
Built from read_deps when processing JobRunSuccessEventV1.
## Design Decisions
1. **Retries**: List all job runs triggered by a want, collapsing retries in the UI (expandable)
2. **Lineage UUIDs**: Resolve partition refs to canonical UUIDs at job success time (jobs don't need to know about UUIDs)
3. **High fan-out**: Truncate to N items with "+X more" expansion
4. **Consumer index by UUID**: Index consumers by partition UUID (not ref) since refs get reused across rebuilds but UUIDs are immutable per instance
5. **Job run as lineage source of truth**: Partition details don't duplicate upstream info - they reference their builder job run, which holds the read_deps
## API Response Pattern
Detail and list endpoints return a wrapper with the primary data plus a shared index of related entities:
```protobuf
message GetJobRunResponse {
JobRunDetail data = 1;
RelatedEntities index = 2;
}
message ListJobRunsResponse {
repeated JobRunDetail data = 1;
RelatedEntities index = 2; // shared across all items
}
message RelatedEntities {
map<string, PartitionDetail> partitions = 1;
map<string, JobRunDetail> job_runs = 2;
map<string, WantDetail> wants = 3;
}
```
**Why this pattern:**
- **No recursion** - Detail types stay flat, don't embed each other
- **Deduplication** - Each entity appears once in the index, even if referenced by multiple items in `data`
- **O(1) lookup** - Templates access `index.partitions["data/beta"]` directly
- **Composable** - Same pattern works for single-item and list endpoints
## Implementation Plan
### ✅ Phase 1: Data Model (Complete)
**1.1 Extend JobRunSuccessEventV1**
```protobuf
message JobRunSuccessEventV1 {
string job_run_id = 1;
repeated ReadDeps read_deps = 2; // preserves impacted→read relationships
}
```
**1.2 Extend SucceededState to store resolved UUIDs**
```rust
pub struct SucceededState {
pub succeeded_at: u64,
pub read_deps: Vec<ReadDeps>, // from event
pub read_partition_uuids: BTreeMap<String, Uuid>, // ref → UUID at read time
pub wrote_partition_uuids: BTreeMap<String, Uuid>, // ref → UUID (from building_partitions)
}
```
UUIDs resolved by looking up canonical partitions when processing success event.
**1.3 Add consumer index to BuildState**
```rust
// input_uuid → list of (output_uuid, job_run_id)
partition_consumers: BTreeMap<Uuid, Vec<(Uuid, String)>>
```
Populated from `read_deps` when processing JobRunSuccessEventV1. Uses UUIDs (not refs) to preserve historical lineage across partition rebuilds.
### ✅ Phase 2: API Response Pattern (Complete)
**2.1 RelatedEntities wrapper**
Added `RelatedEntities` message and `index` field to all Get*/List* responses.
**2.2 HasRelatedIds trait**
Implemented trait for Want, JobRun, Partition that returns the IDs of related entities. Query layer uses this to build the index.
**2.3 Query methods**
Added `*_with_index()` methods that collect related IDs via the trait and resolve them to full entity details.
### ✅ Phase 3: Job Integration (Complete)
Jobs already emit `DATABUILD_DEP_READ_JSON` and the full pipeline is wired up:
1. **Job execution** (`job_run.rs`): `SubProcessBackend::poll` parses `DATABUILD_DEP_READ_JSON` lines from stdout and stores in `SubProcessCompleted.read_deps`
2. **Event creation** (`job_run.rs`): `to_event()` creates `JobRunSuccessEventV1` with `read_deps`
3. **Event handling** (`event_handlers.rs`): `handle_job_run_success()` resolves `read_partition_uuids` and `wrote_partition_uuids`, populates `partition_consumers` index
4. **API serialization** (`job_run_state.rs`): `to_detail()` includes `read_deps`, `read_partition_uuids`, `wrote_partition_uuids` in `JobRunDetail`
### ✅ Phase 4: Frontend (Complete)
**4.1 JobRun detail page**
Added to `job_runs/detail.html`:
- "Read Partitions" section showing partition refs with UUIDs (linked to partition detail)
- "Wrote Partitions" section showing partition refs with UUIDs (linked to partition detail)
- "Derivative Wants" section showing wants spawned by dep-miss (linked to want detail)
Extended `JobRunDetailView` with:
- `read_deps: Vec<ReadDepsView>` - impacted→read dependency relationships
- `read_partitions: Vec<PartitionRefWithUuidView>` - input partitions with UUIDs
- `wrote_partitions: Vec<PartitionRefWithUuidView>` - output partitions with UUIDs
- `derivative_want_ids: Vec<String>` - derivative wants from dep-miss
**4.2 Partition detail page**
Added to `partitions/detail.html`:
- "Lineage - Built By" section showing the builder job run (linked to job run detail for upstream lineage)
- "Lineage - Downstream Consumers" section showing UUIDs of downstream partitions
Extended `PartitionDetailView` with:
- `built_by_job_run_id: Option<String>` - job run that built this partition
- `downstream_partition_uuids: Vec<String>` - downstream consumers from index
**4.3 Want detail page**
Added to `wants/detail.html`:
- "Fulfillment - Job Runs" section listing all job runs that serviced this want
- "Fulfillment - Derivative Wants" section listing derivative wants spawned by dep-misses
Extended `WantDetailView` with:
- `job_run_ids: Vec<String>` - all job runs that serviced this want
- `derivative_want_ids: Vec<String>` - derivative wants spawned

View file

@ -3,7 +3,7 @@
"jobs": [
{
"label": "//examples/multihop:job_alpha",
"entrypoint": "./examples/multihop/job_alpha.sh",
"entrypoint": "./job_alpha.sh",
"environment": {
"JOB_NAME": "alpha"
},
@ -11,7 +11,7 @@
},
{
"label": "//examples/multihop:job_beta",
"entrypoint": "./examples/multihop/job_beta.sh",
"entrypoint": "./job_beta.sh",
"environment": {
"JOB_NAME": "beta"
},