Compare commits

..

9 commits

Author SHA1 Message Date
8176a8261e fix up want lineage view 2025-12-01 03:54:29 +08:00
e221cd8502 add crazy idea 2025-12-01 02:14:27 +08:00
421544786f revert assumption about data dep stability, update AGENTS.md 2025-12-01 00:33:57 +08:00
23c3572106 add design assumption on stability of job run read partitions 2025-11-30 23:54:44 +08:00
9c6cb11713 add thoughts on wants for data retention 2025-11-30 23:46:31 +08:00
17d5987517 WIP 2025-11-28 12:48:54 +08:00
f353660f97 update docs 2025-11-28 12:41:11 +08:00
6cb11af642 update detail lineage plan 2025-11-27 15:59:38 +08:00
368558d9d8 add prd 2025-11-27 15:45:36 +08:00
25 changed files with 2179 additions and 323 deletions

View file

@ -40,6 +40,7 @@ This architecture provides compile-time correctness, observability through event
- Compile-time correctness is a super-power, and investment in it speeds up the flywheel for development and user value.
- **CLI/Service Interchangeability**: Both the CLI and service must produce identical artifacts (BEL events, logs, metrics, outputs) in the same locations. Users should be able to build with one interface and query/inspect results from the other seamlessly. This principle applies to all DataBuild operations, not just builds.
- The BEL represents real things that happen: job run processes that are started or fail, requests from the user, dep misses, etc.
- We focus on highly impactful tests anchored to stable interfaces, for instance using BEL events to create valid application states and testing orchestration logic via shared scenarios (see the sketch below). This helps us keep a high ratio of "well-tested functionality" to "test brittleness".
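A minimal sketch of this pattern, reusing the shared scenario helper exercised in `test_multihop_dependency_replay` later in this change (module paths taken from that test; the test name here is illustrative):
```rust
use crate::build_state::BuildState;
use crate::util::test_scenarios::multihop_scenario;

// Sketch: a shared scenario yields a valid BEL event stream plus the IDs it used;
// orchestration logic is then asserted against the replayed BuildState.
#[test]
fn replays_shared_scenario() {
    let (events, ids) = multihop_scenario();
    let mut state = BuildState::default();
    for event in &events {
        state.handle_event(event);
    }
    let beta_want = state.get_want(&ids.beta_want_id).unwrap();
    assert_eq!(
        beta_want.status,
        Some(crate::WantStatusCode::WantSuccessful.into())
    );
}
```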
## Build & Test
```bash

View file

@ -350,11 +350,13 @@ impl Clone for BuildEventLog<MemoryBELStorage> {
}
}
#[cfg(test)]
mod tests {
mod sqlite_bel_storage {
use crate::build_event_log::{BELStorage, BuildEventLog, SqliteBELStorage};
use crate::build_state::BuildState;
use crate::data_build_event::Event;
use crate::util::test_scenarios::default_originating_lifetime;
use crate::{PartitionRef, WantCreateEventV1};
use uuid::Uuid;
@ -387,6 +389,7 @@ mod tests {
e.partitions = vec![PartitionRef {
r#ref: "sqlite_partition_1234".to_string(),
}];
e.lifetime = Some(default_originating_lifetime());
let event_id = log
.append_event(&Event::WantCreateV1(e))
.expect("append_event failed");
@ -430,14 +433,17 @@ mod tests {
let mut e2 = WantCreateEventV1::default();
e2.want_id = Uuid::new_v4().into();
e2.lifetime = Some(default_originating_lifetime());
log.append_event(&Event::WantCreateV1(e2))
.expect("append_event failed");
let mut e3 = WantCreateEventV1::default();
e3.want_id = Uuid::new_v4().into();
e3.lifetime = Some(default_originating_lifetime());
log.append_event(&Event::WantCreateV1(e3))
.expect("append_event failed");
let mut e4 = WantCreateEventV1::default();
e4.want_id = Uuid::new_v4().into();
e4.lifetime = Some(default_originating_lifetime());
log.append_event(&Event::WantCreateV1(e4))
.expect("append_event failed");

View file

@ -4,12 +4,12 @@
//! returning derivative events to be appended to the BEL.
use crate::data_build_event::Event;
use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
use crate::event_source::Source as EventSourceVariant;
use crate::data_deps::missing_deps_to_want_events;
use crate::job_run_state::{JobRun, JobRunWithState, QueuedState as JobQueuedState};
use crate::partition_state::{BuildingPartitionRef, Partition};
use crate::util::current_timestamp;
use crate::want_state::{NewState as WantNewState, Want, WantWithState};
use crate::want_create_event_v1::Lifetime;
use crate::want_state::{NewState as WantNewState, Want, WantLifetime, WantWithState};
use crate::{
JobRunBufferEventV1, JobRunCancelEventV1, JobRunFailureEventV1, JobRunHeartbeatEventV1,
JobRunMissingDepsEventV1, JobRunSuccessEventV1, PartitionRef, TaintCancelEventV1,
@ -45,26 +45,22 @@ impl BuildState {
// Create want in New state from event
let want_new: WantWithState<WantNewState> = event.clone().into();
// Log creation with derivative vs user-created distinction
let is_derivative = if let Some(source) = &event.source {
if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source {
tracing::info!(
want_id = %event.want_id,
partitions = ?event.partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
source_job_run_id = %job_triggered.job_run_id,
"Want created (derivative - auto-created due to missing dependency)"
);
true
} else {
false
}
// Log creation with derivative vs user-created distinction based on lifetime
let is_derivative = matches!(&event.lifetime, Some(Lifetime::Ephemeral(_)));
if let Some(Lifetime::Ephemeral(eph)) = &event.lifetime {
tracing::info!(
want_id = %event.want_id,
partitions = ?event.partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
source_job_run_id = %eph.job_run_id,
"Want created (ephemeral - auto-created due to missing dependency)"
);
} else {
tracing::info!(
want_id = %event.want_id,
partitions = ?event.partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
"Want created (user-requested)"
"Want created (originating - user-requested)"
);
false
};
// Register this want with all its partitions (via inverted index)
@ -175,17 +171,22 @@ impl BuildState {
self.wants.insert(event.want_id.clone(), final_want);
// If this is a derivative want (triggered by a job's dep miss), transition impacted wants to UpstreamBuilding
if is_derivative {
if let Some(source) = &event.source {
if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source {
self.handle_derivative_want_creation(
&event.want_id,
&event.partitions,
&job_triggered.job_run_id,
);
// If this is an ephemeral want (triggered by a job's dep miss):
// 1. Record the derivative want ID on the source job run
// 2. Transition impacted wants to UpstreamBuilding
if let Some(Lifetime::Ephemeral(eph)) = &event.lifetime {
// Add this want as a derivative of the source job run
if let Some(job_run) = self.job_runs.get_mut(&eph.job_run_id) {
if let JobRun::DepMiss(dep_miss) = job_run {
dep_miss.add_derivative_want_id(&event.want_id);
}
}
self.handle_derivative_want_creation(
&event.want_id,
&event.partitions,
&eph.job_run_id,
);
}
vec![]
@ -242,7 +243,7 @@ impl BuildState {
// Create job run in Queued state
let queued: JobRunWithState<JobQueuedState> = event.clone().into();
// Transition wants to Building
// Transition wants to Building and track this job run on each want
// Valid states when job buffer event arrives:
// - Idle: First job starting for this want (normal case)
// - Building: Another job already started for this want (multiple jobs can service same want)
@ -255,7 +256,7 @@ impl BuildState {
wap.want_id
));
let transitioned = match want {
let mut transitioned = match want {
Want::New(new_want) => {
// Want was just created and hasn't fully sensed yet - transition to Building
// This can happen if want creation and job buffer happen in quick succession
@ -287,6 +288,9 @@ impl BuildState {
}
};
// Track this job run on the want for lineage
transitioned.add_job_run_id(&event.job_run_id);
self.wants.insert(wap.want_id.clone(), transitioned);
}
@ -340,18 +344,51 @@ impl BuildState {
}
pub(crate) fn handle_job_run_success(&mut self, event: &JobRunSuccessEventV1) -> Vec<Event> {
use std::collections::BTreeMap;
let job_run = self.job_runs.remove(&event.job_run_id).expect(&format!(
"BUG: Job run {} must exist when success event received",
event.job_run_id
));
// Resolve read partition UUIDs from the read_deps in the event
let mut read_partition_uuids: BTreeMap<String, String> = BTreeMap::new();
for read_dep in &event.read_deps {
for read_ref in &read_dep.read {
if let Some(uuid) = self.get_canonical_partition_uuid(&read_ref.r#ref) {
read_partition_uuids.insert(read_ref.r#ref.clone(), uuid.to_string());
}
}
}
// Get the building partitions from the job run to resolve wrote_partition_uuids
let building_partitions = match &job_run {
JobRun::Running(running) => running.info.building_partitions.clone(),
_ => vec![],
};
// Resolve wrote partition UUIDs - these will be set when we transition partitions to Live
// For now, we compute them based on job_run_id (same logic as transition_partitions_to_live)
let mut wrote_partition_uuids: BTreeMap<String, String> = BTreeMap::new();
for pref in &building_partitions {
let uuid =
crate::partition_state::derive_partition_uuid(&event.job_run_id, &pref.r#ref);
wrote_partition_uuids.insert(pref.r#ref.clone(), uuid.to_string());
}
let succeeded = match job_run {
JobRun::Running(running) => {
tracing::info!(
job_run_id = %event.job_run_id,
read_deps_count = event.read_deps.len(),
"JobRun: Running → Succeeded"
);
running.succeed(current_timestamp())
running.succeed(
current_timestamp(),
event.read_deps.clone(),
read_partition_uuids.clone(),
wrote_partition_uuids.clone(),
)
}
_ => {
panic!(
@ -361,6 +398,34 @@ impl BuildState {
}
};
// Populate the consumer index from read_deps (using UUIDs for historical lineage)
// For each read partition UUID, record that the written partition UUIDs consumed it
for read_dep in &event.read_deps {
for read_ref in &read_dep.read {
// Look up the read partition's UUID
if let Some(read_uuid) = read_partition_uuids.get(&read_ref.r#ref) {
if let Ok(read_uuid) = uuid::Uuid::parse_str(read_uuid) {
let consumers = self
.partition_consumers
.entry(read_uuid)
.or_insert_with(Vec::new);
for impacted_ref in &read_dep.impacted {
// Look up the impacted (output) partition's UUID
if let Some(wrote_uuid) = wrote_partition_uuids.get(&impacted_ref.r#ref)
{
if let Ok(wrote_uuid) = uuid::Uuid::parse_str(wrote_uuid) {
let entry = (wrote_uuid, event.job_run_id.clone());
if !consumers.contains(&entry) {
consumers.push(entry);
}
}
}
}
}
}
}
}
// Job run success is SOURCE of truth that partitions are live
let newly_live_partitions = succeeded.get_completed_partitions();
@ -514,15 +579,6 @@ impl BuildState {
}
};
// Infer data/SLA timestamps from servicing wants
let want_timestamps: WantTimestamps = dep_miss
.info
.servicing_wants
.iter()
.flat_map(|wap| self.get_want(&wap.want_id).map(|w| w.into()))
.reduce(|a: WantTimestamps, b: WantTimestamps| a.merge(b))
.expect("BUG: No servicing wants found");
// Collect all missing deps into a flat list of partition refs
let all_missing_deps: Vec<PartitionRef> = event
.missing_deps
@ -534,13 +590,11 @@ impl BuildState {
let building_refs_to_reset = dep_miss.get_building_partitions_to_reset();
self.transition_partitions_to_upstream_building(&building_refs_to_reset, all_missing_deps);
// Generate WantCreateV1 events for the missing dependencies
// Generate ephemeral WantCreateV1 events for the missing dependencies
// These events will be returned and appended to the BEL by BuildEventLog.append_event()
let want_events = missing_deps_to_want_events(
dep_miss.get_missing_deps().to_vec(),
&event.job_run_id,
want_timestamps,
);
// Ephemeral wants delegate freshness decisions to their originating want via the job_run_id reference
let want_events =
missing_deps_to_want_events(dep_miss.get_missing_deps().to_vec(), &event.job_run_id);
// Store the job run in DepMiss state so we can access the missing_deps later
// When the derivative WantCreateV1 events get processed by handle_want_create(),
@ -642,17 +696,22 @@ impl BuildState {
#[cfg(test)]
mod tests {
use super::*;
use crate::{MissingDeps, WantAttributedPartitions};
mod want {
use super::*;
use crate::WantDetail;
use crate::want_create_event_v1::Lifetime;
use crate::{OriginatingLifetime, WantDetail};
#[test]
fn test_should_create_want() {
let mut e = WantCreateEventV1::default();
e.want_id = "1234".to_string();
e.partitions = vec!["mypart".into()];
e.lifetime = Some(Lifetime::Originating(OriginatingLifetime {
data_timestamp: 1000,
ttl_seconds: 3600,
sla_seconds: 7200,
}));
let mut state = BuildState::default();
state.handle_event(&e.clone().into());
@ -665,9 +724,12 @@ mod tests {
#[test]
fn test_should_cancel_want() {
use crate::util::test_scenarios::default_originating_lifetime;
let mut e = WantCreateEventV1::default();
e.want_id = "1234".to_string();
e.partitions = vec!["mypart".into()];
e.lifetime = Some(default_originating_lifetime());
let mut state = BuildState::default();
state.handle_event(&e.clone().into());
@ -686,133 +748,25 @@ mod tests {
#[test]
fn test_multihop_dependency_replay() {
use crate::{
JobRunBufferEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1,
JobRunSuccessEventV1, MissingDeps, PartitionRef, WantAttributedPartitions,
WantCreateEventV1,
};
use crate::util::test_scenarios::multihop_scenario;
let mut state = BuildState::default();
let mut events = vec![];
// 1. Create want for data/beta
let beta_want_id = "beta-want".to_string();
let mut create_beta = WantCreateEventV1::default();
create_beta.want_id = beta_want_id.clone();
create_beta.partitions = vec![PartitionRef {
r#ref: "data/beta".to_string(),
}];
events.push(Event::WantCreateV1(create_beta));
// 2. Queue beta job (first attempt)
let beta_job_1_id = "beta-job-1".to_string();
let mut buffer_beta_1 = JobRunBufferEventV1::default();
buffer_beta_1.job_run_id = beta_job_1_id.clone();
buffer_beta_1.job_label = "//job_beta".to_string();
buffer_beta_1.want_attributed_partitions = vec![WantAttributedPartitions {
want_id: beta_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
}];
buffer_beta_1.building_partitions = vec![PartitionRef {
r#ref: "data/beta".to_string(),
}];
events.push(Event::JobRunBufferV1(buffer_beta_1));
// 3. Beta job starts running
let mut heartbeat_beta_1 = JobRunHeartbeatEventV1::default();
heartbeat_beta_1.job_run_id = beta_job_1_id.clone();
events.push(Event::JobRunHeartbeatV1(heartbeat_beta_1));
// 4. Beta job reports missing dependency on data/alpha
let mut dep_miss_beta_1 = JobRunMissingDepsEventV1::default();
dep_miss_beta_1.job_run_id = beta_job_1_id.clone();
dep_miss_beta_1.missing_deps = vec![MissingDeps {
impacted: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
missing: vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}],
}];
events.push(Event::JobRunMissingDepsV1(dep_miss_beta_1));
// 5. Create derivative want for data/alpha
let alpha_want_id = "alpha-want".to_string();
let mut create_alpha = WantCreateEventV1::default();
create_alpha.want_id = alpha_want_id.clone();
create_alpha.partitions = vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}];
events.push(Event::WantCreateV1(create_alpha));
// 6. Queue alpha job
let alpha_job_id = "alpha-job".to_string();
let mut buffer_alpha = JobRunBufferEventV1::default();
buffer_alpha.job_run_id = alpha_job_id.clone();
buffer_alpha.job_label = "//job_alpha".to_string();
buffer_alpha.want_attributed_partitions = vec![WantAttributedPartitions {
want_id: alpha_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}],
}];
buffer_alpha.building_partitions = vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}];
events.push(Event::JobRunBufferV1(buffer_alpha));
// 7. Alpha job starts running
let mut heartbeat_alpha = JobRunHeartbeatEventV1::default();
heartbeat_alpha.job_run_id = alpha_job_id.clone();
events.push(Event::JobRunHeartbeatV1(heartbeat_alpha));
// 8. Alpha job succeeds
let mut success_alpha = JobRunSuccessEventV1::default();
success_alpha.job_run_id = alpha_job_id.clone();
events.push(Event::JobRunSuccessV1(success_alpha));
// 9. Queue beta job again (second attempt) - THIS IS THE CRITICAL MOMENT
let beta_job_2_id = "beta-job-2".to_string();
let mut buffer_beta_2 = JobRunBufferEventV1::default();
buffer_beta_2.job_run_id = beta_job_2_id.clone();
buffer_beta_2.job_label = "//job_beta".to_string();
buffer_beta_2.want_attributed_partitions = vec![WantAttributedPartitions {
want_id: beta_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
}];
buffer_beta_2.building_partitions = vec![PartitionRef {
r#ref: "data/beta".to_string(),
}];
events.push(Event::JobRunBufferV1(buffer_beta_2));
// 10. Beta job starts running
let mut heartbeat_beta_2 = JobRunHeartbeatEventV1::default();
heartbeat_beta_2.job_run_id = beta_job_2_id.clone();
events.push(Event::JobRunHeartbeatV1(heartbeat_beta_2));
// 11. Beta job succeeds
let mut success_beta_2 = JobRunSuccessEventV1::default();
success_beta_2.job_run_id = beta_job_2_id.clone();
events.push(Event::JobRunSuccessV1(success_beta_2));
let (events, ids) = multihop_scenario();
// Process all events - this simulates replay
let mut state = BuildState::default();
for event in &events {
state.handle_event(event);
}
// Verify final state
let beta_want = state.get_want(&beta_want_id).unwrap();
let beta_want = state.get_want(&ids.beta_want_id).unwrap();
assert_eq!(
beta_want.status,
Some(crate::WantStatusCode::WantSuccessful.into()),
"Beta want should be successful after multi-hop dependency resolution"
);
let alpha_want = state.get_want(&alpha_want_id).unwrap();
let alpha_want = state.get_want(&ids.alpha_want_id).unwrap();
assert_eq!(
alpha_want.status,
Some(crate::WantStatusCode::WantSuccessful.into()),
@ -824,6 +778,7 @@ mod tests {
/// This was the original bug that motivated the UUID refactor.
#[test]
fn test_concurrent_wants_same_partition() {
use crate::util::test_scenarios::default_originating_lifetime;
use crate::{
JobRunBufferEventV1, JobRunHeartbeatEventV1, PartitionRef,
WantAttributedPartitions, WantCreateEventV1,
@ -838,6 +793,7 @@ mod tests {
create_want_1.partitions = vec![PartitionRef {
r#ref: "data/beta".to_string(),
}];
create_want_1.lifetime = Some(default_originating_lifetime());
state.handle_event(&Event::WantCreateV1(create_want_1));
// Want 1 should be Idle (no partition exists yet)
@ -855,6 +811,7 @@ mod tests {
create_want_2.partitions = vec![PartitionRef {
r#ref: "data/beta".to_string(),
}];
create_want_2.lifetime = Some(default_originating_lifetime());
state.handle_event(&Event::WantCreateV1(create_want_2));
// Want 2 should also be Idle
@ -932,6 +889,7 @@ mod tests {
mod partition_lifecycle {
use super::*;
use crate::util::test_scenarios::default_originating_lifetime;
use crate::{
JobRunBufferEventV1, JobRunFailureEventV1, JobRunHeartbeatEventV1,
JobRunMissingDepsEventV1, JobRunSuccessEventV1, MissingDeps, PartitionRef,
@ -952,6 +910,7 @@ mod tests {
create_beta.partitions = vec![PartitionRef {
r#ref: "data/beta".to_string(),
}];
create_beta.lifetime = Some(default_originating_lifetime());
state.handle_event(&Event::WantCreateV1(create_beta));
// 2. Job buffers for beta
@ -1082,6 +1041,7 @@ mod tests {
create_beta.partitions = vec![PartitionRef {
r#ref: "data/beta".to_string(),
}];
create_beta.lifetime = Some(default_originating_lifetime());
state.handle_event(&Event::WantCreateV1(create_beta));
// 2. First job buffers for beta (creates uuid-1)
@ -1129,6 +1089,7 @@ mod tests {
create_alpha.partitions = vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}];
create_alpha.lifetime = Some(default_originating_lifetime());
state.handle_event(&Event::WantCreateV1(create_alpha));
let alpha_job_id = "alpha-job".to_string();

View file

@ -61,6 +61,11 @@ pub struct BuildState {
// Inverted indexes
pub(crate) wants_for_partition: BTreeMap<String, Vec<String>>, // partition ref → want_ids
pub(crate) downstream_waiting: BTreeMap<String, Vec<Uuid>>, // upstream ref → partition UUIDs waiting for it
// Consumer index for lineage queries: input_uuid → list of (output_uuid, job_run_id)
// Uses UUIDs (not refs) to preserve historical lineage across partition rebuilds
// Populated from read_deps on job success
pub(crate) partition_consumers: BTreeMap<Uuid, Vec<(Uuid, String)>>,
}
impl BuildState {
@ -118,6 +123,15 @@ impl BuildState {
.unwrap_or(&[])
}
/// Get consumers for a partition UUID (downstream partitions that read this one)
/// Returns list of (output_uuid, job_run_id) tuples
pub fn get_partition_consumers(&self, uuid: &Uuid) -> &[(Uuid, String)] {
self.partition_consumers
.get(uuid)
.map(|v| v.as_slice())
.unwrap_or(&[])
}
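// Hedged sketch, not part of this change: a hypothetical helper that walks the
// consumer index transitively to collect all downstream partition UUIDs, using
// only get_partition_consumers above.
pub fn downstream_partition_uuids(&self, root: &Uuid) -> Vec<Uuid> {
    let mut seen: Vec<Uuid> = Vec::new();
    let mut stack = vec![*root];
    while let Some(current) = stack.pop() {
        for (output_uuid, _job_run_id) in self.get_partition_consumers(&current) {
            if !seen.contains(output_uuid) {
                seen.push(*output_uuid);
                stack.push(*output_uuid);
            }
        }
    }
    seen
}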
/// Register a want in the wants_for_partition inverted index
pub(crate) fn register_want_for_partitions(
&mut self,

View file

@ -2,10 +2,12 @@
//!
//! Read-only methods for accessing state (get_*, list_*) used by the API layer.
use crate::util::{HasRelatedIds, RelatedIds};
use crate::{
JobRunDetail, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest,
ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest,
ListWantsResponse, PartitionDetail, TaintDetail, WantDetail,
GetJobRunResponse, GetPartitionResponse, GetWantResponse, JobRunDetail, ListJobRunsRequest,
ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest,
ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, RelatedEntities,
TaintDetail, WantDetail,
};
use std::collections::BTreeMap;
@ -13,7 +15,34 @@ use super::{BuildState, consts};
impl BuildState {
pub fn get_want(&self, want_id: &str) -> Option<WantDetail> {
self.wants.get(want_id).map(|w| w.to_detail())
self.wants.get(want_id).map(|w| {
let mut detail = w.to_detail();
// Populate job_runs and compute derivative_want_ids by traversing job runs.
//
// derivative_want_ids is computed at query time rather than maintained during
// event handling. The relationship flows: Want → JobRun → (dep-miss) → EphemeralWant
//
// - JobRun tracks which derivative wants it spawned (on DepMissState)
// - Want only tracks which job runs serviced it (job_run_ids)
// - At query time, we traverse: Want's job_run_ids → each JobRun's derivative_want_ids
//
// This keeps event handling simple (just update the job run) and keeps JobRun
// as the source of truth for derivative want relationships.
for job_run_id in &detail.job_run_ids {
if let Some(job_run) = self.job_runs.get(job_run_id) {
let job_detail = job_run.to_detail();
// Collect derivative want IDs
for derivative_want_id in &job_detail.derivative_want_ids {
if !detail.derivative_want_ids.contains(derivative_want_id) {
detail.derivative_want_ids.push(derivative_want_id.clone());
}
}
// Add full job run details
detail.job_runs.push(job_detail);
}
}
detail
})
}
pub fn get_taint(&self, taint_id: &str) -> Option<TaintDetail> {
@ -49,6 +78,7 @@ impl BuildState {
match_count: self.wants.len() as u64,
page,
page_size,
index: None,
}
}
@ -60,6 +90,7 @@ impl BuildState {
match_count: self.wants.len() as u64,
page,
page_size,
index: None,
}
}
@ -81,6 +112,7 @@ impl BuildState {
match_count: self.canonical_partitions.len() as u64,
page,
page_size,
index: None,
}
}
@ -102,6 +134,7 @@ impl BuildState {
match_count: self.job_runs.len() as u64,
page,
page_size,
index: None,
}
}
}
@ -116,3 +149,214 @@ fn list_state_items<T: Clone>(map: &BTreeMap<String, T>, page: u64, page_size: u
.cloned()
.collect()
}
// ============================================================================
// Response builders with RelatedEntities index
// ============================================================================
impl BuildState {
/// Resolve RelatedIds to a RelatedEntities index by looking up entities in BuildState.
/// This is the central method for building the index from collected IDs.
pub fn resolve_related_ids(&self, ids: &RelatedIds) -> RelatedEntities {
let mut index = RelatedEntities::default();
// Resolve partition refs
for partition_ref in &ids.partition_refs {
if !index.partitions.contains_key(partition_ref) {
if let Some(p) = self.get_canonical_partition(partition_ref) {
index
.partitions
.insert(partition_ref.clone(), p.to_detail());
}
}
}
// Resolve partition UUIDs
for uuid in &ids.partition_uuids {
if let Some(p) = self.partitions_by_uuid.get(uuid) {
let detail = p.to_detail();
if let Some(ref pref) = detail.r#ref {
if !index.partitions.contains_key(&pref.r#ref) {
index.partitions.insert(pref.r#ref.clone(), detail);
}
}
}
}
// Resolve job run IDs
for job_run_id in &ids.job_run_ids {
if !index.job_runs.contains_key(job_run_id) {
if let Some(jr) = self.job_runs.get(job_run_id) {
index.job_runs.insert(job_run_id.clone(), jr.to_detail());
}
}
}
// Resolve want IDs
for want_id in &ids.want_ids {
if !index.wants.contains_key(want_id) {
if let Some(w) = self.wants.get(want_id) {
index.wants.insert(want_id.clone(), w.to_detail());
}
}
}
index
}
/// Get a want with its related entities (job runs, partitions)
pub fn get_want_with_index(&self, want_id: &str) -> Option<GetWantResponse> {
let want = self.wants.get(want_id)?;
let want_detail = want.to_detail();
let ids = want.related_ids();
let index = self.resolve_related_ids(&ids);
Some(GetWantResponse {
data: Some(want_detail),
index: Some(index),
})
}
/// Get a partition with its related entities (builder job run, downstream consumers)
pub fn get_partition_with_index(&self, partition_ref: &str) -> Option<GetPartitionResponse> {
let partition = self.get_canonical_partition(partition_ref)?;
let partition_detail = partition.to_detail();
let mut ids = partition.related_ids();
// Add downstream consumers from the consumer index (not stored on partition)
let uuid = partition.uuid();
for (output_uuid, job_run_id) in self.get_partition_consumers(&uuid) {
if !ids.partition_uuids.contains(output_uuid) {
ids.partition_uuids.push(*output_uuid);
}
if !ids.job_run_ids.contains(job_run_id) {
ids.job_run_ids.push(job_run_id.clone());
}
}
// Add wants that reference this partition (from inverted index)
for want_id in self.get_wants_for_partition(partition_ref) {
if !ids.want_ids.contains(want_id) {
ids.want_ids.push(want_id.clone());
}
}
let index = self.resolve_related_ids(&ids);
Some(GetPartitionResponse {
data: Some(partition_detail),
index: Some(index),
})
}
/// Get a job run with its related entities (read/wrote partitions, derivative wants)
pub fn get_job_run_with_index(&self, job_run_id: &str) -> Option<GetJobRunResponse> {
let job_run = self.job_runs.get(job_run_id)?;
let job_run_detail = job_run.to_detail();
let ids = job_run.related_ids();
let index = self.resolve_related_ids(&ids);
Some(GetJobRunResponse {
data: Some(job_run_detail),
index: Some(index),
})
}
/// List wants with related entities index
pub fn list_wants_with_index(&self, request: &ListWantsRequest) -> ListWantsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
let start = page * page_size;
let wants: Vec<_> = self
.wants
.values()
.skip(start as usize)
.take(page_size as usize)
.collect();
// Collect related IDs from all wants
let mut all_ids = RelatedIds::default();
for want in &wants {
all_ids.merge(want.related_ids());
}
let data: Vec<WantDetail> = wants.iter().map(|w| w.to_detail()).collect();
let index = self.resolve_related_ids(&all_ids);
ListWantsResponse {
data,
match_count: self.wants.len() as u64,
page,
page_size,
index: Some(index),
}
}
/// List partitions with related entities index
pub fn list_partitions_with_index(
&self,
request: &ListPartitionsRequest,
) -> ListPartitionsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
let start = page * page_size;
let partitions: Vec<_> = self
.canonical_partitions
.iter()
.skip(start as usize)
.take(page_size as usize)
.filter_map(|(_, uuid)| self.partitions_by_uuid.get(uuid))
.collect();
// Collect related IDs from all partitions
let mut all_ids = RelatedIds::default();
for partition in &partitions {
all_ids.merge(partition.related_ids());
}
let data: Vec<PartitionDetail> = partitions.iter().map(|p| p.to_detail()).collect();
let index = self.resolve_related_ids(&all_ids);
ListPartitionsResponse {
data,
match_count: self.canonical_partitions.len() as u64,
page,
page_size,
index: Some(index),
}
}
/// List job runs with related entities index
pub fn list_job_runs_with_index(&self, request: &ListJobRunsRequest) -> ListJobRunsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
let start = page * page_size;
let job_runs: Vec<_> = self
.job_runs
.values()
.skip(start as usize)
.take(page_size as usize)
.collect();
// Collect related IDs from all job runs
let mut all_ids = RelatedIds::default();
for job_run in &job_runs {
all_ids.merge(job_run.related_ids());
}
let data: Vec<JobRunDetail> = job_runs.iter().map(|jr| jr.to_detail()).collect();
let index = self.resolve_related_ids(&all_ids);
ListJobRunsResponse {
data,
match_count: self.job_runs.len() as u64,
page,
page_size,
index: Some(index),
}
}
}

View file

@ -1,7 +1,7 @@
use crate::data_build_event::Event;
use crate::want_create_event_v1::Lifetime;
use crate::{
JobRunMissingDeps, JobRunReadDeps, JobTriggeredEvent, MissingDeps, ReadDeps, WantCreateEventV1,
WantDetail,
EphemeralLifetime, JobRunMissingDeps, JobRunReadDeps, MissingDeps, ReadDeps, WantCreateEventV1,
};
use uuid::Uuid;
@ -82,53 +82,19 @@ fn line_matches<'a>(line: &'a str, prefix: &'a str) -> Option<&'a str> {
line.trim().strip_prefix(prefix)
}
pub struct WantTimestamps {
data_timestamp: u64,
ttl_seconds: u64,
sla_seconds: u64,
}
impl From<WantDetail> for WantTimestamps {
fn from(want_detail: WantDetail) -> Self {
WantTimestamps {
data_timestamp: want_detail.data_timestamp,
ttl_seconds: want_detail.ttl_seconds,
sla_seconds: want_detail.sla_seconds,
}
}
}
impl WantTimestamps {
pub fn merge(self, other: WantTimestamps) -> WantTimestamps {
// TODO does this make sense?
WantTimestamps {
data_timestamp: self.data_timestamp.min(other.data_timestamp),
ttl_seconds: self.ttl_seconds.max(other.ttl_seconds),
sla_seconds: self.sla_seconds.max(other.sla_seconds),
}
}
}
pub fn missing_deps_to_want_events(
missing_deps: Vec<MissingDeps>,
job_run_id: &String,
want_timestamps: WantTimestamps,
) -> Vec<Event> {
/// Create ephemeral want events from missing dependencies.
/// Ephemeral wants are derivative wants created by the system when a job hits a dep-miss.
/// They delegate freshness decisions to their originating want.
pub fn missing_deps_to_want_events(missing_deps: Vec<MissingDeps>, job_run_id: &str) -> Vec<Event> {
missing_deps
.iter()
.map(|md| {
Event::WantCreateV1(WantCreateEventV1 {
want_id: Uuid::new_v4().into(),
partitions: md.missing.clone(),
data_timestamp: want_timestamps.data_timestamp,
ttl_seconds: want_timestamps.ttl_seconds,
sla_seconds: want_timestamps.sla_seconds,
source: Some(
JobTriggeredEvent {
job_run_id: job_run_id.clone(),
}
.into(),
),
lifetime: Some(Lifetime::Ephemeral(EphemeralLifetime {
job_run_id: job_run_id.to_string(),
})),
comment: Some("Missing data".to_string()),
})
})

View file

@ -62,6 +62,20 @@ message JobTriggeredEvent {
string job_run_id = 1;
}
// Want lifetime semantics
// Originating wants are user-created with explicit freshness requirements
message OriginatingLifetime {
uint64 data_timestamp = 1;
uint64 ttl_seconds = 2;
uint64 sla_seconds = 3;
}
// Ephemeral wants are system-created (derivative) from dep-miss
// They delegate freshness decisions to their originating want
message EphemeralLifetime {
// The job run that hit dep-miss and created this derivative want
string job_run_id = 1;
}
message WantAttributedPartitions {
string want_id = 1;
repeated PartitionRef partitions = 2;
@ -85,9 +99,11 @@ message JobRunHeartbeatEventV1 {
string job_run_id = 1;
// TODO reentrance?
}
// Simply indicates that the job has succeeded.
// Indicates that the job has succeeded, including what data was read.
message JobRunSuccessEventV1 {
string job_run_id = 1;
// The read dependencies for this job run, preserving impacted→read relationships
repeated ReadDeps read_deps = 2;
}
// Simply indicates that the job has failed. Depending on retry logic defined in job, it may retry.
message JobRunFailureEventV1 {
@ -134,12 +150,14 @@ message WantCreateEventV1 {
// The unique ID of this want
string want_id = 1;
repeated PartitionRef partitions = 2;
uint64 data_timestamp = 3;
uint64 ttl_seconds = 4;
uint64 sla_seconds = 5;
// The source of the want. Can be from job, API, CLI, web app...
EventSource source = 6;
optional string comment = 7;
// Lifetime semantics - exactly one must be set
oneof lifetime {
OriginatingLifetime originating = 3;
EphemeralLifetime ephemeral = 4;
}
optional string comment = 5;
}
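// Illustrative textproto shapes for the two lifetime variants (field values are
// examples drawn from tests in this change, not defaults):
//   originating: want_id: "w1" originating { data_timestamp: 1000 ttl_seconds: 3600 sla_seconds: 7200 }
//   ephemeral:   want_id: "w2" ephemeral { job_run_id: "beta-job-1" }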
message WantCancelEventV1 {
string want_id = 1;
@ -205,14 +223,22 @@ message WantDetail {
repeated PartitionRef partitions = 2;
// The upstream partitions, detected from a dep miss job run failure
repeated PartitionRef upstreams = 3;
uint64 data_timestamp = 4;
uint64 ttl_seconds = 5;
uint64 sla_seconds = 6;
EventSource source = 7;
optional string comment = 8;
WantStatus status = 9;
uint64 last_updated_timestamp = 10;
// TODO
// Lifetime semantics
oneof lifetime {
OriginatingLifetime originating = 4;
EphemeralLifetime ephemeral = 5;
}
optional string comment = 6;
WantStatus status = 7;
uint64 last_updated_timestamp = 8;
// Lineage: all job runs that have serviced this want (IDs for reference)
repeated string job_run_ids = 9;
// Lineage: derivative wants spawned by this want's job dep-misses (computed from job_run_ids)
repeated string derivative_want_ids = 10;
// Lineage: full details of job runs servicing this want (for display in tables)
repeated JobRunDetail job_runs = 11;
}
message PartitionDetail {
@ -230,6 +256,11 @@ message PartitionDetail {
// The unique identifier for this partition instance (UUID as string)
// Each time a partition is built, it gets a new UUID derived from the job_run_id
string uuid = 7;
// Lineage: job run that built this partition (for Live/Tainted partitions)
// Upstream lineage is resolved via this job run's read_deps (job run is source of truth)
optional string built_by_job_run_id = 8;
// Lineage: downstream partition UUIDs that consumed this (from consumer index)
repeated string downstream_partition_uuids = 9;
}
message PartitionStatus {
PartitionStatusCode code = 1;
@ -289,10 +320,31 @@ message JobRunDetail {
optional uint64 last_heartbeat_at = 3;
repeated PartitionRef building_partitions = 4;
repeated WantAttributedPartitions servicing_wants = 5;
// Lineage: read dependencies with resolved UUIDs (for Succeeded jobs)
repeated ReadDeps read_deps = 6;
// Lineage: resolved UUIDs for read partitions (ref → UUID)
map<string, string> read_partition_uuids = 7;
// Lineage: resolved UUIDs for written partitions (ref → UUID)
map<string, string> wrote_partition_uuids = 8;
// Lineage: derivative wants spawned by this job's dep-miss (for DepMiss jobs)
repeated string derivative_want_ids = 9;
// Timestamps for tracking job lifecycle
optional uint64 queued_at = 10;
optional uint64 started_at = 11;
// The job label (e.g. "//path/to:job")
string job_label = 12;
}
// Related entities index - used in API responses for deduplication and O(1) lookup
// Each entity appears once in the index, even if referenced by multiple items in `data`
message RelatedEntities {
map<string, PartitionDetail> partitions = 1;
map<string, JobRunDetail> job_runs = 2;
map<string, WantDetail> wants = 3;
}
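// Example (illustrative): when two wants in a ListWantsResponse were serviced by
// the same job run, that JobRunDetail appears once here (keyed by job_run_id in
// job_runs) rather than being embedded in each WantDetail.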
message EventFilter {
// IDs of wants to get relevant events for
repeated string want_ids = 1;
@ -308,6 +360,7 @@ message ListWantsResponse {
uint64 match_count = 2;
uint64 page = 3;
uint64 page_size = 4;
RelatedEntities index = 5;
}
message ListTaintsRequest {
@ -320,6 +373,7 @@ message ListTaintsResponse {
uint64 match_count = 2;
uint64 page = 3;
uint64 page_size = 4;
RelatedEntities index = 5;
}
message ListPartitionsRequest {
@ -332,6 +386,7 @@ message ListPartitionsResponse {
uint64 match_count = 2;
uint64 page = 3;
uint64 page_size = 4;
RelatedEntities index = 5;
}
message ListJobRunsRequest {
@ -344,15 +399,16 @@ message ListJobRunsResponse {
uint64 match_count = 2;
uint64 page = 3;
uint64 page_size = 4;
RelatedEntities index = 5;
}
message CreateWantRequest {
repeated PartitionRef partitions = 1;
// User-created wants are always originating (have explicit freshness requirements)
uint64 data_timestamp = 2;
uint64 ttl_seconds = 3;
uint64 sla_seconds = 4;
EventSource source = 5;
optional string comment = 6;
optional string comment = 5;
}
message CreateWantResponse {
WantDetail data = 1;
@ -372,6 +428,23 @@ message GetWantRequest {
}
message GetWantResponse {
WantDetail data = 1;
RelatedEntities index = 2;
}
message GetPartitionRequest {
string partition_ref = 1;
}
message GetPartitionResponse {
PartitionDetail data = 1;
RelatedEntities index = 2;
}
message GetJobRunRequest {
string job_run_id = 1;
}
message GetJobRunResponse {
JobRunDetail data = 1;
RelatedEntities index = 2;
}
message CreateTaintRequest {

View file

@ -1,14 +1,15 @@
use crate::PartitionStatusCode::{PartitionFailed, PartitionLive};
use crate::data_build_event::Event;
use crate::job_run_state::{JobInfo, JobRunWithState, QueuedState};
use crate::job_run_state::{JobInfo, JobRunWithState, QueuedState, TimingInfo};
use crate::util::current_timestamp;
use crate::want_create_event_v1::Lifetime;
use crate::{
CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse,
CreateWantRequest, CreateWantResponse, EventSource, GetWantResponse, JobRunBufferEventV1,
JobRunDetail, JobRunStatus, JobRunStatusCode, JobTriggeredEvent, ManuallyTriggeredEvent,
PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, TaintCancelEventV1,
TaintCreateEventV1, TaintDetail, WantAttributedPartitions, WantCancelEventV1,
WantCreateEventV1, WantDetail, WantStatus, WantStatusCode, event_source,
OriginatingLifetime, PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode,
TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantAttributedPartitions,
WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode, event_source,
};
use uuid::Uuid;
@ -19,17 +20,23 @@ impl From<&WantCreateEventV1> for WantDetail {
}
impl From<WantCreateEventV1> for WantDetail {
fn from(e: WantCreateEventV1) -> Self {
// Convert want_create_event_v1::Lifetime to want_detail::Lifetime
let lifetime = e.lifetime.map(|l| match l {
Lifetime::Originating(orig) => crate::want_detail::Lifetime::Originating(orig),
Lifetime::Ephemeral(eph) => crate::want_detail::Lifetime::Ephemeral(eph),
});
WantDetail {
want_id: e.want_id,
partitions: e.partitions,
upstreams: vec![],
data_timestamp: e.data_timestamp,
ttl_seconds: e.ttl_seconds,
sla_seconds: e.sla_seconds,
source: e.source,
lifetime,
comment: e.comment,
status: Some(WantStatusCode::WantIdle.into()),
last_updated_timestamp: current_timestamp(),
job_run_ids: vec![],
derivative_want_ids: vec![],
job_runs: vec![],
}
}
}
@ -74,27 +81,39 @@ impl From<WantStatusCode> for WantStatus {
impl From<JobRunBufferEventV1> for JobRunDetail {
fn from(value: JobRunBufferEventV1) -> Self {
use std::collections::HashMap;
Self {
id: value.job_run_id,
job_label: value.job_label,
status: Some(JobRunStatusCode::JobRunQueued.into()),
last_heartbeat_at: None,
building_partitions: value.building_partitions,
servicing_wants: value.want_attributed_partitions,
read_deps: vec![],
read_partition_uuids: HashMap::new(),
wrote_partition_uuids: HashMap::new(),
derivative_want_ids: vec![],
queued_at: Some(current_timestamp()),
started_at: None,
}
}
}
impl From<JobRunBufferEventV1> for JobRunWithState<QueuedState> {
fn from(event: JobRunBufferEventV1) -> Self {
let queued_at = current_timestamp();
JobRunWithState {
info: JobInfo {
id: event.job_run_id,
job_label: event.job_label,
building_partitions: event.building_partitions,
servicing_wants: event.want_attributed_partitions,
},
state: QueuedState {
queued_at: current_timestamp(),
timing: TimingInfo {
queued_at,
started_at: None,
},
state: QueuedState { queued_at },
}
}
}
@ -190,13 +209,15 @@ impl From<&WantDetail> for WantAttributedPartitions {
impl From<CreateWantRequest> for WantCreateEventV1 {
fn from(value: CreateWantRequest) -> Self {
// User-created wants are always originating (have explicit freshness requirements)
WantCreateEventV1 {
want_id: Uuid::new_v4().into(),
partitions: value.partitions,
data_timestamp: value.data_timestamp,
ttl_seconds: value.ttl_seconds,
sla_seconds: value.sla_seconds,
source: value.source,
lifetime: Some(Lifetime::Originating(OriginatingLifetime {
data_timestamp: value.data_timestamp,
ttl_seconds: value.ttl_seconds,
sla_seconds: value.sla_seconds,
})),
comment: value.comment,
}
}
@ -210,7 +231,10 @@ impl Into<CreateWantResponse> for Option<WantDetail> {
impl Into<GetWantResponse> for Option<WantDetail> {
fn into(self) -> GetWantResponse {
GetWantResponse { data: self }
GetWantResponse {
data: self,
index: None,
}
}
}

View file

@ -2,7 +2,7 @@ use crate::build_event_log::BELStorage;
use crate::build_state::BuildState;
use crate::commands::Command;
use crate::web::templates::{
BaseContext, HomePage, JobRunDetailPage, JobRunDetailView, JobRunsListPage,
BaseContext, DerivativeWantView, HomePage, JobRunDetailPage, JobRunDetailView, JobRunsListPage,
PartitionDetailPage, PartitionDetailView, PartitionsListPage, WantCreatePage, WantDetailPage,
WantDetailView, WantsListPage,
};
@ -260,9 +260,17 @@ async fn want_detail_page(
match build_state.get_want(&want_id) {
Some(want) => {
// Fetch derivative wants
let derivative_wants: Vec<_> = want
.derivative_want_ids
.iter()
.filter_map(|id| build_state.get_want(id))
.map(|w| DerivativeWantView::from(&w))
.collect();
let template = WantDetailPage {
base: BaseContext::default(),
want: WantDetailView::from(want),
want: WantDetailView::new(&want, derivative_wants),
};
match template.render() {
Ok(html) => Html(html).into_response(),
@ -433,7 +441,7 @@ async fn list_wants_json(
.into_response();
}
};
let response = build_state.list_wants(&params);
let response = build_state.list_wants_with_index(&params);
(StatusCode::OK, Json(response)).into_response()
}
@ -456,10 +464,8 @@ async fn get_want_json(
.into_response();
}
};
let response = build_state.get_want(&want_id);
match response {
Some(want) => (StatusCode::OK, Json(GetWantResponse { data: Some(want) })).into_response(),
match build_state.get_want_with_index(&want_id) {
Some(response) => (StatusCode::OK, Json(response)).into_response(),
None => {
tracing::debug!("Want not found: {}", want_id);
(
@ -608,7 +614,7 @@ async fn list_partitions_json(
.into_response();
}
};
let response = build_state.list_partitions(&params);
let response = build_state.list_partitions_with_index(&params);
(StatusCode::OK, Json(response)).into_response()
}
@ -631,7 +637,7 @@ async fn list_job_runs_json(
.into_response();
}
};
let response = build_state.list_job_runs(&params);
let response = build_state.list_job_runs_with_index(&params);
(StatusCode::OK, Json(response)).into_response()
}

View file

@ -256,6 +256,7 @@ impl SubProcessCompleted {
pub fn to_event(&self, job_run_id: &Uuid) -> Event {
Event::JobRunSuccessV1(JobRunSuccessEventV1 {
job_run_id: job_run_id.to_string(),
read_deps: self.read_deps.clone(),
})
}
}
@ -394,6 +395,7 @@ impl ToEvent for SubProcessCompleted {
fn to_event(&self, job_run_id: &Uuid) -> Event {
Event::JobRunSuccessV1(JobRunSuccessEventV1 {
job_run_id: job_run_id.to_string(),
read_deps: self.read_deps.clone(),
})
}
}

View file

@ -1,9 +1,11 @@
use crate::partition_state::{BuildingPartitionRef, FailedPartitionRef, LivePartitionRef};
use crate::util::current_timestamp;
use crate::util::{HasRelatedIds, RelatedIds, current_timestamp};
use crate::{
EventSource, JobRunDetail, JobRunStatusCode, MissingDeps, PartitionRef, ReadDeps,
WantAttributedPartitions,
};
use std::collections::BTreeMap;
use uuid::Uuid;
/// State: Job has been queued but not yet started
#[derive(Debug, Clone)]
@ -22,6 +24,12 @@ pub struct RunningState {
#[derive(Debug, Clone)]
pub struct SucceededState {
pub completed_at: u64,
/// The read dependencies reported by the job, preserving impacted→read relationships
pub read_deps: Vec<ReadDeps>,
/// Resolved UUIDs for partitions that were read (ref → UUID at read time)
pub read_partition_uuids: BTreeMap<String, String>,
/// Resolved UUIDs for partitions that were written (ref → UUID)
pub wrote_partition_uuids: BTreeMap<String, String>,
}
/// State: Job failed during execution
@ -37,6 +45,8 @@ pub struct DepMissState {
pub detected_at: u64,
pub missing_deps: Vec<MissingDeps>,
pub read_deps: Vec<ReadDeps>,
/// Want IDs of ephemeral wants spawned by this dep-miss
pub derivative_want_ids: Vec<String>,
}
/// State: Job was explicitly canceled
@ -51,14 +61,25 @@ pub struct CanceledState {
#[derive(Debug, Clone)]
pub struct JobInfo {
pub id: String,
pub job_label: String,
pub building_partitions: Vec<PartitionRef>,
pub servicing_wants: Vec<WantAttributedPartitions>,
}
/// Timing information preserved across state transitions
#[derive(Debug, Clone)]
pub struct TimingInfo {
/// When the job was first queued
pub queued_at: u64,
/// When the job started running (None if still queued or canceled before starting)
pub started_at: Option<u64>,
}
/// Generic job run struct parameterized by state
#[derive(Debug, Clone)]
pub struct JobRunWithState<S> {
pub info: JobInfo,
pub timing: TimingInfo,
pub state: S,
}
@ -80,6 +101,10 @@ impl JobRunWithState<QueuedState> {
pub fn start_running(self, timestamp: u64) -> JobRunWithState<RunningState> {
JobRunWithState {
info: self.info,
timing: TimingInfo {
queued_at: self.timing.queued_at,
started_at: Some(timestamp),
},
state: RunningState {
started_at: timestamp,
last_heartbeat_at: timestamp, // Initialize to start time
@ -96,6 +121,7 @@ impl JobRunWithState<QueuedState> {
) -> JobRunWithState<CanceledState> {
JobRunWithState {
info: self.info,
timing: self.timing, // Preserve timing (started_at remains None)
state: CanceledState {
canceled_at: timestamp,
source,
@ -113,11 +139,21 @@ impl JobRunWithState<RunningState> {
}
/// Transition from Running to Succeeded
pub fn succeed(self, timestamp: u64) -> JobRunWithState<SucceededState> {
pub fn succeed(
self,
timestamp: u64,
read_deps: Vec<ReadDeps>,
read_partition_uuids: BTreeMap<String, String>,
wrote_partition_uuids: BTreeMap<String, String>,
) -> JobRunWithState<SucceededState> {
JobRunWithState {
info: self.info,
timing: self.timing,
state: SucceededState {
completed_at: timestamp,
read_deps,
read_partition_uuids,
wrote_partition_uuids,
},
}
}
@ -126,6 +162,7 @@ impl JobRunWithState<RunningState> {
pub fn fail(self, timestamp: u64, reason: String) -> JobRunWithState<FailedState> {
JobRunWithState {
info: self.info,
timing: self.timing,
state: FailedState {
failed_at: timestamp,
failure_reason: reason,
@ -141,11 +178,13 @@ impl JobRunWithState<RunningState> {
read_deps: Vec<ReadDeps>,
) -> JobRunWithState<DepMissState> {
JobRunWithState {
timing: self.timing,
info: self.info,
state: DepMissState {
detected_at: timestamp,
missing_deps,
read_deps,
derivative_want_ids: vec![], // Populated later when ephemeral wants are created
},
}
}
@ -159,6 +198,7 @@ impl JobRunWithState<RunningState> {
) -> JobRunWithState<CanceledState> {
JobRunWithState {
info: self.info,
timing: self.timing,
state: CanceledState {
canceled_at: timestamp,
source,
@ -231,6 +271,21 @@ impl JobRunWithState<SucceededState> {
.map(|p| LivePartitionRef(p.clone()))
.collect()
}
/// Get the read dependencies reported by the job
pub fn get_read_deps(&self) -> &[ReadDeps] {
&self.state.read_deps
}
/// Get the resolved UUIDs for partitions that were read
pub fn get_read_partition_uuids(&self) -> &BTreeMap<String, String> {
&self.state.read_partition_uuids
}
/// Get the resolved UUIDs for partitions that were written
pub fn get_wrote_partition_uuids(&self) -> &BTreeMap<String, String> {
&self.state.wrote_partition_uuids
}
}
impl JobRunWithState<FailedState> {
@ -273,6 +328,17 @@ impl JobRunWithState<DepMissState> {
pub fn get_read_deps(&self) -> &[ReadDeps] {
&self.state.read_deps
}
/// Add a derivative want ID (ephemeral want spawned by this dep-miss)
pub fn add_derivative_want_id(&mut self, want_id: &str) {
if !self
.state
.derivative_want_ids
.contains(&want_id.to_string())
{
self.state.derivative_want_ids.push(want_id.to_string());
}
}
}
impl JobRunWithState<CanceledState> {
@ -290,52 +356,222 @@ impl JobRunWithState<CanceledState> {
}
}
// ==================== HasRelatedIds trait implementation ====================
impl HasRelatedIds for JobRun {
/// Get the IDs of all entities this job run references.
/// Note: derivative_want_ids come from BuildState, not from JobRun itself.
fn related_ids(&self) -> RelatedIds {
// Partition refs from building_partitions (all states have this)
let partition_refs: Vec<String> = match self {
JobRun::Queued(jr) => jr
.info
.building_partitions
.iter()
.map(|p| p.r#ref.clone())
.collect(),
JobRun::Running(jr) => jr
.info
.building_partitions
.iter()
.map(|p| p.r#ref.clone())
.collect(),
JobRun::Succeeded(jr) => jr
.info
.building_partitions
.iter()
.map(|p| p.r#ref.clone())
.collect(),
JobRun::Failed(jr) => jr
.info
.building_partitions
.iter()
.map(|p| p.r#ref.clone())
.collect(),
JobRun::DepMiss(jr) => jr
.info
.building_partitions
.iter()
.map(|p| p.r#ref.clone())
.collect(),
JobRun::Canceled(jr) => jr
.info
.building_partitions
.iter()
.map(|p| p.r#ref.clone())
.collect(),
};
// Partition UUIDs from read/write lineage (only Succeeded state has these)
let partition_uuids: Vec<Uuid> = match self {
JobRun::Succeeded(jr) => {
let mut uuids = Vec::new();
for uuid_str in jr.state.read_partition_uuids.values() {
if let Ok(uuid) = Uuid::parse_str(uuid_str) {
uuids.push(uuid);
}
}
for uuid_str in jr.state.wrote_partition_uuids.values() {
if let Ok(uuid) = Uuid::parse_str(uuid_str) {
if !uuids.contains(&uuid) {
uuids.push(uuid);
}
}
}
uuids
}
_ => vec![],
};
// Want IDs from servicing_wants (all states have this)
let want_ids: Vec<String> = match self {
JobRun::Queued(jr) => jr
.info
.servicing_wants
.iter()
.map(|w| w.want_id.clone())
.collect(),
JobRun::Running(jr) => jr
.info
.servicing_wants
.iter()
.map(|w| w.want_id.clone())
.collect(),
JobRun::Succeeded(jr) => jr
.info
.servicing_wants
.iter()
.map(|w| w.want_id.clone())
.collect(),
JobRun::Failed(jr) => jr
.info
.servicing_wants
.iter()
.map(|w| w.want_id.clone())
.collect(),
JobRun::DepMiss(jr) => jr
.info
.servicing_wants
.iter()
.map(|w| w.want_id.clone())
.collect(),
JobRun::Canceled(jr) => jr
.info
.servicing_wants
.iter()
.map(|w| w.want_id.clone())
.collect(),
};
RelatedIds {
partition_refs,
partition_uuids,
job_run_ids: vec![],
want_ids,
}
}
}
// ==================== Conversion to JobRunDetail for API ====================
impl JobRun {
pub fn to_detail(&self) -> JobRunDetail {
use std::collections::HashMap;
match self {
JobRun::Queued(queued) => JobRunDetail {
id: queued.info.id.clone(),
job_label: queued.info.job_label.clone(),
status: Some(JobRunStatusCode::JobRunQueued.into()),
last_heartbeat_at: None,
building_partitions: queued.info.building_partitions.clone(),
servicing_wants: queued.info.servicing_wants.clone(),
read_deps: vec![],
read_partition_uuids: HashMap::new(),
wrote_partition_uuids: HashMap::new(),
derivative_want_ids: vec![],
queued_at: Some(queued.timing.queued_at),
started_at: queued.timing.started_at,
},
JobRun::Running(running) => JobRunDetail {
id: running.info.id.clone(),
job_label: running.info.job_label.clone(),
status: Some(JobRunStatusCode::JobRunRunning.into()),
last_heartbeat_at: Some(running.state.last_heartbeat_at),
building_partitions: running.info.building_partitions.clone(),
servicing_wants: running.info.servicing_wants.clone(),
read_deps: vec![],
read_partition_uuids: HashMap::new(),
wrote_partition_uuids: HashMap::new(),
derivative_want_ids: vec![],
queued_at: Some(running.timing.queued_at),
started_at: running.timing.started_at,
},
JobRun::Succeeded(succeeded) => JobRunDetail {
id: succeeded.info.id.clone(),
job_label: succeeded.info.job_label.clone(),
status: Some(JobRunStatusCode::JobRunSucceeded.into()),
last_heartbeat_at: None,
building_partitions: succeeded.info.building_partitions.clone(),
servicing_wants: succeeded.info.servicing_wants.clone(),
read_deps: succeeded.state.read_deps.clone(),
read_partition_uuids: succeeded
.state
.read_partition_uuids
.clone()
.into_iter()
.collect(),
wrote_partition_uuids: succeeded
.state
.wrote_partition_uuids
.clone()
.into_iter()
.collect(),
derivative_want_ids: vec![],
queued_at: Some(succeeded.timing.queued_at),
started_at: succeeded.timing.started_at,
},
JobRun::Failed(failed) => JobRunDetail {
id: failed.info.id.clone(),
job_label: failed.info.job_label.clone(),
status: Some(JobRunStatusCode::JobRunFailed.into()),
last_heartbeat_at: None,
building_partitions: failed.info.building_partitions.clone(),
servicing_wants: failed.info.servicing_wants.clone(),
read_deps: vec![],
read_partition_uuids: HashMap::new(),
wrote_partition_uuids: HashMap::new(),
derivative_want_ids: vec![],
queued_at: Some(failed.timing.queued_at),
started_at: failed.timing.started_at,
},
JobRun::DepMiss(dep_miss) => JobRunDetail {
id: dep_miss.info.id.clone(),
job_label: dep_miss.info.job_label.clone(),
status: Some(JobRunStatusCode::JobRunDepMiss.into()),
last_heartbeat_at: None,
building_partitions: dep_miss.info.building_partitions.clone(),
servicing_wants: dep_miss.info.servicing_wants.clone(),
read_deps: dep_miss.state.read_deps.clone(),
read_partition_uuids: HashMap::new(),
wrote_partition_uuids: HashMap::new(),
derivative_want_ids: dep_miss.state.derivative_want_ids.clone(),
queued_at: Some(dep_miss.timing.queued_at),
started_at: dep_miss.timing.started_at,
},
JobRun::Canceled(canceled) => JobRunDetail {
id: canceled.info.id.clone(),
job_label: canceled.info.job_label.clone(),
status: Some(JobRunStatusCode::JobRunCanceled.into()),
last_heartbeat_at: None,
building_partitions: canceled.info.building_partitions.clone(),
servicing_wants: canceled.info.servicing_wants.clone(),
read_deps: vec![],
read_partition_uuids: HashMap::new(),
wrote_partition_uuids: HashMap::new(),
derivative_want_ids: vec![],
queued_at: Some(canceled.timing.queued_at),
started_at: canceled.timing.started_at,
},
}
}

View file

@ -459,12 +459,13 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
#[cfg(test)]
mod tests {
use crate::WantCreateEventV1;
use crate::build_event_log::MemoryBELStorage;
use crate::job::JobConfiguration;
use crate::mock_job_run::MockJobRun;
use crate::orchestrator::{Orchestrator, OrchestratorConfig};
use crate::util::current_timestamp;
use crate::want_create_event_v1::Lifetime;
use crate::{OriginatingLifetime, WantCreateEventV1};
use uuid::Uuid;
fn build_orchestrator() -> Orchestrator<MemoryBELStorage> {
@ -477,10 +478,11 @@ mod tests {
Self {
want_id: Uuid::new_v4().to_string(),
partitions: vec![],
data_timestamp: current_timestamp(),
ttl_seconds: 1000,
sla_seconds: 1000,
source: None,
lifetime: Some(Lifetime::Originating(OriginatingLifetime {
data_timestamp: current_timestamp(),
ttl_seconds: 1000,
sla_seconds: 1000,
})),
comment: Some("test want".to_string()),
}
}
@ -1045,7 +1047,8 @@ echo 'Beta succeeded'
mod want_grouping {
use super::super::*;
use crate::build_event_log::MemoryBELStorage;
use crate::{PartitionRef, WantDetail};
use crate::want_detail::Lifetime;
use crate::{OriginatingLifetime, PartitionRef, WantDetail};
fn create_job_config(label: &str, pattern: &str) -> JobConfiguration {
JobConfiguration {
@ -1066,13 +1069,17 @@ echo 'Beta succeeded'
})
.collect(),
upstreams: vec![],
data_timestamp: 0,
ttl_seconds: 0,
sla_seconds: 0,
source: None,
lifetime: Some(Lifetime::Originating(OriginatingLifetime {
data_timestamp: 0,
ttl_seconds: 0,
sla_seconds: 0,
})),
comment: None,
status: None,
last_updated_timestamp: 0,
job_run_ids: vec![],
derivative_want_ids: vec![],
job_runs: vec![],
}
}

View file

@ -1,3 +1,4 @@
use crate::util::{HasRelatedIds, RelatedIds};
use crate::{PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
@ -58,6 +59,8 @@ pub struct UpstreamFailedState {
pub struct TaintedState {
pub tainted_at: u64,
pub taint_ids: Vec<String>,
/// Job run that originally built this partition (before it was tainted)
pub built_by: String,
}
/// Generic partition struct parameterized by state.
@ -250,6 +253,7 @@ impl PartitionWithState<LiveState> {
state: TaintedState {
tainted_at: timestamp,
taint_ids: vec![taint_id],
built_by: self.state.built_by,
},
}
}
@ -337,9 +341,57 @@ impl Partition {
pub fn is_tainted(&self) -> bool {
matches!(self, Partition::Tainted(_))
}
}
// ==================== HasRelatedIds trait implementation ====================
impl HasRelatedIds for Partition {
/// Get the IDs of all entities this partition references.
/// Note: downstream_partition_uuids and want_ids come from BuildState indexes,
/// not from Partition itself.
fn related_ids(&self) -> RelatedIds {
// Job run ID from the builder (for states that track it)
let job_run_ids: Vec<String> = match self {
Partition::Building(p) => vec![p.state.job_run_id.clone()],
Partition::UpstreamBuilding(p) => vec![p.state.job_run_id.clone()],
Partition::UpForRetry(p) => vec![p.state.original_job_run_id.clone()],
Partition::Live(p) => vec![p.state.built_by.clone()],
Partition::Failed(p) => vec![p.state.failed_by.clone()],
Partition::UpstreamFailed(_) => vec![],
Partition::Tainted(p) => vec![p.state.built_by.clone()],
};
// Partition refs from missing deps (for UpstreamBuilding state)
let partition_refs: Vec<String> = match self {
Partition::UpstreamBuilding(p) => p
.state
.missing_deps
.iter()
.map(|d| d.r#ref.clone())
.collect(),
Partition::UpstreamFailed(p) => p
.state
.failed_upstream_refs
.iter()
.map(|d| d.r#ref.clone())
.collect(),
_ => vec![],
};
RelatedIds {
partition_refs,
partition_uuids: vec![],
job_run_ids,
want_ids: vec![],
}
}
}
impl Partition {
/// Convert to PartitionDetail for API responses and queries.
/// Note: want_ids is now empty - this will be populated by BuildState from the inverted index.
/// Note: want_ids and downstream_partition_uuids are empty here and will be
/// populated by BuildState from its inverted indexes.
/// Upstream lineage is resolved via built_by_job_run_id → job run's read_deps.
pub fn to_detail(&self) -> PartitionDetail {
match self {
Partition::Building(p) => PartitionDetail {
@ -353,6 +405,8 @@ impl Partition {
taint_ids: vec![],
last_updated_timestamp: None,
uuid: p.uuid.to_string(),
built_by_job_run_id: None,
downstream_partition_uuids: vec![], // Populated by BuildState
},
Partition::UpstreamBuilding(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
@ -365,6 +419,8 @@ impl Partition {
taint_ids: vec![],
last_updated_timestamp: None,
uuid: p.uuid.to_string(),
built_by_job_run_id: None,
downstream_partition_uuids: vec![], // Populated by BuildState
},
Partition::UpForRetry(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
@ -377,6 +433,8 @@ impl Partition {
taint_ids: vec![],
last_updated_timestamp: None,
uuid: p.uuid.to_string(),
built_by_job_run_id: None,
downstream_partition_uuids: vec![], // Populated by BuildState
},
Partition::Live(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
@ -389,6 +447,8 @@ impl Partition {
taint_ids: vec![],
last_updated_timestamp: Some(p.state.built_at),
uuid: p.uuid.to_string(),
built_by_job_run_id: Some(p.state.built_by.clone()),
downstream_partition_uuids: vec![], // Populated by BuildState
},
Partition::Failed(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
@ -401,6 +461,8 @@ impl Partition {
taint_ids: vec![],
last_updated_timestamp: Some(p.state.failed_at),
uuid: p.uuid.to_string(),
built_by_job_run_id: None,
downstream_partition_uuids: vec![], // Populated by BuildState
},
Partition::UpstreamFailed(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
@ -413,6 +475,8 @@ impl Partition {
taint_ids: vec![],
last_updated_timestamp: Some(p.state.failed_at),
uuid: p.uuid.to_string(),
built_by_job_run_id: None,
downstream_partition_uuids: vec![], // Populated by BuildState
},
Partition::Tainted(p) => PartitionDetail {
r#ref: Some(p.partition_ref.clone()),
@ -421,10 +485,12 @@ impl Partition {
name: "PartitionTainted".to_string(),
}),
want_ids: vec![], // Populated by BuildState
job_run_ids: vec![],
job_run_ids: vec![p.state.built_by.clone()],
taint_ids: p.state.taint_ids.clone(),
last_updated_timestamp: Some(p.state.tainted_at),
uuid: p.uuid.to_string(),
built_by_job_run_id: Some(p.state.built_by.clone()),
downstream_partition_uuids: vec![], // Populated by BuildState
},
}
}

View file

@ -1,5 +1,57 @@
use std::backtrace::Backtrace;
use std::time::{SystemTime, UNIX_EPOCH};
use uuid::Uuid;
// ============================================================================
// Related IDs - for building RelatedEntities index in API responses
// ============================================================================
/// IDs of related entities that an object references.
/// Used by the query layer to build the RelatedEntities index for API responses.
#[derive(Debug, Clone, Default)]
pub struct RelatedIds {
/// Partition refs (e.g., from Want.partitions, JobRun.building_partitions)
pub partition_refs: Vec<String>,
/// Partition UUIDs (e.g., from read/write lineage, consumer index)
pub partition_uuids: Vec<Uuid>,
/// Job run IDs (e.g., from built_by, consumer jobs)
pub job_run_ids: Vec<String>,
/// Want IDs (e.g., derivative wants, upstream wants, servicing wants)
pub want_ids: Vec<String>,
}
impl RelatedIds {
/// Merge another RelatedIds into this one, deduplicating
pub fn merge(&mut self, other: RelatedIds) {
for r in other.partition_refs {
if !self.partition_refs.contains(&r) {
self.partition_refs.push(r);
}
}
for u in other.partition_uuids {
if !self.partition_uuids.contains(&u) {
self.partition_uuids.push(u);
}
}
for j in other.job_run_ids {
if !self.job_run_ids.contains(&j) {
self.job_run_ids.push(j);
}
}
for w in other.want_ids {
if !self.want_ids.contains(&w) {
self.want_ids.push(w);
}
}
}
}
/// Trait for entities that can report their related entity IDs.
/// Used by the query layer to build RelatedEntities indexes for API responses.
/// Implementing types: Want, JobRun, Partition
pub trait HasRelatedIds {
fn related_ids(&self) -> RelatedIds;
}
pub fn current_timestamp() -> u64 {
let now = SystemTime::now();
@ -89,3 +141,192 @@ impl std::fmt::Display for DatabuildError {
write!(f, "{}", self.msg)
}
}
// ============================================================================
// Test Scenarios - reusable BEL event sequences for testing
// ============================================================================
#[cfg(test)]
pub mod test_scenarios {
use crate::data_build_event::Event;
use crate::want_create_event_v1::Lifetime;
use crate::{
EphemeralLifetime, JobRunBufferEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1,
JobRunSuccessEventV1, MissingDeps, OriginatingLifetime, PartitionRef, ReadDeps,
WantAttributedPartitions, WantCreateEventV1,
};
/// Returns a default Originating lifetime for use in tests.
/// WantCreateEventV1 requires a lifetime to be set.
pub fn default_originating_lifetime() -> Lifetime {
Lifetime::Originating(OriginatingLifetime {
data_timestamp: 1000,
ttl_seconds: 3600,
sla_seconds: 7200,
})
}
/// IDs used in the multihop scenario for easy reference in tests
pub struct MultihopIds {
pub beta_want_id: String,
pub alpha_want_id: String,
pub beta_job_1_id: String,
pub beta_job_2_id: String,
pub alpha_job_id: String,
}
impl Default for MultihopIds {
fn default() -> Self {
Self {
beta_want_id: "beta-want".to_string(),
alpha_want_id: "alpha-want".to_string(),
beta_job_1_id: "beta-job-1".to_string(),
beta_job_2_id: "beta-job-2".to_string(),
alpha_job_id: "alpha-job".to_string(),
}
}
}
/// Creates a multihop dependency scenario:
/// 1. Want for data/beta is created
/// 2. Job beta-job-1 starts, discovers missing dep on data/alpha
/// 3. Derivative want for data/alpha is created
/// 4. Job alpha-job builds data/alpha successfully
/// 5. Job beta-job-2 retries and succeeds
///
/// This exercises: want creation, job buffering, dep-miss, derivative wants,
/// job success with read_deps, and retry logic.
pub fn multihop_scenario() -> (Vec<Event>, MultihopIds) {
let ids = MultihopIds::default();
let mut events = vec![];
// 1. Create originating want for data/beta (user-requested)
events.push(Event::WantCreateV1(WantCreateEventV1 {
want_id: ids.beta_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
lifetime: Some(Lifetime::Originating(OriginatingLifetime {
data_timestamp: 1000,
ttl_seconds: 3600,
sla_seconds: 7200,
})),
comment: Some("User requested beta data".to_string()),
}));
// 2. Queue beta job (first attempt)
events.push(Event::JobRunBufferV1(JobRunBufferEventV1 {
job_run_id: ids.beta_job_1_id.clone(),
job_label: "//job_beta".to_string(),
want_attributed_partitions: vec![WantAttributedPartitions {
want_id: ids.beta_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
}],
building_partitions: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
..Default::default()
}));
// 3. Beta job starts running
events.push(Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 {
job_run_id: ids.beta_job_1_id.clone(),
..Default::default()
}));
// 4. Beta job reports missing dependency on data/alpha
events.push(Event::JobRunMissingDepsV1(JobRunMissingDepsEventV1 {
job_run_id: ids.beta_job_1_id.clone(),
missing_deps: vec![MissingDeps {
impacted: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
missing: vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}],
}],
..Default::default()
}));
// 5. Create ephemeral want for data/alpha (derivative - created due to dep-miss)
events.push(Event::WantCreateV1(WantCreateEventV1 {
want_id: ids.alpha_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}],
lifetime: Some(Lifetime::Ephemeral(EphemeralLifetime {
job_run_id: ids.beta_job_1_id.clone(),
})),
comment: Some("Missing data".to_string()),
}));
// 6. Queue alpha job
events.push(Event::JobRunBufferV1(JobRunBufferEventV1 {
job_run_id: ids.alpha_job_id.clone(),
job_label: "//job_alpha".to_string(),
want_attributed_partitions: vec![WantAttributedPartitions {
want_id: ids.alpha_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}],
}],
building_partitions: vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}],
..Default::default()
}));
// 7. Alpha job starts running
events.push(Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 {
job_run_id: ids.alpha_job_id.clone(),
..Default::default()
}));
// 8. Alpha job succeeds (no read deps for leaf node)
events.push(Event::JobRunSuccessV1(JobRunSuccessEventV1 {
job_run_id: ids.alpha_job_id.clone(),
read_deps: vec![],
..Default::default()
}));
// 9. Queue beta job again (second attempt - retry)
events.push(Event::JobRunBufferV1(JobRunBufferEventV1 {
job_run_id: ids.beta_job_2_id.clone(),
job_label: "//job_beta".to_string(),
want_attributed_partitions: vec![WantAttributedPartitions {
want_id: ids.beta_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
}],
building_partitions: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
..Default::default()
}));
// 10. Beta job 2 starts running
events.push(Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 {
job_run_id: ids.beta_job_2_id.clone(),
..Default::default()
}));
// 11. Beta job 2 succeeds with read_deps showing it read data/alpha
events.push(Event::JobRunSuccessV1(JobRunSuccessEventV1 {
job_run_id: ids.beta_job_2_id.clone(),
read_deps: vec![ReadDeps {
impacted: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
read: vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}],
}],
..Default::default()
}));
(events, ids)
}
}

View file

@ -1,8 +1,31 @@
use crate::partition_state::FailedPartitionRef;
use crate::util::current_timestamp;
use crate::{EventSource, PartitionRef, WantCreateEventV1, WantDetail, WantStatusCode};
use crate::util::{HasRelatedIds, RelatedIds, current_timestamp};
use crate::want_create_event_v1::Lifetime;
use crate::want_detail::Lifetime as WantDetailLifetime;
use crate::{
EphemeralLifetime, EventSource, OriginatingLifetime, PartitionRef, WantCreateEventV1,
WantDetail, WantStatusCode,
};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// Want lifetime semantics - determines how freshness/TTL is evaluated
#[derive(Debug, Clone)]
pub enum WantLifetime {
/// User/API-created wants with explicit freshness requirements.
/// These drive ongoing rebuilds when partitions get tainted.
Originating {
data_timestamp: u64,
ttl_seconds: u64,
sla_seconds: u64,
},
/// System-created (derivative) wants from dep-miss.
/// Delegate freshness decisions to their originating want.
/// Complete when partitions become Live, never trigger independent rebuilds.
Ephemeral {
/// The job run that hit dep-miss and created this derivative want
job_run_id: String,
},
}
/// State: Want has just been created, state not yet determined by sensing partition states
#[derive(Debug, Clone)]
@ -57,12 +80,11 @@ pub struct CanceledState {
pub struct WantInfo {
pub want_id: String,
pub partitions: Vec<PartitionRef>,
pub data_timestamp: u64,
pub ttl_seconds: u64,
pub sla_seconds: u64,
pub source: Option<EventSource>,
pub lifetime: WantLifetime,
pub comment: Option<String>,
pub last_updated_at: u64,
/// Job runs that have serviced this want (populated by handle_job_run_buffer)
pub job_run_ids: Vec<String>,
}
impl Default for WantInfo {
@ -70,12 +92,14 @@ impl Default for WantInfo {
Self {
want_id: uuid::Uuid::new_v4().to_string(),
partitions: vec![],
data_timestamp: 0,
ttl_seconds: 0,
sla_seconds: 0,
source: None,
lifetime: WantLifetime::Originating {
data_timestamp: 0,
ttl_seconds: 0,
sla_seconds: 0,
},
comment: None,
last_updated_at: 0,
job_run_ids: vec![],
}
}
}
@ -180,16 +204,26 @@ impl WantWithState<CanceledState> {
// From impl for creating want from event - creates in New state for sensing
impl From<WantCreateEventV1> for WantWithState<NewState> {
fn from(event: WantCreateEventV1) -> Self {
let lifetime = match event.lifetime {
Some(Lifetime::Originating(orig)) => WantLifetime::Originating {
data_timestamp: orig.data_timestamp,
ttl_seconds: orig.ttl_seconds,
sla_seconds: orig.sla_seconds,
},
Some(Lifetime::Ephemeral(eph)) => WantLifetime::Ephemeral {
job_run_id: eph.job_run_id,
},
None => panic!("Unexpectedly empty want lifetime"),
};
WantWithState {
want: WantInfo {
want_id: event.want_id,
partitions: event.partitions,
data_timestamp: event.data_timestamp,
ttl_seconds: event.ttl_seconds,
sla_seconds: event.sla_seconds,
source: event.source,
lifetime,
comment: event.comment,
last_updated_at: current_timestamp(),
job_run_ids: vec![],
},
state: NewState {},
}
@ -464,33 +498,134 @@ impl WantWithState<UpstreamBuildingState> {
}
}
// ==================== HasRelatedIds trait implementation ====================
impl HasRelatedIds for Want {
/// Get the IDs of all entities this want references.
/// Note: job_run_ids come from inverted indexes in BuildState, not from Want itself.
fn related_ids(&self) -> RelatedIds {
let partition_refs = self
.want()
.partitions
.iter()
.map(|p| p.r#ref.clone())
.collect();
// Collect want IDs from state-specific relationships
let want_ids = match self {
Want::UpstreamBuilding(w) => w.state.upstream_want_ids.clone(),
Want::UpstreamFailed(w) => w.state.failed_wants.clone(),
_ => vec![],
};
RelatedIds {
partition_refs,
partition_uuids: vec![],
job_run_ids: vec![],
want_ids,
}
}
}
// Helper methods on the Want enum
impl Want {
/// Create a new want in the Idle state
pub fn new(
/// Create a new originating want in the Idle state
pub fn new_originating(
want_id: String,
partitions: Vec<PartitionRef>,
data_timestamp: u64,
ttl_seconds: u64,
sla_seconds: u64,
source: Option<EventSource>,
comment: Option<String>,
) -> Self {
Want::Idle(WantWithState {
want: WantInfo {
want_id,
partitions,
data_timestamp,
ttl_seconds,
sla_seconds,
source,
lifetime: WantLifetime::Originating {
data_timestamp,
ttl_seconds,
sla_seconds,
},
comment,
last_updated_at: current_timestamp(),
job_run_ids: vec![],
},
state: IdleState {},
})
}
/// Create a new ephemeral want in the Idle state (derivative from dep-miss)
pub fn new_ephemeral(
want_id: String,
partitions: Vec<PartitionRef>,
job_run_id: String,
comment: Option<String>,
) -> Self {
Want::Idle(WantWithState {
want: WantInfo {
want_id,
partitions,
lifetime: WantLifetime::Ephemeral { job_run_id },
comment,
last_updated_at: current_timestamp(),
job_run_ids: vec![],
},
state: IdleState {},
})
}
/// Get the lifetime of this want
pub fn lifetime(&self) -> &WantLifetime {
&self.want().lifetime
}
/// Add a job run ID to this want's list of servicing job runs
pub fn add_job_run_id(&mut self, job_run_id: &str) {
        let want = match self {
            Want::New(w) => &mut w.want,
            Want::Idle(w) => &mut w.want,
            Want::Building(w) => &mut w.want,
            Want::UpstreamBuilding(w) => &mut w.want,
            Want::Successful(w) => &mut w.want,
            Want::Failed(w) => &mut w.want,
            Want::UpstreamFailed(w) => &mut w.want,
            Want::Canceled(w) => &mut w.want,
        };
        if !want.job_run_ids.contains(&job_run_id.to_string()) {
            want.job_run_ids.push(job_run_id.to_string());
        }
}
/// Check if want is schedulable (Idle or UpstreamBuilding with satisfied upstreams)
pub fn is_schedulable(&self) -> bool {
match self {
@ -522,16 +657,32 @@ impl Want {
}
}
/// Convert to WantDetail for API responses and queries
/// Convert to WantDetail for API responses and queries.
/// job_run_ids are returned from the Want itself.
/// derivative_want_ids are computed by traversing job runs (done by BuildState).
pub fn to_detail(&self) -> WantDetail {
let lifetime = match &self.want().lifetime {
WantLifetime::Originating {
data_timestamp,
ttl_seconds,
sla_seconds,
} => Some(WantDetailLifetime::Originating(OriginatingLifetime {
data_timestamp: *data_timestamp,
ttl_seconds: *ttl_seconds,
sla_seconds: *sla_seconds,
})),
WantLifetime::Ephemeral { job_run_id } => {
Some(WantDetailLifetime::Ephemeral(EphemeralLifetime {
job_run_id: job_run_id.clone(),
}))
}
};
WantDetail {
want_id: self.want().want_id.clone(),
partitions: self.want().partitions.clone(),
upstreams: vec![], // Upstreams are tracked via want relationships, not stored here
data_timestamp: self.want().data_timestamp,
ttl_seconds: self.want().ttl_seconds,
sla_seconds: self.want().sla_seconds,
source: self.want().source.clone(),
lifetime,
comment: self.want().comment.clone(),
last_updated_timestamp: self.want().last_updated_at,
status: match self {
@ -544,6 +695,9 @@ impl Want {
Want::UpstreamFailed(_) => Some(WantStatusCode::WantUpstreamFailed.into()),
Want::Canceled(_) => Some(WantStatusCode::WantCanceled.into()),
},
job_run_ids: self.want().job_run_ids.clone(),
derivative_want_ids: vec![], // Computed by BuildState via job traversal
job_runs: vec![], // Populated by BuildState.get_want()
}
}
}

View file

@ -3,7 +3,7 @@
use askama::Template;
use crate::{
JobRunDetail, JobRunStatus, PartitionDetail, PartitionRef, PartitionStatus,
JobRunDetail, JobRunStatus, PartitionDetail, PartitionRef, PartitionStatus, ReadDeps,
WantAttributedPartitions, WantDetail, WantStatus,
};
@ -73,39 +73,120 @@ impl From<&JobRunStatus> for JobRunStatusView {
}
}
pub struct WantDetailView {
/// Simple view for derivative wants in the want detail page
pub struct DerivativeWantView {
pub want_id: String,
pub partitions: Vec<PartitionRefView>,
pub upstreams: Vec<PartitionRefView>,
pub data_timestamp: u64,
pub ttl_seconds: u64,
pub sla_seconds: u64,
pub comment: Option<String>,
pub comment_display: String,
pub status: Option<WantStatusView>,
pub last_updated_timestamp: u64,
}
impl From<&WantDetail> for WantDetailView {
impl From<&WantDetail> for DerivativeWantView {
fn from(w: &WantDetail) -> Self {
Self {
want_id: w.want_id.clone(),
partitions: w.partitions.iter().map(PartitionRefView::from).collect(),
upstreams: w.upstreams.iter().map(PartitionRefView::from).collect(),
data_timestamp: w.data_timestamp,
ttl_seconds: w.ttl_seconds,
sla_seconds: w.sla_seconds,
comment: w.comment.clone(),
comment_display: w.comment.as_deref().unwrap_or("-").to_string(),
status: w.status.as_ref().map(WantStatusView::from),
last_updated_timestamp: w.last_updated_timestamp,
}
}
}
/// Enum representing the want lifetime type for templates
pub enum WantLifetimeView {
Originating {
data_timestamp: u64,
ttl_seconds: u64,
sla_seconds: u64,
},
Ephemeral {
job_run_id: String,
},
}
pub struct WantDetailView {
pub want_id: String,
pub partitions: Vec<PartitionRefView>,
pub upstreams: Vec<PartitionRefView>,
pub lifetime: Option<WantLifetimeView>,
/// Convenience accessor for originating wants - returns data_timestamp or 0
pub data_timestamp: u64,
/// Convenience accessor for originating wants - returns ttl_seconds or 0
pub ttl_seconds: u64,
/// Convenience accessor for originating wants - returns sla_seconds or 0
pub sla_seconds: u64,
/// True if this is an ephemeral (derivative) want
pub is_ephemeral: bool,
/// Job run that created this ephemeral want (if ephemeral)
pub source_job_run_id: Option<String>,
pub comment: Option<String>,
pub comment_display: String,
pub status: Option<WantStatusView>,
pub last_updated_timestamp: u64,
// Lineage fields
pub job_run_ids: Vec<String>,
pub derivative_want_ids: Vec<String>,
pub job_runs: Vec<JobRunDetailView>,
pub derivative_wants: Vec<DerivativeWantView>,
}
impl WantDetailView {
/// Create a WantDetailView with derivative wants populated.
/// Use this for the detail page where derivative wants need to be shown.
pub fn new(w: &WantDetail, derivative_wants: Vec<DerivativeWantView>) -> Self {
use crate::want_detail::Lifetime;
let (lifetime, data_timestamp, ttl_seconds, sla_seconds, is_ephemeral, source_job_run_id) =
match &w.lifetime {
Some(Lifetime::Originating(orig)) => (
Some(WantLifetimeView::Originating {
data_timestamp: orig.data_timestamp,
ttl_seconds: orig.ttl_seconds,
sla_seconds: orig.sla_seconds,
}),
orig.data_timestamp,
orig.ttl_seconds,
orig.sla_seconds,
false,
None,
),
Some(Lifetime::Ephemeral(eph)) => (
Some(WantLifetimeView::Ephemeral {
job_run_id: eph.job_run_id.clone(),
}),
0,
0,
0,
true,
Some(eph.job_run_id.clone()),
),
None => (None, 0, 0, 0, false, None),
};
Self {
want_id: w.want_id.clone(),
partitions: w.partitions.iter().map(PartitionRefView::from).collect(),
upstreams: w.upstreams.iter().map(PartitionRefView::from).collect(),
lifetime,
data_timestamp,
ttl_seconds,
sla_seconds,
is_ephemeral,
source_job_run_id,
comment: w.comment.clone(),
comment_display: w.comment.as_deref().unwrap_or("-").to_string(),
status: w.status.as_ref().map(WantStatusView::from),
last_updated_timestamp: w.last_updated_timestamp,
job_run_ids: w.job_run_ids.clone(),
derivative_want_ids: w.derivative_want_ids.clone(),
job_runs: w.job_runs.iter().map(JobRunDetailView::from).collect(),
derivative_wants,
}
}
}
/// For list pages where derivative wants aren't needed
impl From<WantDetail> for WantDetailView {
fn from(w: WantDetail) -> Self {
Self::from(&w)
Self::new(&w, vec![])
}
}
@ -119,6 +200,9 @@ pub struct PartitionDetailView {
pub want_ids: Vec<String>,
pub taint_ids: Vec<String>,
pub uuid: String,
// Lineage fields
pub built_by_job_run_id: Option<String>,
pub downstream_partition_uuids: Vec<String>,
}
impl From<&PartitionDetail> for PartitionDetailView {
@ -141,6 +225,8 @@ impl From<&PartitionDetail> for PartitionDetailView {
want_ids: p.want_ids.clone(),
taint_ids: p.taint_ids.clone(),
uuid: p.uuid.clone(),
built_by_job_run_id: p.built_by_job_run_id.clone(),
downstream_partition_uuids: p.downstream_partition_uuids.clone(),
}
}
}
@ -165,20 +251,75 @@ impl From<&WantAttributedPartitions> for WantAttributedPartitionsView {
}
}
/// View for read dependency entries (impacted → read relationships)
pub struct ReadDepsView {
pub impacted: Vec<PartitionRefView>,
pub read: Vec<PartitionRefView>,
}
impl From<&ReadDeps> for ReadDepsView {
fn from(rd: &ReadDeps) -> Self {
Self {
impacted: rd.impacted.iter().map(PartitionRefView::from).collect(),
read: rd.read.iter().map(PartitionRefView::from).collect(),
}
}
}
/// View for partition ref with its resolved UUID (for lineage display)
pub struct PartitionRefWithUuidView {
pub partition_ref: String,
pub partition_ref_encoded: String,
pub uuid: String,
}
pub struct JobRunDetailView {
pub id: String,
pub job_label: String,
pub status: Option<JobRunStatusView>,
pub last_heartbeat_at: Option<u64>,
pub queued_at: Option<u64>,
pub started_at: Option<u64>,
pub building_partitions: Vec<PartitionRefView>,
pub servicing_wants: Vec<WantAttributedPartitionsView>,
// Lineage fields (populated for Succeeded/DepMiss states)
pub read_deps: Vec<ReadDepsView>,
pub read_partitions: Vec<PartitionRefWithUuidView>,
pub wrote_partitions: Vec<PartitionRefWithUuidView>,
pub derivative_want_ids: Vec<String>,
}
impl From<&JobRunDetail> for JobRunDetailView {
fn from(jr: &JobRunDetail) -> Self {
// Build read_partitions from read_partition_uuids map
let read_partitions: Vec<PartitionRefWithUuidView> = jr
.read_partition_uuids
.iter()
.map(|(partition_ref, uuid)| PartitionRefWithUuidView {
partition_ref: partition_ref.clone(),
partition_ref_encoded: urlencoding::encode(partition_ref).into_owned(),
uuid: uuid.clone(),
})
.collect();
// Build wrote_partitions from wrote_partition_uuids map
let wrote_partitions: Vec<PartitionRefWithUuidView> = jr
.wrote_partition_uuids
.iter()
.map(|(partition_ref, uuid)| PartitionRefWithUuidView {
partition_ref: partition_ref.clone(),
partition_ref_encoded: urlencoding::encode(partition_ref).into_owned(),
uuid: uuid.clone(),
})
.collect();
Self {
id: jr.id.clone(),
job_label: jr.job_label.clone(),
status: jr.status.as_ref().map(JobRunStatusView::from),
last_heartbeat_at: jr.last_heartbeat_at,
queued_at: jr.queued_at,
started_at: jr.started_at,
building_partitions: jr
.building_partitions
.iter()
@ -189,6 +330,10 @@ impl From<&JobRunDetail> for JobRunDetailView {
.iter()
.map(WantAttributedPartitionsView::from)
.collect(),
read_deps: jr.read_deps.iter().map(ReadDepsView::from).collect(),
read_partitions,
wrote_partitions,
derivative_want_ids: jr.derivative_want_ids.clone(),
}
}
}
@ -361,3 +506,100 @@ pub struct JobRunDetailPage {
pub struct WantCreatePage {
pub base: BaseContext,
}
// =============================================================================
// Tests
// =============================================================================
#[cfg(test)]
mod tests {
use super::*;
use crate::build_state::BuildState;
use crate::util::test_scenarios::multihop_scenario;
use askama::Template;
/// Helper to replay events into a fresh BuildState
fn build_state_from_events(events: &[crate::data_build_event::Event]) -> BuildState {
let mut state = BuildState::default();
for event in events {
state.handle_event(event);
}
state
}
mod want_detail_page {
use super::*;
/// Tests that the want detail page shows the job runs that serviced this want.
/// This is the "Fulfillment - Job Runs" section in the UI.
///
/// Given: The multihop scenario completes (beta job dep-misses, alpha built, beta retries)
/// When: We render the beta want detail page
/// Then: It should show both beta job runs (beta-job-1 and beta-job-2)
#[test]
fn test_shows_servicing_job_runs() {
let (events, ids) = multihop_scenario();
let state = build_state_from_events(&events);
// Get beta want and render its detail page
let want_detail = state
.get_want(&ids.beta_want_id)
.expect("beta want should exist");
let template = WantDetailPage {
base: BaseContext::default(),
want: WantDetailView::from(want_detail),
};
let html = template.render().expect("template should render");
// Verify the Fulfillment section exists and contains both job run IDs
assert!(
html.contains(&ids.beta_job_1_id),
"Should show beta-job-1 in fulfillment section. HTML:\n{}",
html
);
assert!(
html.contains(&ids.beta_job_2_id),
"Should show beta-job-2 in fulfillment section. HTML:\n{}",
html
);
}
/// Tests that the want detail page shows derivative wants spawned by dep-miss.
/// This is the "Fulfillment - Derivative Wants" section in the UI.
///
/// Given: The multihop scenario completes (beta job dep-misses, spawning alpha want)
/// When: We render the beta want detail page
/// Then: It should show the alpha want as a derivative
#[test]
fn test_shows_derivative_wants() {
let (events, ids) = multihop_scenario();
let state = build_state_from_events(&events);
// Get beta want and render its detail page
let want_detail = state
.get_want(&ids.beta_want_id)
.expect("beta want should exist");
// Fetch derivative wants (like the http_server does)
let derivative_wants: Vec<_> = want_detail
.derivative_want_ids
.iter()
.filter_map(|id| state.get_want(id))
.map(|w| DerivativeWantView::from(&w))
.collect();
let template = WantDetailPage {
base: BaseContext::default(),
want: WantDetailView::new(&want_detail, derivative_wants),
};
let html = template.render().expect("template should render");
// Verify the Fulfillment section exists and contains the derivative want
assert!(
html.contains(&ids.alpha_want_id),
"Should show alpha-want as derivative want. HTML:\n{}",
html
);
}
}
}

View file

@ -54,4 +54,43 @@
</div>
{% endif %}
{% if !job_run.read_partitions.is_empty() %}
<div class="detail-section">
<h2>Read Partitions ({{ job_run.read_partitions.len() }})</h2>
<ul class="partition-list">
{% for p in job_run.read_partitions %}
<li>
<a href="/partitions/{{ p.partition_ref_encoded }}" class="partition-ref">{{ p.partition_ref }}</a>
<span style="color:var(--color-text-muted);font-size:.75rem;font-family:monospace;margin-left:.5rem">uuid: {{ p.uuid }}</span>
</li>
{% endfor %}
</ul>
</div>
{% endif %}
{% if !job_run.wrote_partitions.is_empty() %}
<div class="detail-section">
<h2>Wrote Partitions ({{ job_run.wrote_partitions.len() }})</h2>
<ul class="partition-list">
{% for p in job_run.wrote_partitions %}
<li>
<a href="/partitions/{{ p.partition_ref_encoded }}" class="partition-ref">{{ p.partition_ref }}</a>
<span style="color:var(--color-text-muted);font-size:.75rem;font-family:monospace;margin-left:.5rem">uuid: {{ p.uuid }}</span>
</li>
{% endfor %}
</ul>
</div>
{% endif %}
{% if !job_run.derivative_want_ids.is_empty() %}
<div class="detail-section">
<h2>Derivative Wants ({{ job_run.derivative_want_ids.len() }})</h2>
<ul class="partition-list">
{% for id in job_run.derivative_want_ids %}
<li><a href="/wants/{{ id }}">{{ id }}</a></li>
{% endfor %}
</ul>
</div>
{% endif %}
{% call base::footer() %}

View file

@ -32,9 +32,34 @@
</div>
</div>
{% match partition.built_by_job_run_id %}
{% when Some with (job_run_id) %}
<div class="detail-section">
<h2>Lineage - Built By</h2>
<p>
<a href="/job_runs/{{ job_run_id }}">{{ job_run_id }}</a>
<span style="color:var(--color-text-muted);font-size:.75rem;margin-left:.5rem">(view job run for input partitions)</span>
</p>
</div>
{% when None %}
{% endmatch %}
{% if !partition.downstream_partition_uuids.is_empty() %}
<div class="detail-section">
<h2>Lineage - Downstream Consumers ({{ partition.downstream_partition_uuids.len() }})</h2>
<ul class="partition-list">
{% for uuid in partition.downstream_partition_uuids %}
<li>
<span style="font-family:monospace;font-size:.8125rem">{{ uuid }}</span>
</li>
{% endfor %}
</ul>
</div>
{% endif %}
{% if !partition.job_run_ids.is_empty() %}
<div class="detail-section">
<h2>Job Runs ({{ partition.job_run_ids.len() }})</h2>
<h2>Related Job Runs ({{ partition.job_run_ids.len() }})</h2>
<ul class="partition-list">
{% for id in partition.job_run_ids %}
<li><a href="/job_runs/{{ id }}">{{ id }}</a></li>

View file

@ -65,4 +65,56 @@
</div>
{% endif %}
{% if !want.job_runs.is_empty() %}
<div class="detail-section">
<h2>Fulfillment - Job Runs ({{ want.job_runs.len() }})</h2>
<table class="data-table">
<thead>
<tr>
<th>ID</th>
<th>Job Label</th>
<th>Started</th>
<th>Duration</th>
<th>Status</th>
</tr>
</thead>
<tbody>
{% for jr in want.job_runs %}
<tr>
<td><a href="/job_runs/{{ jr.id }}">{{ jr.id }}</a></td>
<td>{{ jr.job_label }}</td>
<td>{% match jr.started_at %}{% when Some with (ts) %}{{ ts }}{% when None %}-{% endmatch %}</td>
<td>{% match jr.started_at %}{% when Some with (started) %}{% match jr.queued_at %}{% when Some with (queued) %}{{ started - queued }}ms{% when None %}-{% endmatch %}{% when None %}-{% endmatch %}</td>
<td>{% match jr.status %}{% when Some with (s) %}<span class="status status-{{ s.name_lowercase }}">{{ s.name }}</span>{% when None %}-{% endmatch %}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endif %}
{% if !want.derivative_wants.is_empty() %}
<div class="detail-section">
<h2>Fulfillment - Derivative Wants ({{ want.derivative_wants.len() }})</h2>
<table class="data-table">
<thead>
<tr>
<th>ID</th>
<th>Partitions</th>
<th>Status</th>
</tr>
</thead>
<tbody>
{% for dw in want.derivative_wants %}
<tr>
<td><a href="/wants/{{ dw.want_id }}">{{ dw.want_id }}</a></td>
<td>{% for p in dw.partitions %}{{ p.partition_ref }}{% if !loop.last %}, {% endif %}{% endfor %}</td>
<td>{% match dw.status %}{% when Some with (s) %}<span class="status status-{{ s.name|lower }}">{{ s.name }}</span>{% when None %}-{% endmatch %}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endif %}
{% call base::footer() %}

View file

@ -0,0 +1,12 @@
- You can't keep building partitions forever
- Space runs out for the literal data, and storage costs keep rising while the value of old data declines
- Memory runs out for indexes over partitions, e.g. in databuild itself
- We could introduce vacuuming?
- We could introduce partition want expiry callbacks?
- Jobs and job runs as partition edge transitions?
- Do we even want to delete partition entries? Can just wait till this is a problem.
- Partition want expiry events also enable reactions above the individual-event level, e.g. vacuuming all events between time T1 and T2.
- RISK! If partition-to-partition data deps (via jobs that change, etc.) are not canonical/stable:
- We cannot assert the necessity of upstream partitions for anything longer than the initial job's time to success (because the deps may have changed since)
- To make this valuable, we need to be able to assume that the reads/data deps from a single parameterized job run are durable, because then we can propagate want times and have a durable answer to "why does this partition need to exist"

View file

@ -0,0 +1,241 @@
# Event-Sourced CPN Framework
A vision for a Rust library/framework combining event sourcing, Colored Petri Net semantics, and compile-time safety for building correct distributed systems.
## The Problem
In highly connected applications with multiple entity types and relationships (like databuild's Wants, JobRuns, Partitions), developers face combinatorial complexity:
For each edge type between entities, you need:
1. Forward accessor
2. Inverse accessor (index)
3. Index maintenance on creation
4. Index maintenance on deletion
5. Consistency checks
6. Query patterns for traversal
As the number of entities and edges grows, this becomes:
- Hard to keep in your head
- Error-prone (forgot to update an index)
- Lots of boilerplate
- Testing burden for plumbing rather than business logic
The temptation is to "throw hands up" and use SQL with foreign keys, or accept eventual consistency. But this sacrifices the compile-time guarantees Rust can provide.
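To make the boilerplate concrete, here is a rough sketch of hand-maintaining a single edge; `BuildGraph`, `built_by`, and `partitions_built_by` are hypothetical names for illustration, not databuild's actual types.
```rust
// Illustrative only: hand-rolled bookkeeping for ONE edge type.
use std::collections::BTreeMap;

#[derive(Default)]
pub struct BuildGraph {
    /// Forward edge: partition uuid -> job run id that built it
    built_by: BTreeMap<String, String>,
    /// Inverse index: job run id -> partition uuids it built
    partitions_built_by: BTreeMap<String, Vec<String>>,
}

impl BuildGraph {
    /// Both sides must be updated together; forgetting one silently breaks
    /// traversal in that direction.
    pub fn link_built_by(&mut self, partition_uuid: &str, job_run_id: &str) {
        self.built_by
            .insert(partition_uuid.to_string(), job_run_id.to_string());
        self.partitions_built_by
            .entry(job_run_id.to_string())
            .or_default()
            .push(partition_uuid.to_string());
    }

    /// Deletion needs the same paired maintenance, plus consistency checks.
    pub fn unlink_built_by(&mut self, partition_uuid: &str) {
        if let Some(job_run_id) = self.built_by.remove(partition_uuid) {
            if let Some(built) = self.partitions_built_by.get_mut(&job_run_id) {
                built.retain(|p| p != partition_uuid);
            }
        }
    }
}
```
Multiply this by every edge type, plus the creation/deletion paths and consistency checks, and the bookkeeping quickly outweighs the business logic.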
## The Vision
A framework where developers declare:
- **Entities** with their valid states (state machines)
- **Edges** between entities (typed, directional, with cardinality)
- **Transitions** (what state changes are valid, and when)
And the framework provides:
- Auto-generated accessors (both directions)
- Auto-maintained indexes
- Compile-time invalid transition errors
- Runtime referential integrity (fail-fast or transactional)
- Event log as source of truth with replay capability
- Potential automatic concurrency from CPN place-disjointness
## Why
### Correctness Guarantees
- **Compile-time**: Invalid state transitions are type errors
- **Compile-time**: Edge definitions guarantee bidirectional navigability
- **Runtime**: Referential integrity violations detected immediately
- **Result**: "If it compiles and the event log replays, the state is consistent"
### Performance "For Free"
- Indexes auto-maintained as edges are created/destroyed
- No query planning needed - traversal patterns known at compile time
- Potential: CPN place-disjointness → automatic safe concurrency
### Developer Experience
- Declare entities, states, edges, transitions
- Library generates: accessors, inverse indexes, transition methods, consistency checks
- Focus on *what* not *how* - the plumbing disappears
- Still Rust: escape hatch to custom logic when needed
### Testing Burden Reduction
- No tests for "did I update the index correctly"
- No tests for "can I traverse this relationship backwards"
- Focus tests on business logic, not graph bookkeeping
## How
### Foundations
- **Colored Petri Nets** for state machine composition semantics
- **Typestate pattern** for compile-time transition validity
- **Event sourcing** for persistence and replay
### Implementation Approach
Declarative DSL or proc macros for entity/edge/transition definitions:
```rust
// Hypothetical syntax
entity! {
Want {
states: [New, Idle, Building, Successful, Failed, Canceled],
transitions: [
New -> Idle,
New -> Building,
Idle -> Building,
Building -> Successful,
Building -> Failed,
// ...
]
}
}
edge! {
servicing_wants: JobRun -> many Want,
built_by: Partition -> one JobRun,
}
```
Code generation produces (see the sketch after this list):
- Entity structs with state type parameters
- Edge storage with auto-maintained inverses
- Transition methods that enforce valid source states
- Query methods for traversal in both directions
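As a rough sketch of the intent (all names hypothetical, not real macro output), the generated code for one entity and one edge might look like:
```rust
// Hypothetical generated output - a sketch, not what the real macros would emit.
use std::collections::BTreeMap;

pub struct Want<State> {
    pub id: String,
    pub state: State,
}

pub struct Idle;
pub struct Building;

impl Want<Idle> {
    /// Transition exists only on the valid source state, so calling it from
    /// `Want<Building>` is a compile error rather than a runtime check.
    pub fn start_building(self) -> Want<Building> {
        Want { id: self.id, state: Building }
    }
}

#[derive(Default)]
pub struct BuiltByEdges {
    forward: BTreeMap<String, String>,      // partition -> job run
    inverse: BTreeMap<String, Vec<String>>, // job run -> partitions
}

impl BuiltByEdges {
    /// Forward and inverse accessors are generated together, so both
    /// directions are always navigable.
    pub fn built_by(&self, partition_id: &str) -> Option<&String> {
        self.forward.get(partition_id)
    }

    pub fn builds(&self, job_run_id: &str) -> &[String] {
        self.inverse
            .get(job_run_id)
            .map(Vec::as_slice)
            .unwrap_or(&[])
    }
}
```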
### The Graph Model
- Entities are nodes (with state)
- Edges are typed, directional, with cardinality (one/many)
- Both directions always queryable
- Edge creation/deletion is transactional within a step
### Entry Point
A single `step(event) -> Result<(), StepError>` entry point that (sketched after this list):
1. Validates the event against current state
2. Applies state transitions
3. Updates all affected indexes
4. Returns success or rolls back
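A minimal sketch of that entry point, assuming placeholder event, state, and error types rather than databuild's real ones:
```rust
// Placeholder types standing in for the real event/state/error definitions.
pub enum Event {
    WantCreated { want_id: String },
    WantCanceled { want_id: String },
}

#[derive(Debug)]
pub enum StepError {
    UnknownWant(String),
}

#[derive(Default)]
pub struct State {
    active_wants: Vec<String>, // stand-in for entities + auto-maintained indexes
}

impl State {
    /// 1. Validate the event against current state.
    /// 2. Apply state transitions.
    /// 3. Update all affected indexes.
    /// 4. Return Ok, or Err with `self` left untouched.
    pub fn step(&mut self, event: Event) -> Result<(), StepError> {
        match event {
            Event::WantCreated { want_id } => {
                self.active_wants.push(want_id);
                Ok(())
            }
            Event::WantCanceled { want_id } => {
                if !self.active_wants.contains(&want_id) {
                    return Err(StepError::UnknownWant(want_id));
                }
                self.active_wants.retain(|w| w != &want_id);
                Ok(())
            }
        }
    }
}
```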
## Transactionality
### Beyond Fail-Fast
Instead of panicking on consistency violations, support transactional semantics:
```rust
// Infallible (panics on error)
state.step(event);
// Fallible (returns error, state unchanged on failure)
state.try_step(event) -> Result<(), StepError>;
// Explicit transaction (for multi-event atomicity)
let txn = state.begin();
txn.apply(event1)?;
txn.apply(event2)?;
txn.commit(); // or rollback on drop
```
### What This Enables
1. **Local atomicity**: A single event either fully applies or doesn't - no partial states
2. **Distributed coordination**: If `step` can return `Err` instead of panicking:
- Try to apply an event
- If it fails, coordinate with other systems before retrying
- Implement saga patterns, 2PC, etc.
3. **Speculative execution**: "What if I applied this event?" without committing
- Useful for validation, dry-runs, conflict detection
4. **Optimistic concurrency**:
- Multiple workers try to apply events concurrently
- Conflicts detected and rolled back
- Retry with updated state
### Implementation Options
1. **Copy-on-write / snapshot**: Clone state, apply to clone, swap on success (sketched after this list)
- Simple but memory-heavy for large state
2. **Command pattern / undo log**: Record inverse operations, replay backwards on rollback
- More complex, but efficient for small changes to large state
3. **MVCC-style**: Version all entities, only "commit" versions on success
- Most sophisticated, enables concurrent reads during transaction
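A sketch of option 1 (copy-on-write), assuming the state is cheap enough to clone:
```rust
// Clone, apply to the clone, swap on success.
#[derive(Clone, Default)]
pub struct State {
    events_applied: u64,
}

#[derive(Debug)]
pub struct StepError(pub String);

pub struct Event {
    pub valid: bool,
}

impl State {
    fn apply(&mut self, event: &Event) -> Result<(), StepError> {
        if !event.valid {
            return Err(StepError("validation failed".to_string()));
        }
        self.events_applied += 1;
        Ok(())
    }

    /// The live state is only replaced once the event applied cleanly;
    /// on error the original value is untouched.
    pub fn try_step(&mut self, event: &Event) -> Result<(), StepError> {
        let mut candidate = self.clone();
        candidate.apply(event)?;
        *self = candidate;
        Ok(())
    }
}
```
The same shape covers speculative execution: apply to the clone, inspect the result, and drop it instead of swapping.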
## Relationship to Datomic
[Datomic](https://docs.datomic.com/datomic-overview.html) is a distributed database built on similar principles that validates many of these ideas in production:
### Shared Concepts
| Concept | Datomic | This Framework |
|---------|---------|----------------|
| Immutable facts | Datoms (E-A-V-T tuples) | BEL events |
| Time travel | `as-of` queries | Event replay |
| Speculative execution | [`d/with`](https://docs.datomic.com/transactions/transaction-processing.html) | `try_step()` / transactions |
| Atomic commits | `d/transact` = `d/with` + durable swap | `step()` = validate + apply + persist |
| Transaction-time validation | [Transaction functions](https://docs.datomic.com/transactions/transaction-functions.html) with `db-before` | Transition guards |
| Post-transaction validation | [Entity specs](https://docs.datomic.com/transactions/model.html) with `db-after` | Invariant checks |
| Single writer | Transactor serializes all writes | Single `step()` entry point |
| Horizontal read scaling | Peers cache and query locally | Immutable state snapshots |
### Datomic's Speculative Writes
Datomic's `d/with` is particularly relevant - it's a [pure function](https://vvvvalvalval.github.io/posts/2018-11-12-datomic-event-sourcing-without-the-hassle.html) that takes a database value and proposed facts, returning a new database value *without persisting*. This enables:
- Testing transactions without mutation
- Composing transaction data before committing
- [Enforcing invariants](https://stackoverflow.com/questions/48268887/how-to-prevent-transactions-from-violating-application-invariants-in-datomic) by speculatively applying, checking, then committing or aborting
- Development against production data safely (via libraries like Datomock)
### What Datomic Doesn't Provide
- **CPN state machine semantics**: Typed transitions between entity states
- **Compile-time transition validity**: Invalid transitions caught by the type system
- **Auto-generated bidirectional indexes**: Declared edges automatically traversable both ways
- **Rust**: Memory safety, zero-cost abstractions, embeddable
The vision here is essentially: *Datomic's transaction model + CPN state machines + Rust compile-time safety*
## Open Questions
- How to express transition guards (conditions beyond "in state X")?
- How to handle edges to entities that don't exist yet (forward references)?
- Serialization format for the event log?
- How much CPN formalism to expose vs. hide?
- What's the right granularity for "places" in the CPN model?
- How does this interact with async/distributed systems?
## Potential Names
Something evoking: event-sourced + graph + state machines + Rust
- `petri-graph`
- `ironweave` (iron = Rust, weave = connected graph)
- `factforge`
- `datumflow`
## Prior Art to Investigate
- Datomic (Clojure, distributed immutable database)
- Bevy ECS (Rust, entity-component-system with events)
- CPN Tools (Petri net modeling/simulation)
- Diesel / SeaORM (Rust, compile-time SQL checking)
- EventStoreDB (event sourcing infrastructure)
## Next Steps
This document captures the "why" and "how" at a conceptual level. To validate:
1. Prototype the macro/DSL syntax for a simple 2-3 entity system
2. Implement auto-indexed bidirectional edges
3. Implement typestate transitions
4. Add speculative execution (`try_step`)
5. Benchmark against hand-written equivalent
6. Evaluate ergonomics in real use (databuild as first consumer)

View file

@ -0,0 +1,7 @@
# What LLMs Don't Do
- Create and cultivate technical strategy
- Have a specific vision of the organizing formalization of the problem + the technical solution
- Adhere to technical strategy
- Please please please just read the relevant docs!

View file

@ -0,0 +1,230 @@
# Detail & Lineage Views
## Vision
Provide rich, navigable views into databuild's execution history that answer operational questions:
- **"What work was done to fulfill this want?"** - The full DAG of partitions built and jobs run
- **"Where did this data come from?"** - Trace a partition's lineage back through its inputs
- **"What downstream data uses this?"** - Understand impact before tainting or debugging staleness
## Three Distinct Views
### 1. Want Fulfillment View
Shows the work tree rooted at a want: all partitions built, jobs run, and derivative wants spawned to fulfill it.
```
W-001 "data/gamma" [Successful]
├── data/gamma [Live, uuid:abc]
│ └── JR-789 [Succeeded]
│ ├── read: data/beta [Live, uuid:def]
│ └── read: data/alpha [Live, uuid:ghi]
└── derivative: W-002 "data/beta" [Successful]
│ └── triggered by: JR-456 dep-miss
└── data/beta [Live, uuid:def]
└── JR-456 [DepMiss → retry → Succeeded]
└── read: data/alpha [Live, uuid:ghi]
```
Key insight: This shows **specific partition instances** (by UUID), not just refs. A want's fulfillment is a concrete snapshot of what was built.
### 2. Partition Lineage View
The data flow graph: partition ↔ job_run alternating. Navigable upstream (inputs) and downstream (consumers).
```
UPSTREAM
┌────────────┼────────────┐
▼ ▼ ▼
[data/a] [data/b] [data/c]
│ │ │
└────────────┼────────────┘
JR-xyz [Succeeded]
══════════════════
║ data/beta ║ ← FOCUS
║ [Live] ║
══════════════════
JR-abc [Running]
┌────────────┼────────────┐
▼ ▼ ▼
[data/x] [data/y] [data/z]
DOWNSTREAM
```
This view answers: "What data flows into/out of this partition?" Click to navigate.
### 3. JobRun Detail View
Not a graph - just the immediate context of a single job execution:
- **Scheduled for**: Which want(s) triggered this job
- **Read**: Input partitions (with UUIDs - the specific versions read)
- **Wrote**: Output partitions (with UUIDs)
- **Status history**: Queued → Running → Succeeded/Failed/DepMiss
- **If DepMiss**: Which derivative wants were spawned
## Data Requirements
### Track read_deps on success
Currently only captured on dep-miss. Need to extend `JobRunSuccessEventV1`:
```protobuf
message JobRunSuccessEventV1 {
string job_run_id = 1;
repeated ReadDeps read_deps = 2; // NEW
}
```
### Inverted consumer index
To answer "what reads this partition", need:
```rust
partition_consumers: BTreeMap<Uuid, Vec<(Uuid, String)>> // input_uuid → (output_uuid, job_run_id)
```
Indexed by UUID (not ref) because partition refs get reused across rebuilds, but UUIDs are immutable per instance. This preserves historical lineage correctly.
Built from read_deps when processing JobRunSuccessEventV1.
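A rough sketch of that index update with simplified stand-in types; the real handler resolves refs through BuildState's canonical partitions when the success event is processed:
```rust
use std::collections::BTreeMap;
use uuid::Uuid;

pub struct ReadDeps {
    pub impacted: Vec<String>, // output partition refs
    pub read: Vec<String>,     // input partition refs
}

#[derive(Default)]
pub struct ConsumerIndex {
    /// canonical partition ref -> current instance uuid
    canonical: BTreeMap<String, Uuid>,
    /// input_uuid -> (output_uuid, job_run_id)
    partition_consumers: BTreeMap<Uuid, Vec<(Uuid, String)>>,
}

impl ConsumerIndex {
    pub fn index_success(&mut self, job_run_id: &str, read_deps: &[ReadDeps]) {
        for dep in read_deps {
            for input_ref in &dep.read {
                // Resolve refs to the UUIDs current at success time, so the
                // lineage survives later rebuilds of the same ref.
                let Some(input_uuid) = self.canonical.get(input_ref).copied() else {
                    continue;
                };
                for output_ref in &dep.impacted {
                    if let Some(output_uuid) = self.canonical.get(output_ref).copied() {
                        self.partition_consumers
                            .entry(input_uuid)
                            .or_default()
                            .push((output_uuid, job_run_id.to_string()));
                    }
                }
            }
        }
    }
}
```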
## Design Decisions
1. **Retries**: List all job runs triggered by a want, collapsing retries in the UI (expandable)
2. **Lineage UUIDs**: Resolve partition refs to canonical UUIDs at job success time (jobs don't need to know about UUIDs)
3. **High fan-out**: Truncate to N items with "+X more" expansion
4. **Consumer index by UUID**: Index consumers by partition UUID (not ref) since refs get reused across rebuilds but UUIDs are immutable per instance
5. **Job run as lineage source of truth**: Partition details don't duplicate upstream info - they reference their builder job run, which holds the read_deps
## API Response Pattern
Detail and list endpoints return a wrapper with the primary data plus a shared index of related entities:
```protobuf
message GetJobRunResponse {
JobRunDetail data = 1;
RelatedEntities index = 2;
}
message ListJobRunsResponse {
repeated JobRunDetail data = 1;
RelatedEntities index = 2; // shared across all items
}
message RelatedEntities {
map<string, PartitionDetail> partitions = 1;
map<string, JobRunDetail> job_runs = 2;
map<string, WantDetail> wants = 3;
}
```
**Why this pattern:** (a sketch of assembling the `index` follows this list)
- **No recursion** - Detail types stay flat, don't embed each other
- **Deduplication** - Each entity appears once in the index, even if referenced by multiple items in `data`
- **O(1) lookup** - Templates access `index.partitions["data/beta"]` directly
- **Composable** - Same pattern works for single-item and list endpoints
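A sketch of how a `*_with_index()` style query could assemble the shared index from `HasRelatedIds`; types are simplified (the real `RelatedIds` also carries partition refs and UUIDs):
```rust
use std::collections::BTreeMap;

#[derive(Default)]
pub struct RelatedIds {
    pub job_run_ids: Vec<String>,
    pub want_ids: Vec<String>,
}

pub trait HasRelatedIds {
    fn related_ids(&self) -> RelatedIds;
}

#[derive(Clone)]
pub struct JobRunDetail { pub id: String }
#[derive(Clone)]
pub struct WantDetail { pub want_id: String }

#[derive(Default)]
pub struct RelatedEntities {
    pub job_runs: BTreeMap<String, JobRunDetail>,
    pub wants: BTreeMap<String, WantDetail>,
}

/// Collect related IDs from each item, resolve each once, and key it by ID so
/// templates get O(1) lookup with no duplication across `data`.
pub fn build_index<E: HasRelatedIds>(
    items: &[E],
    lookup_job_run: impl Fn(&str) -> Option<JobRunDetail>,
    lookup_want: impl Fn(&str) -> Option<WantDetail>,
) -> RelatedEntities {
    let mut index = RelatedEntities::default();
    for item in items {
        let ids = item.related_ids();
        for id in ids.job_run_ids {
            if !index.job_runs.contains_key(&id) {
                if let Some(jr) = lookup_job_run(&id) {
                    index.job_runs.insert(id, jr);
                }
            }
        }
        for id in ids.want_ids {
            if !index.wants.contains_key(&id) {
                if let Some(w) = lookup_want(&id) {
                    index.wants.insert(id, w);
                }
            }
        }
    }
    index
}
```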
## Implementation Plan
### ✅ Phase 1: Data Model (Complete)
**1.1 Extend JobRunSuccessEventV1**
```protobuf
message JobRunSuccessEventV1 {
string job_run_id = 1;
repeated ReadDeps read_deps = 2; // preserves impacted→read relationships
}
```
**1.2 Extend SucceededState to store resolved UUIDs**
```rust
pub struct SucceededState {
pub succeeded_at: u64,
pub read_deps: Vec<ReadDeps>, // from event
pub read_partition_uuids: BTreeMap<String, Uuid>, // ref → UUID at read time
pub wrote_partition_uuids: BTreeMap<String, Uuid>, // ref → UUID (from building_partitions)
}
```
UUIDs resolved by looking up canonical partitions when processing success event.
**1.3 Add consumer index to BuildState**
```rust
// input_uuid → list of (output_uuid, job_run_id)
partition_consumers: BTreeMap<Uuid, Vec<(Uuid, String)>>
```
Populated from `read_deps` when processing JobRunSuccessEventV1. Uses UUIDs (not refs) to preserve historical lineage across partition rebuilds.
### ✅ Phase 2: API Response Pattern (Complete)
**2.1 RelatedEntities wrapper**
Added `RelatedEntities` message and `index` field to all Get*/List* responses.
**2.2 HasRelatedIds trait**
Implemented trait for Want, JobRun, Partition that returns the IDs of related entities. Query layer uses this to build the index.
**2.3 Query methods**
Added `*_with_index()` methods that collect related IDs via the trait and resolve them to full entity details.
### ✅ Phase 3: Job Integration (Complete)
Jobs already emit `DATABUILD_DEP_READ_JSON` and the full pipeline is wired up:
1. **Job execution** (`job_run.rs`): `SubProcessBackend::poll` parses `DATABUILD_DEP_READ_JSON` lines from stdout and stores in `SubProcessCompleted.read_deps`
2. **Event creation** (`job_run.rs`): `to_event()` creates `JobRunSuccessEventV1` with `read_deps`
3. **Event handling** (`event_handlers.rs`): `handle_job_run_success()` resolves `read_partition_uuids` and `wrote_partition_uuids`, populates `partition_consumers` index
4. **API serialization** (`job_run_state.rs`): `to_detail()` includes `read_deps`, `read_partition_uuids`, `wrote_partition_uuids` in `JobRunDetail`
### ✅ Phase 4: Frontend (Complete)
**4.1 JobRun detail page**
Added to `job_runs/detail.html`:
- "Read Partitions" section showing partition refs with UUIDs (linked to partition detail)
- "Wrote Partitions" section showing partition refs with UUIDs (linked to partition detail)
- "Derivative Wants" section showing wants spawned by dep-miss (linked to want detail)
Extended `JobRunDetailView` with:
- `read_deps: Vec<ReadDepsView>` - impacted→read dependency relationships
- `read_partitions: Vec<PartitionRefWithUuidView>` - input partitions with UUIDs
- `wrote_partitions: Vec<PartitionRefWithUuidView>` - output partitions with UUIDs
- `derivative_want_ids: Vec<String>` - derivative wants from dep-miss
**4.2 Partition detail page**
Added to `partitions/detail.html`:
- "Lineage - Built By" section showing the builder job run (linked to job run detail for upstream lineage)
- "Lineage - Downstream Consumers" section showing UUIDs of downstream partitions
Extended `PartitionDetailView` with:
- `built_by_job_run_id: Option<String>` - job run that built this partition
- `downstream_partition_uuids: Vec<String>` - downstream consumers from index
**4.3 Want detail page**
Added to `wants/detail.html`:
- "Fulfillment - Job Runs" section listing all job runs that serviced this want
- "Fulfillment - Derivative Wants" section listing derivative wants spawned by dep-misses
Extended `WantDetailView` with:
- `job_run_ids: Vec<String>` - all job runs that serviced this want
- `derivative_want_ids: Vec<String>` - derivative wants spawned

View file

@ -3,7 +3,7 @@
"jobs": [
{
"label": "//examples/multihop:job_alpha",
"entrypoint": "./examples/multihop/job_alpha.sh",
"entrypoint": "./job_alpha.sh",
"environment": {
"JOB_NAME": "alpha"
},
@ -11,7 +11,7 @@
},
{
"label": "//examples/multihop:job_beta",
"entrypoint": "./examples/multihop/job_beta.sh",
"entrypoint": "./job_beta.sh",
"environment": {
"JOB_NAME": "beta"
},

View file

@ -10,8 +10,9 @@
"https://bcr.bazel.build/modules/abseil-cpp/20230802.1/MODULE.bazel": "fa92e2eb41a04df73cdabeec37107316f7e5272650f81d6cc096418fe647b915",
"https://bcr.bazel.build/modules/abseil-cpp/20240116.1/MODULE.bazel": "37bcdb4440fbb61df6a1c296ae01b327f19e9bb521f9b8e26ec854b6f97309ed",
"https://bcr.bazel.build/modules/abseil-cpp/20240116.1/source.json": "9be551b8d4e3ef76875c0d744b5d6a504a27e3ae67bc6b28f46415fd2d2957da",
"https://bcr.bazel.build/modules/apple_support/1.17.1/MODULE.bazel": "655c922ab1209978a94ef6ca7d9d43e940cd97d9c172fb55f94d91ac53f8610b",
"https://bcr.bazel.build/modules/apple_support/1.17.1/source.json": "6b2b8c74d14e8d485528a938e44bdb72a5ba17632b9e14ef6e68a5ee96c8347f",
"https://bcr.bazel.build/modules/apple_support/1.23.1/MODULE.bazel": "53763fed456a968cf919b3240427cf3a9d5481ec5466abc9d5dc51bc70087442",
"https://bcr.bazel.build/modules/apple_support/1.24.1/MODULE.bazel": "f46e8ddad60aef170ee92b2f3d00ef66c147ceafea68b6877cb45bd91737f5f8",
"https://bcr.bazel.build/modules/apple_support/1.24.1/source.json": "cf725267cbacc5f028ef13bb77e7f2c2e0066923a4dab1025e4a0511b1ed258a",
"https://bcr.bazel.build/modules/aspect_bazel_lib/2.14.0/MODULE.bazel": "2b31ffcc9bdc8295b2167e07a757dbbc9ac8906e7028e5170a3708cecaac119f",
"https://bcr.bazel.build/modules/aspect_bazel_lib/2.14.0/source.json": "0cf1826853b0bef8b5cd19c0610d717500f5521aa2b38b72b2ec302ac5e7526c",
"https://bcr.bazel.build/modules/aspect_bazel_lib/2.7.2/MODULE.bazel": "780d1a6522b28f5edb7ea09630748720721dfe27690d65a2d33aa7509de77e07",
@ -31,8 +32,11 @@
"https://bcr.bazel.build/modules/bazel_features/1.18.0/MODULE.bazel": "1be0ae2557ab3a72a57aeb31b29be347bcdc5d2b1eb1e70f39e3851a7e97041a",
"https://bcr.bazel.build/modules/bazel_features/1.19.0/MODULE.bazel": "59adcdf28230d220f0067b1f435b8537dd033bfff8db21335ef9217919c7fb58",
"https://bcr.bazel.build/modules/bazel_features/1.21.0/MODULE.bazel": "675642261665d8eea09989aa3b8afb5c37627f1be178382c320d1b46afba5e3b",
"https://bcr.bazel.build/modules/bazel_features/1.27.0/MODULE.bazel": "621eeee06c4458a9121d1f104efb80f39d34deff4984e778359c60eaf1a8cb65",
"https://bcr.bazel.build/modules/bazel_features/1.28.0/MODULE.bazel": "4b4200e6cbf8fa335b2c3f43e1d6ef3e240319c33d43d60cc0fbd4b87ece299d",
"https://bcr.bazel.build/modules/bazel_features/1.30.0/MODULE.bazel": "a14b62d05969a293b80257e72e597c2da7f717e1e69fa8b339703ed6731bec87",
"https://bcr.bazel.build/modules/bazel_features/1.30.0/source.json": "b07e17f067fe4f69f90b03b36ef1e08fe0d1f3cac254c1241a1818773e3423bc",
"https://bcr.bazel.build/modules/bazel_features/1.32.0/MODULE.bazel": "095d67022a58cb20f7e20e1aefecfa65257a222c18a938e2914fd257b5f1ccdc",
"https://bcr.bazel.build/modules/bazel_features/1.32.0/source.json": "2546c766986a6541f0bacd3e8542a1f621e2b14a80ea9e88c6f89f7eedf64ae1",
"https://bcr.bazel.build/modules/bazel_features/1.4.1/MODULE.bazel": "e45b6bb2350aff3e442ae1111c555e27eac1d915e77775f6fdc4b351b758b5d7",
"https://bcr.bazel.build/modules/bazel_features/1.9.0/MODULE.bazel": "885151d58d90d8d9c811eb75e3288c11f850e1d6b481a8c9f766adee4712358b",
"https://bcr.bazel.build/modules/bazel_features/1.9.1/MODULE.bazel": "8f679097876a9b609ad1f60249c49d68bfab783dd9be012faf9d82547b14815a",
@ -48,7 +52,8 @@
"https://bcr.bazel.build/modules/bazel_skylib/1.7.0/MODULE.bazel": "0db596f4563de7938de764cc8deeabec291f55e8ec15299718b93c4423e9796d",
"https://bcr.bazel.build/modules/bazel_skylib/1.7.1/MODULE.bazel": "3120d80c5861aa616222ec015332e5f8d3171e062e3e804a2a0253e1be26e59b",
"https://bcr.bazel.build/modules/bazel_skylib/1.8.1/MODULE.bazel": "88ade7293becda963e0e3ea33e7d54d3425127e0a326e0d17da085a5f1f03ff6",
"https://bcr.bazel.build/modules/bazel_skylib/1.8.1/source.json": "7ebaefba0b03efe59cac88ed5bbc67bcf59a3eff33af937345ede2a38b2d368a",
"https://bcr.bazel.build/modules/bazel_skylib/1.8.2/MODULE.bazel": "69ad6927098316848b34a9142bcc975e018ba27f08c4ff403f50c1b6e646ca67",
"https://bcr.bazel.build/modules/bazel_skylib/1.8.2/source.json": "34a3c8bcf233b835eb74be9d628899bb32999d3e0eadef1947a0a562a2b16ffb",
"https://bcr.bazel.build/modules/buildozer/7.1.2/MODULE.bazel": "2e8dd40ede9c454042645fd8d8d0cd1527966aa5c919de86661e62953cd73d84",
"https://bcr.bazel.build/modules/buildozer/7.1.2/source.json": "c9028a501d2db85793a6996205c8de120944f50a0d570438fcae0457a5f9d1f8",
"https://bcr.bazel.build/modules/google_benchmark/1.8.2/MODULE.bazel": "a70cf1bba851000ba93b58ae2f6d76490a9feb74192e57ab8e8ff13c34ec50cb",
@ -61,13 +66,14 @@
"https://bcr.bazel.build/modules/libpfm/4.11.0/MODULE.bazel": "45061ff025b301940f1e30d2c16bea596c25b176c8b6b3087e92615adbd52902",
"https://bcr.bazel.build/modules/platforms/0.0.10/MODULE.bazel": "8cb8efaf200bdeb2150d93e162c40f388529a25852b332cec879373771e48ed5",
"https://bcr.bazel.build/modules/platforms/0.0.11/MODULE.bazel": "0daefc49732e227caa8bfa834d65dc52e8cc18a2faf80df25e8caea151a9413f",
"https://bcr.bazel.build/modules/platforms/0.0.11/source.json": "f7e188b79ebedebfe75e9e1d098b8845226c7992b307e28e1496f23112e8fc29",
"https://bcr.bazel.build/modules/platforms/0.0.4/MODULE.bazel": "9b328e31ee156f53f3c416a64f8491f7eb731742655a47c9eec4703a71644aee",
"https://bcr.bazel.build/modules/platforms/0.0.5/MODULE.bazel": "5733b54ea419d5eaf7997054bb55f6a1d0b5ff8aedf0176fef9eea44f3acda37",
"https://bcr.bazel.build/modules/platforms/0.0.6/MODULE.bazel": "ad6eeef431dc52aefd2d77ed20a4b353f8ebf0f4ecdd26a807d2da5aa8cd0615",
"https://bcr.bazel.build/modules/platforms/0.0.7/MODULE.bazel": "72fd4a0ede9ee5c021f6a8dd92b503e089f46c227ba2813ff183b71616034814",
"https://bcr.bazel.build/modules/platforms/0.0.8/MODULE.bazel": "9f142c03e348f6d263719f5074b21ef3adf0b139ee4c5133e2aa35664da9eb2d",
"https://bcr.bazel.build/modules/platforms/0.0.9/MODULE.bazel": "4a87a60c927b56ddd67db50c89acaa62f4ce2a1d2149ccb63ffd871d5ce29ebc",
"https://bcr.bazel.build/modules/platforms/1.0.0/MODULE.bazel": "f05feb42b48f1b3c225e4ccf351f367be0371411a803198ec34a389fb22aa580",
"https://bcr.bazel.build/modules/platforms/1.0.0/source.json": "f4ff1fd412e0246fd38c82328eb209130ead81d62dcd5a9e40910f867f733d96",
"https://bcr.bazel.build/modules/protobuf/21.7/MODULE.bazel": "a5a29bb89544f9b97edce05642fac225a808b5b7be74038ea3640fae2f8e66a7",
"https://bcr.bazel.build/modules/protobuf/27.0/MODULE.bazel": "7873b60be88844a0a1d8f80b9d5d20cfbd8495a689b8763e76c6372998d3f64c",
"https://bcr.bazel.build/modules/protobuf/27.1/MODULE.bazel": "703a7b614728bb06647f965264967a8ef1c39e09e8f167b3ca0bb1fd80449c0d",
@@ -93,7 +99,9 @@
"https://bcr.bazel.build/modules/rules_cc/0.0.8/MODULE.bazel": "964c85c82cfeb6f3855e6a07054fdb159aced38e99a5eecf7bce9d53990afa3e",
"https://bcr.bazel.build/modules/rules_cc/0.0.9/MODULE.bazel": "836e76439f354b89afe6a911a7adf59a6b2518fafb174483ad78a2a2fde7b1c5",
"https://bcr.bazel.build/modules/rules_cc/0.1.1/MODULE.bazel": "2f0222a6f229f0bf44cd711dc13c858dad98c62d52bd51d8fc3a764a83125513",
"https://bcr.bazel.build/modules/rules_cc/0.1.1/source.json": "d61627377bd7dd1da4652063e368d9366fc9a73920bfa396798ad92172cf645c",
"https://bcr.bazel.build/modules/rules_cc/0.2.4/MODULE.bazel": "1ff1223dfd24f3ecf8f028446d4a27608aa43c3f41e346d22838a4223980b8cc",
"https://bcr.bazel.build/modules/rules_cc/0.2.8/MODULE.bazel": "f1df20f0bf22c28192a794f29b501ee2018fa37a3862a1a2132ae2940a23a642",
"https://bcr.bazel.build/modules/rules_cc/0.2.8/source.json": "85087982aca15f31307bd52698316b28faa31bd2c3095a41f456afec0131344c",
"https://bcr.bazel.build/modules/rules_foreign_cc/0.9.0/MODULE.bazel": "c9e8c682bf75b0e7c704166d79b599f93b72cfca5ad7477df596947891feeef6",
"https://bcr.bazel.build/modules/rules_fuzzing/0.5.2/MODULE.bazel": "40c97d1144356f52905566c55811f13b299453a14ac7769dfba2ac38192337a8",
"https://bcr.bazel.build/modules/rules_fuzzing/0.5.2/source.json": "c8b1e2c717646f1702290959a3302a178fb639d987ab61d548105019f11e527e",
@@ -108,8 +116,8 @@
"https://bcr.bazel.build/modules/rules_java/7.2.0/MODULE.bazel": "06c0334c9be61e6cef2c8c84a7800cef502063269a5af25ceb100b192453d4ab",
"https://bcr.bazel.build/modules/rules_java/7.3.2/MODULE.bazel": "50dece891cfdf1741ea230d001aa9c14398062f2b7c066470accace78e412bc2",
"https://bcr.bazel.build/modules/rules_java/7.6.1/MODULE.bazel": "2f14b7e8a1aa2f67ae92bc69d1ec0fa8d9f827c4e17ff5e5f02e91caa3b2d0fe",
"https://bcr.bazel.build/modules/rules_java/8.12.0/MODULE.bazel": "8e6590b961f2defdfc2811c089c75716cb2f06c8a4edeb9a8d85eaa64ee2a761",
"https://bcr.bazel.build/modules/rules_java/8.12.0/source.json": "cbd5d55d9d38d4008a7d00bee5b5a5a4b6031fcd4a56515c9accbcd42c7be2ba",
"https://bcr.bazel.build/modules/rules_java/8.14.0/MODULE.bazel": "717717ed40cc69994596a45aec6ea78135ea434b8402fb91b009b9151dd65615",
"https://bcr.bazel.build/modules/rules_java/8.14.0/source.json": "8a88c4ca9e8759da53cddc88123880565c520503321e2566b4e33d0287a3d4bc",
"https://bcr.bazel.build/modules/rules_java/8.3.2/MODULE.bazel": "7336d5511ad5af0b8615fdc7477535a2e4e723a357b6713af439fe8cf0195017",
"https://bcr.bazel.build/modules/rules_java/8.5.1/MODULE.bazel": "d8a9e38cc5228881f7055a6079f6f7821a073df3744d441978e7a43e20226939",
"https://bcr.bazel.build/modules/rules_jvm_external/4.4.2/MODULE.bazel": "a56b85e418c83eb1839819f0b515c431010160383306d13ec21959ac412d2fe7",
@@ -149,12 +157,11 @@
"https://bcr.bazel.build/modules/rules_python/1.3.0/MODULE.bazel": "8361d57eafb67c09b75bf4bbe6be360e1b8f4f18118ab48037f2bd50aa2ccb13",
"https://bcr.bazel.build/modules/rules_python/1.5.1/MODULE.bazel": "acfe65880942d44a69129d4c5c3122d57baaf3edf58ae5a6bd4edea114906bf5",
"https://bcr.bazel.build/modules/rules_python/1.5.1/source.json": "aa903e1bcbdfa1580f2b8e2d55100b7c18bc92d779ebb507fec896c75635f7bd",
"https://bcr.bazel.build/modules/rules_rust/0.61.0/MODULE.bazel": "0318a95777b9114c8740f34b60d6d68f9cfef61e2f4b52424ca626213d33787b",
"https://bcr.bazel.build/modules/rules_rust/0.61.0/source.json": "d1bc743b5fa2e2abb35c436df7126a53dab0c3f35890ae6841592b2253786a63",
"https://bcr.bazel.build/modules/rules_rust/0.67.0/MODULE.bazel": "87c3816c4321352dcfd9e9e26b58e84efc5b21351ae3ef8fb5d0d57bde7237f5",
"https://bcr.bazel.build/modules/rules_rust/0.67.0/source.json": "a8ef4d3be30eb98e060cad9e5875a55b603195487f76e01b619b51a1df4641cc",
"https://bcr.bazel.build/modules/rules_shell/0.2.0/MODULE.bazel": "fda8a652ab3c7d8fee214de05e7a9916d8b28082234e8d2c0094505c5268ed3c",
"https://bcr.bazel.build/modules/rules_shell/0.3.0/MODULE.bazel": "de4402cd12f4cc8fda2354fce179fdb068c0b9ca1ec2d2b17b3e21b24c1a937b",
"https://bcr.bazel.build/modules/rules_shell/0.4.0/MODULE.bazel": "0f8f11bb3cd11755f0b48c1de0bbcf62b4b34421023aa41a2fc74ef68d9584f0",
"https://bcr.bazel.build/modules/rules_shell/0.4.0/source.json": "1d7fa7f941cd41dc2704ba5b4edc2e2230eea1cc600d80bd2b65838204c50b95",
"https://bcr.bazel.build/modules/rules_shell/0.6.1/MODULE.bazel": "72e76b0eea4e81611ef5452aa82b3da34caca0c8b7b5c0c9584338aa93bae26b",
"https://bcr.bazel.build/modules/rules_shell/0.6.1/source.json": "20ec05cd5e592055e214b2da8ccb283c7f2a421ea0dc2acbf1aa792e11c03d0c",
"https://bcr.bazel.build/modules/stardoc/0.5.1/MODULE.bazel": "1a05d92974d0c122f5ccf09291442580317cdd859f07a8655f1db9a60374f9f8",
"https://bcr.bazel.build/modules/stardoc/0.5.3/MODULE.bazel": "c7f6948dae6999bf0db32c1858ae345f112cacf98f174c7a8bb707e41b974f1c",
"https://bcr.bazel.build/modules/stardoc/0.5.4/MODULE.bazel": "6569966df04610b8520957cb8e97cf2e9faac2c0309657c537ab51c16c18a2a4",