Compare commits

...

5 commits

Author SHA1 Message Date
f531730a6b the battle is done... for now 2025-11-25 23:31:46 +08:00
14a24ef6d6 refactor build_state.rs into module 2025-11-25 14:55:43 +08:00
e7aac32607 add logo 2025-11-25 14:26:25 +08:00
efed281a5a correct wants docs 2025-11-25 14:26:21 +08:00
042526ea8c add multihop example script 2025-11-25 14:26:15 +08:00
19 changed files with 4273 additions and 2391 deletions


@@ -109,6 +109,15 @@ crate.spec(
package = "toml",
version = "0.8",
)
crate.spec(
features = ["urlencode"],
package = "askama",
version = "0.14",
)
crate.spec(
package = "urlencoding",
version = "2.1",
)
crate.from_specs()
use_repo(crate, "crates")
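Both new crates appear to back the web dashboard added later in this compare: askama's `urlencode` filter and the `urlencoding` crate handle partition refs, which contain slashes, inside URL paths. For orientation (not part of the diff), a minimal sketch of the urlencoding 2.x API this relies on:

```rust
fn main() {
    // Partition refs like "data/alpha" contain slashes, so links such as
    // /partitions/data%2Falpha need percent-encoding.
    let encoded = urlencoding::encode("data/alpha");
    assert_eq!(encoded, "data%2Falpha");

    let decoded = urlencoding::decode("data%2Falpha").unwrap();
    assert_eq!(decoded, "data/alpha");
}
```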

File diff suppressed because one or more lines are too long

assets/logo.svg Normal file

@@ -0,0 +1,6 @@
<svg width="243" height="215" viewBox="0 0 243 215" fill="none" xmlns="http://www.w3.org/2000/svg">
<path d="M123.5 77L149.048 121.25H97.9523L123.5 77Z" fill="#F2994A"/>
<path d="M224.772 125.035L155.772 125.035L109.52 45.3147L86.7722 45.3147L40.2722 124.463L16.7725 124.463" stroke="#333333" stroke-width="20"/>
<path d="M86.6196 5.18886L121.12 64.9444L75.2062 144.86L86.58 164.56L178.375 165.256L190.125 185.608" stroke="#333333" stroke-width="20"/>
<path d="M51.966 184.847L86.4659 125.092L178.632 124.896L190.006 105.196L144.711 25.3514L156.461 5.00002" stroke="#333333" stroke-width="20"/>
</svg>



@@ -17,14 +17,18 @@ rust_binary(
)
# DataBuild library using generated prost code
# Note: Templates are embedded inline in web/templates.rs using Askama's in_doc feature
# because Bazel's sandbox doesn't support file-based Askama templates properly.
rust_library(
name = "lib",
srcs = glob(["**/*.rs"]) + [
":generate_databuild_rust",
],
crate_root = "lib.rs",
edition = "2021",
visibility = ["//visibility:public"],
deps = [
"@crates//:askama",
"@crates//:axum",
"@crates//:prost",
"@crates//:prost-types",
@@ -33,13 +37,14 @@ rust_library(
"@crates//:schemars",
"@crates//:serde",
"@crates//:serde_json",
"@crates//:sha2",
"@crates//:tokio",
"@crates//:toml",
"@crates//:tower",
"@crates//:tower-http",
"@crates//:tracing",
"@crates//:urlencoding",
"@crates//:uuid",
"@crates//:sha2",
],
)

File diff suppressed because it is too large.

File diff suppressed because it is too large.


@@ -0,0 +1,188 @@
//! Build State - the heart of databuild's orchestration system
//!
//! The BuildState struct tracks all application state, defines valid state transitions,
//! and manages cross-state machine state transitions (e.g. job run success resulting
//! in partition going from Building to Live).
//!
//! See docs/design/build-state-semantics.md for the full conceptual model.
mod event_handlers;
mod partition_transitions;
mod queries;
mod schedulability;
mod want_transitions;
use crate::job_run_state::JobRun;
use crate::partition_state::Partition;
use crate::want_state::Want;
use crate::{PartitionRef, TaintDetail};
use std::collections::BTreeMap;
use uuid::Uuid;
// Re-export public types
pub use schedulability::{WantSchedulability, WantUpstreamStatus, WantsSchedulability};
/**
Design Notes

The build state struct is the heart of the service and orchestrator, adapting build events to
higher-level questions about build state. One temptation is to implement the build state as a set
of hierarchically defined reducers, to achieve information hiding and to factor system capabilities
and state tracking simply. Unfortunately, to update state based on an event, you need a mutable
borrow of some part of the build state (the part the reducer controls, for instance) and an
immutable borrow of the whole state for read/query purposes. The whole state needs to be available
to handle state updates like "this is the list of currently active job runs" in response to a job
run event. Put simply, this isn't possible without introducing some locking of the whole state and
the mutable state subset, since the two borrows would conflict (the mutable subset would already be
borrowed, so it couldn't also be borrowed immutably as part of the whole-state borrow). You might
also define a "query" phase in which reducers query the state based on the received event, but that
just increases complexity.

Instead, databuild opts for an entity-component-system (ECS) style that provides the whole build
state mutably to all state-update functionality, trusting that we know how to use it responsibly.
This means no boxing and no "query phase", and it means all state updates happen as map lookups and
updates, which is exceptionally fast. The states of the different entities are managed by state
machines, in a pseudo-colored-Petri-net style (only pseudo because we haven't formalized it). It is
critical that these state machines, their states, and their transitions are type-safe.
*/
/// Tracks all application state, defines valid state transitions, and manages cross-state machine
/// state transitions (e.g. job run success resulting in partition going from Building to Live)
#[derive(Debug, Clone, Default)]
pub struct BuildState {
// Core entity storage
pub(crate) wants: BTreeMap<String, Want>,
pub(crate) taints: BTreeMap<String, TaintDetail>,
pub(crate) job_runs: BTreeMap<String, JobRun>,
// UUID-based partition indexing
pub(crate) partitions_by_uuid: BTreeMap<Uuid, Partition>,
pub(crate) canonical_partitions: BTreeMap<String, Uuid>, // partition ref → current UUID
// Inverted indexes
pub(crate) wants_for_partition: BTreeMap<String, Vec<String>>, // partition ref → want_ids
pub(crate) downstream_waiting: BTreeMap<String, Vec<Uuid>>, // upstream ref → partition UUIDs waiting for it
}
impl BuildState {
/// Reconstruct BuildState from a sequence of events (for read path in web server)
/// This allows the web server to rebuild state from BEL storage without holding a lock
pub fn from_events(events: &[crate::DataBuildEvent]) -> Self {
let mut state = BuildState::default();
for event in events {
if let Some(ref inner_event) = event.event {
// handle_event returns Vec<Event> for cascading events, but we ignore them
// since we're replaying from a complete event log
state.handle_event(inner_event);
}
}
state
}
pub fn count_job_runs(&self) -> usize {
self.job_runs.len()
}
// ===== UUID-based partition access methods =====
/// Get the canonical partition for a ref (the current/active partition instance)
pub fn get_canonical_partition(&self, partition_ref: &str) -> Option<&Partition> {
self.canonical_partitions
.get(partition_ref)
.and_then(|uuid| self.partitions_by_uuid.get(uuid))
}
/// Get the canonical partition UUID for a ref
pub fn get_canonical_partition_uuid(&self, partition_ref: &str) -> Option<Uuid> {
self.canonical_partitions.get(partition_ref).copied()
}
/// Get a partition by its UUID
pub fn get_partition_by_uuid(&self, uuid: Uuid) -> Option<&Partition> {
self.partitions_by_uuid.get(&uuid)
}
/// Take the canonical partition for a ref (removes from partitions_by_uuid for state transition)
/// The canonical_partitions mapping is NOT removed - caller must update it if creating a new partition
pub(crate) fn take_canonical_partition(&mut self, partition_ref: &str) -> Option<Partition> {
self.canonical_partitions
.get(partition_ref)
.copied()
.and_then(|uuid| self.partitions_by_uuid.remove(&uuid))
}
/// Get want IDs for a partition ref (from inverted index)
pub fn get_wants_for_partition(&self, partition_ref: &str) -> &[String] {
self.wants_for_partition
.get(partition_ref)
.map(|v| v.as_slice())
.unwrap_or(&[])
}
/// Register a want in the wants_for_partition inverted index
pub(crate) fn register_want_for_partitions(
&mut self,
want_id: &str,
partition_refs: &[PartitionRef],
) {
for pref in partition_refs {
let want_ids = self
.wants_for_partition
.entry(pref.r#ref.clone())
.or_insert_with(Vec::new);
if !want_ids.contains(&want_id.to_string()) {
want_ids.push(want_id.to_string());
}
}
}
/// Update a partition in the indexes (after state transition)
pub(crate) fn update_partition(&mut self, partition: Partition) {
let uuid = partition.uuid();
self.partitions_by_uuid.insert(uuid, partition);
}
// Test helpers
pub(crate) fn with_wants(self, wants: BTreeMap<String, Want>) -> Self {
Self { wants, ..self }
}
#[cfg(test)]
pub(crate) fn with_partitions(
self,
old_partitions: BTreeMap<String, crate::PartitionDetail>,
) -> Self {
use crate::partition_state::PartitionWithState;
let mut canonical_partitions: BTreeMap<String, Uuid> = BTreeMap::new();
let mut partitions_by_uuid: BTreeMap<Uuid, Partition> = BTreeMap::new();
// Convert PartitionDetail to Live partitions for testing
for (key, detail) in old_partitions {
let partition_ref = detail.r#ref.clone().unwrap_or_default();
// Create a deterministic UUID for test data
let uuid =
crate::partition_state::derive_partition_uuid("test_job_run", &partition_ref.r#ref);
let live_partition = Partition::Live(PartitionWithState {
uuid,
partition_ref,
state: crate::partition_state::LiveState {
built_at: 0,
built_by: "test_job_run".to_string(),
},
});
canonical_partitions.insert(key, uuid);
partitions_by_uuid.insert(uuid, live_partition);
}
Self {
canonical_partitions,
partitions_by_uuid,
..self
}
}
}
pub(crate) mod consts {
pub const DEFAULT_PAGE_SIZE: u64 = 100;
}
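The Design Notes above argue that reducer-per-subsystem designs fight the borrow checker. A self-contained sketch of the contrast, with all names hypothetical rather than the crate's actual types: one `&mut BuildState` flows into each update function, which can both query across entities and mutate them, with no lock and no separate query phase.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for the real BuildState; entity maps simplified to labels.
#[derive(Debug, Default)]
struct BuildState {
    job_runs: BTreeMap<String, String>, // job_run_id -> state label
    wants: BTreeMap<String, String>,    // want_id -> state label
}

// ECS-style update: the whole state arrives mutably, so the handler can read
// across entities ("how many job runs are active?") and then write, all through
// the same borrow. A reducer holding `&mut` to just `job_runs` could not also
// take `&BuildState` for such queries without locking or a separate query phase.
fn handle_job_run_event(state: &mut BuildState, job_run_id: &str) {
    let active = state
        .job_runs
        .values()
        .filter(|s| s.as_str() == "Running")
        .count();
    println!("{active} job runs active before this event");

    state
        .job_runs
        .insert(job_run_id.to_string(), "Running".to_string());
    state
        .wants
        .insert(format!("want-for-{job_run_id}"), "Building".to_string());
}

fn main() {
    let mut state = BuildState::default();
    handle_job_run_event(&mut state, "jr-1");
    handle_job_run_event(&mut state, "jr-2");
}
```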


@@ -0,0 +1,345 @@
//! Partition state transition logic
//!
//! Methods for transitioning partitions between states (Building, Live, Failed,
//! UpstreamBuilding, UpForRetry, UpstreamFailed) and managing downstream dependencies.
use crate::PartitionRef;
use crate::partition_state::{
BuildingPartitionRef, BuildingState, FailedPartitionRef, LivePartitionRef, Partition,
PartitionWithState,
};
use crate::util::current_timestamp;
use uuid::Uuid;
use super::BuildState;
impl BuildState {
/// Create a new partition in Building state and update indexes
pub(crate) fn create_partition_building(
&mut self,
job_run_id: &str,
partition_ref: PartitionRef,
) -> Uuid {
let partition =
PartitionWithState::<BuildingState>::new(job_run_id.to_string(), partition_ref.clone());
let uuid = partition.uuid;
// Update indexes
self.partitions_by_uuid
.insert(uuid, Partition::Building(partition));
self.canonical_partitions
.insert(partition_ref.r#ref.clone(), uuid);
tracing::info!(
partition = %partition_ref.r#ref,
uuid = %uuid,
job_run_id = %job_run_id,
"Partition: Created in Building state"
);
uuid
}
/// Create partitions in Building state
/// Used when a job run starts building partitions.
/// Note: Partitions no longer have a Missing state - they start directly as Building.
pub(crate) fn transition_partitions_to_building(
&mut self,
partition_refs: &[BuildingPartitionRef],
job_run_id: &str,
) {
for building_ref in partition_refs {
if let Some(partition) = self.get_canonical_partition(&building_ref.0.r#ref).cloned() {
// Partition already exists - this is an error unless we're retrying from UpForRetry
match partition {
Partition::UpForRetry(_) => {
// Valid: UpForRetry -> Building (retry after deps satisfied)
// Old partition stays in partitions_by_uuid as historical record
// Create new Building partition with fresh UUID
let uuid =
self.create_partition_building(job_run_id, building_ref.0.clone());
tracing::info!(
partition = %building_ref.0.r#ref,
job_run_id = %job_run_id,
uuid = %uuid,
"Partition: UpForRetry → Building (retry)"
);
}
_ => {
panic!(
"BUG: Invalid state - partition {} cannot start building from state {:?}",
building_ref.0.r#ref, partition
)
}
}
} else {
// Partition doesn't exist yet - create directly in Building state
let uuid = self.create_partition_building(job_run_id, building_ref.0.clone());
tracing::info!(
partition = %building_ref.0.r#ref,
job_run_id = %job_run_id,
uuid = %uuid,
"Partition: (new) → Building"
);
}
}
}
/// Transition partitions from Building to Live state
/// Used when a job run successfully completes
pub(crate) fn transition_partitions_to_live(
&mut self,
partition_refs: &[LivePartitionRef],
job_run_id: &str,
timestamp: u64,
) {
for pref in partition_refs {
let partition = self
.take_canonical_partition(&pref.0.r#ref)
.expect(&format!(
"BUG: Partition {} must exist and be in Building state before completion",
pref.0.r#ref
));
// ONLY valid transition: Building -> Live
let transitioned = match partition {
Partition::Building(building) => {
tracing::info!(
partition = %pref.0.r#ref,
job_run_id = %job_run_id,
"Partition: Building → Live"
);
Partition::Live(building.complete(timestamp))
}
// All other states are invalid
_ => {
panic!(
"BUG: Invalid state - partition {} must be Building to transition to Live, found {:?}",
pref.0.r#ref, partition
)
}
};
self.update_partition(transitioned);
}
}
/// Transition partitions from Building to Failed state
/// Used when a job run fails
pub(crate) fn transition_partitions_to_failed(
&mut self,
partition_refs: &[FailedPartitionRef],
job_run_id: &str,
timestamp: u64,
) {
for pref in partition_refs {
let partition = self
.take_canonical_partition(&pref.0.r#ref)
.expect(&format!(
"BUG: Partition {} must exist and be in Building state before failure",
pref.0.r#ref
));
// ONLY valid transition: Building -> Failed
let transitioned = match partition {
Partition::Building(building) => {
tracing::info!(
partition = %pref.0.r#ref,
job_run_id = %job_run_id,
"Partition: Building → Failed"
);
Partition::Failed(building.fail(timestamp))
}
// All other states are invalid
_ => {
panic!(
"BUG: Invalid state - partition {} must be Building to transition to Failed, found {:?}",
pref.0.r#ref, partition
)
}
};
self.update_partition(transitioned);
}
}
/// Transition partitions from Building to UpstreamBuilding state
/// Used when a job run encounters missing dependencies and cannot proceed.
/// The partition waits for its upstream deps to be built before becoming UpForRetry.
pub(crate) fn transition_partitions_to_upstream_building(
&mut self,
partition_refs: &[BuildingPartitionRef],
missing_deps: Vec<PartitionRef>,
) {
for building_ref in partition_refs {
let partition = self
.take_canonical_partition(&building_ref.0.r#ref)
.expect(&format!(
"BUG: Partition {} must exist and be in Building state during dep_miss",
building_ref.0.r#ref
));
// Only valid transition: Building -> UpstreamBuilding
let transitioned = match partition {
Partition::Building(building) => {
let partition_uuid = building.uuid;
tracing::info!(
partition = %building_ref.0.r#ref,
uuid = %partition_uuid,
missing_deps = ?missing_deps.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
"Partition: Building → UpstreamBuilding (dep miss)"
);
// Update downstream_waiting index: for each missing dep, record that this partition is waiting
for missing_dep in &missing_deps {
self.downstream_waiting
.entry(missing_dep.r#ref.clone())
.or_default()
.push(partition_uuid);
}
Partition::UpstreamBuilding(building.dep_miss(missing_deps.clone()))
}
// All other states are invalid
_ => {
panic!(
"BUG: Invalid state - partition {} must be Building during dep_miss, found {:?}",
building_ref.0.r#ref, partition
)
}
};
self.update_partition(transitioned);
}
}
/// Transition partitions from UpstreamBuilding to UpForRetry when their upstream deps become Live.
/// This should be called when partitions become Live to check if any downstream partitions can now retry.
/// Uses the `downstream_waiting` index for O(1) lookup of affected partitions.
pub(crate) fn unblock_downstream_partitions(
&mut self,
newly_live_partition_refs: &[LivePartitionRef],
) {
// Collect UUIDs of partitions that might be unblocked using the inverted index
let mut uuids_to_check: Vec<Uuid> = Vec::new();
for live_ref in newly_live_partition_refs {
if let Some(waiting_uuids) = self.downstream_waiting.get(&live_ref.0.r#ref) {
uuids_to_check.extend(waiting_uuids.iter().cloned());
}
}
// Deduplicate UUIDs (a partition might be waiting for multiple deps that all became live)
uuids_to_check.sort();
uuids_to_check.dedup();
for uuid in uuids_to_check {
// Get partition by UUID - it might have been transitioned already or no longer exist
let Some(partition) = self.partitions_by_uuid.get(&uuid).cloned() else {
continue;
};
let partition_ref = partition.partition_ref().r#ref.clone();
// Only process UpstreamBuilding partitions
if let Partition::UpstreamBuilding(mut upstream_building) = partition {
// Remove satisfied deps from missing_deps
for live_ref in newly_live_partition_refs {
upstream_building
.state
.missing_deps
.retain(|d| d.r#ref != live_ref.0.r#ref);
// Also remove from downstream_waiting index
if let Some(waiting) = self.downstream_waiting.get_mut(&live_ref.0.r#ref) {
waiting.retain(|u| *u != uuid);
}
}
let transitioned = if upstream_building.state.missing_deps.is_empty() {
// All deps satisfied, transition to UpForRetry
tracing::info!(
partition = %partition_ref,
uuid = %uuid,
"Partition: UpstreamBuilding → UpForRetry (all upstreams satisfied)"
);
Partition::UpForRetry(upstream_building.upstreams_satisfied())
} else {
// Still waiting for more deps
tracing::debug!(
partition = %partition_ref,
uuid = %uuid,
remaining_deps = ?upstream_building.state.missing_deps.iter().map(|d| &d.r#ref).collect::<Vec<_>>(),
"Partition remains in UpstreamBuilding (still waiting for deps)"
);
Partition::UpstreamBuilding(upstream_building)
};
self.update_partition(transitioned);
}
}
}
/// Cascade failures to downstream partitions when their upstream dependencies fail.
/// Transitions UpstreamBuilding → UpstreamFailed for partitions waiting on failed upstreams.
/// Uses the `downstream_waiting` index for O(1) lookup of affected partitions.
pub(crate) fn cascade_failures_to_downstream_partitions(
&mut self,
failed_partition_refs: &[FailedPartitionRef],
) {
// Collect UUIDs of partitions that are waiting for the failed partitions
let mut uuids_to_fail: Vec<Uuid> = Vec::new();
for failed_ref in failed_partition_refs {
if let Some(waiting_uuids) = self.downstream_waiting.get(&failed_ref.0.r#ref) {
uuids_to_fail.extend(waiting_uuids.iter().cloned());
}
}
// Deduplicate UUIDs
uuids_to_fail.sort();
uuids_to_fail.dedup();
for uuid in uuids_to_fail {
// Get partition by UUID
let Some(partition) = self.partitions_by_uuid.get(&uuid).cloned() else {
continue;
};
let partition_ref = partition.partition_ref().r#ref.clone();
// Only process UpstreamBuilding partitions
if let Partition::UpstreamBuilding(upstream_building) = partition {
// Collect which upstream refs failed
let failed_upstream_refs: Vec<PartitionRef> = failed_partition_refs
.iter()
.filter(|f| {
upstream_building
.state
.missing_deps
.iter()
.any(|d| d.r#ref == f.0.r#ref)
})
.map(|f| f.0.clone())
.collect();
if !failed_upstream_refs.is_empty() {
tracing::info!(
partition = %partition_ref,
uuid = %uuid,
failed_upstreams = ?failed_upstream_refs.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
"Partition: UpstreamBuilding → UpstreamFailed (upstream failed)"
);
// Remove from downstream_waiting index for all deps
for dep in &upstream_building.state.missing_deps {
if let Some(waiting) = self.downstream_waiting.get_mut(&dep.r#ref) {
waiting.retain(|u| *u != uuid);
}
}
// Transition to UpstreamFailed
let transitioned = Partition::UpstreamFailed(
upstream_building
.upstream_failed(failed_upstream_refs, current_timestamp()),
);
self.update_partition(transitioned);
}
}
}
}
}
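A compressed, self-contained model of the transitions this file implements (a linear scan stands in for the `downstream_waiting` inverted index, and the real code keys by UUID and keeps historical partitions):

```rust
use std::collections::BTreeMap;

// Simplified mirror of the partition lifecycle above: Building -> Live | Failed,
// Building -> UpstreamBuilding -> UpForRetry | UpstreamFailed.
#[derive(Debug, Clone, PartialEq)]
enum Partition {
    Building,
    Live,
    UpstreamBuilding { missing_deps: Vec<String> },
    UpForRetry,
}

// When a partition goes Live, downstream partitions drop it from their missing
// deps; once nothing is missing they become UpForRetry (schedulable again).
fn unblock_downstream(partitions: &mut BTreeMap<String, Partition>, newly_live: &str) {
    for p in partitions.values_mut() {
        if let Partition::UpstreamBuilding { missing_deps } = p {
            missing_deps.retain(|d| d != newly_live);
            if missing_deps.is_empty() {
                *p = Partition::UpForRetry;
            }
        }
    }
}

fn main() {
    let mut parts = BTreeMap::new();
    parts.insert("data/alpha".to_string(), Partition::Live);
    parts.insert("data/gamma".to_string(), Partition::Building);
    parts.insert(
        "data/beta".to_string(),
        Partition::UpstreamBuilding { missing_deps: vec!["data/alpha".to_string()] },
    );
    unblock_downstream(&mut parts, "data/alpha");
    assert_eq!(parts["data/beta"], Partition::UpForRetry);
    assert_eq!(parts["data/gamma"], Partition::Building); // unaffected
}
```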


@@ -0,0 +1,118 @@
//! Query methods for BuildState
//!
//! Read-only methods for accessing state (get_*, list_*) used by the API layer.
use crate::{
JobRunDetail, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest,
ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest,
ListWantsResponse, PartitionDetail, TaintDetail, WantDetail,
};
use std::collections::BTreeMap;
use super::{BuildState, consts};
impl BuildState {
pub fn get_want(&self, want_id: &str) -> Option<WantDetail> {
self.wants.get(want_id).map(|w| w.to_detail())
}
pub fn get_taint(&self, taint_id: &str) -> Option<TaintDetail> {
self.taints.get(taint_id).cloned()
}
pub fn get_partition(&self, partition_id: &str) -> Option<PartitionDetail> {
self.get_canonical_partition(partition_id)
.map(|p| p.to_detail())
}
pub fn get_job_run(&self, job_run_id: &str) -> Option<JobRunDetail> {
self.job_runs.get(job_run_id).map(|jr| jr.to_detail())
}
pub fn list_wants(&self, request: &ListWantsRequest) -> ListWantsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
let start = page * page_size;
// Paginate first, then convert only the needed wants to WantDetail
let data: Vec<WantDetail> = self
.wants
.values()
.skip(start as usize)
.take(page_size as usize)
.map(|w| w.to_detail())
.collect();
ListWantsResponse {
data,
match_count: self.wants.len() as u64,
page,
page_size,
}
}
pub fn list_taints(&self, request: &ListTaintsRequest) -> ListTaintsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
ListTaintsResponse {
data: list_state_items(&self.taints, page, page_size),
match_count: self.taints.len() as u64,
page,
page_size,
}
}
pub fn list_partitions(&self, request: &ListPartitionsRequest) -> ListPartitionsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
// Convert canonical partitions to PartitionDetail for API
let partition_details: BTreeMap<String, PartitionDetail> = self
.canonical_partitions
.iter()
.filter_map(|(k, uuid)| {
self.partitions_by_uuid
.get(uuid)
.map(|p| (k.clone(), p.to_detail()))
})
.collect();
ListPartitionsResponse {
data: list_state_items(&partition_details, page, page_size),
match_count: self.canonical_partitions.len() as u64,
page,
page_size,
}
}
pub fn list_job_runs(&self, request: &ListJobRunsRequest) -> ListJobRunsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
let start = page * page_size;
let data: Vec<JobRunDetail> = self
.job_runs
.values()
.skip(start as usize)
.take(page_size as usize)
.map(|jr| jr.to_detail())
.collect();
ListJobRunsResponse {
data,
match_count: self.job_runs.len() as u64,
page,
page_size,
}
}
}
fn list_state_items<T: Clone>(map: &BTreeMap<String, T>, page: u64, page_size: u64) -> Vec<T> {
// TODO when we add filtering, can we add it generically via some trait or filter object that can be provided?
let start = page * page_size;
map.values()
.skip(start as usize)
.take(page_size as usize)
.cloned()
.collect()
}
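Pagination here is plain offset arithmetic: zero-based `page`, with `start = page * page_size`. A self-contained check of the behavior (note the helper takes `page_size` items after the skip, not `start + page_size`):

```rust
fn paginate<T: Clone>(items: &[T], page: u64, page_size: u64) -> Vec<T> {
    items
        .iter()
        .skip((page * page_size) as usize)
        .take(page_size as usize)
        .cloned()
        .collect()
}

fn main() {
    let items: Vec<u64> = (0..250).collect();
    // Page 2 with page_size 100 skips 200 items and takes the remaining 50.
    assert_eq!(paginate(&items, 2, 100), (200..250).collect::<Vec<u64>>());
    // Past-the-end pages are empty rather than an error.
    assert!(paginate(&items, 3, 100).is_empty());
}
```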


@@ -0,0 +1,176 @@
//! Want schedulability logic
//!
//! Types and methods for determining whether wants are schedulable based on
//! upstream partition states and target partition build status.
use crate::partition_state::{
BuildingPartitionRef, LivePartitionRef, Partition, TaintedPartitionRef,
};
use crate::{PartitionRef, WantDetail};
use serde::{Deserialize, Serialize};
use super::BuildState;
/// The status of partitions required by a want to build (sensed from dep miss job run)
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantUpstreamStatus {
pub live: Vec<LivePartitionRef>,
pub tainted: Vec<TaintedPartitionRef>,
/// Upstream partitions that are not ready (don't exist, or are in Building/UpstreamBuilding/UpForRetry/Failed/UpstreamFailed states)
pub not_ready: Vec<PartitionRef>,
/// Target partitions that are currently being built by another job
pub building: Vec<BuildingPartitionRef>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantSchedulability {
pub want: WantDetail,
pub status: WantUpstreamStatus,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WantsSchedulability(pub Vec<WantSchedulability>);
impl WantsSchedulability {
pub fn schedulable_wants(self) -> Vec<WantDetail> {
self.0
.iter()
.filter_map(|ws| match ws.is_schedulable() {
false => None,
true => Some(ws.want.clone()),
})
.collect()
}
}
impl WantSchedulability {
pub fn is_schedulable(&self) -> bool {
// Want is schedulable if:
// - No not-ready upstream dependencies (must all be Live or Tainted)
// - No tainted upstream dependencies
// - No target partitions currently being built by another job
self.status.not_ready.is_empty()
&& self.status.tainted.is_empty()
&& self.status.building.is_empty()
}
}
impl BuildState {
/// Wants are schedulable when their upstreams are ready and target partitions are not already building
pub fn want_schedulability(&self, want: &WantDetail) -> WantSchedulability {
// Check upstream partition statuses (dependencies)
let mut live: Vec<LivePartitionRef> = Vec::new();
let mut tainted: Vec<TaintedPartitionRef> = Vec::new();
let mut not_ready: Vec<PartitionRef> = Vec::new(); // Partitions that don't exist or aren't Live
for upstream_ref in &want.upstreams {
match self.get_canonical_partition(&upstream_ref.r#ref) {
Some(partition) => {
match partition {
Partition::Live(p) => live.push(p.get_ref()),
Partition::Tainted(p) => tainted.push(p.get_ref()),
// All other states (Building, UpstreamBuilding, UpForRetry, Failed, UpstreamFailed) mean upstream is not ready
_ => not_ready.push(upstream_ref.clone()),
}
}
None => {
// Partition doesn't exist yet - it's not ready
not_ready.push(upstream_ref.clone());
}
}
}
// Check target partition statuses (what this want is trying to build)
// If any target partition is already Building, this want should wait
let mut building: Vec<BuildingPartitionRef> = Vec::new();
for target_ref in &want.partitions {
if let Some(partition) = self.get_canonical_partition(&target_ref.r#ref) {
if let Partition::Building(p) = partition {
building.push(p.get_ref());
}
}
}
WantSchedulability {
want: want.clone(),
status: WantUpstreamStatus {
live,
tainted,
not_ready,
building,
},
}
}
pub fn wants_schedulability(&self) -> WantsSchedulability {
WantsSchedulability(
self.wants
.values()
// Use type-safe is_schedulable() - only Idle wants are schedulable
.filter(|w| w.is_schedulable())
.map(|w| self.want_schedulability(&w.to_detail()))
.collect(),
)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::want_state::{IdleState as WantIdleState, Want, WantInfo, WantWithState};
use crate::{PartitionDetail, PartitionRef, PartitionStatus, WantStatus};
use std::collections::BTreeMap;
impl WantDetail {
fn with_partitions(self, partitions: Vec<PartitionRef>) -> Self {
Self { partitions, ..self }
}
fn with_upstreams(self, upstreams: Vec<PartitionRef>) -> Self {
Self { upstreams, ..self }
}
fn with_status(self, status: Option<WantStatus>) -> Self {
Self { status, ..self }
}
}
impl PartitionDetail {
fn with_status(self, status: Option<PartitionStatus>) -> Self {
Self { status, ..self }
}
fn with_ref(self, r#ref: Option<PartitionRef>) -> Self {
Self { r#ref, ..self }
}
}
#[test]
fn test_empty_wants_noop() {
assert_eq!(BuildState::default().wants_schedulability().0.len(), 0);
}
// A want with satisfied upstreams (incl "none") should be schedulable
#[test]
fn test_simple_want_with_live_upstream_is_schedulable() {
// Given...
let test_partition = "test_partition";
let state = BuildState::default()
.with_wants(BTreeMap::from([(
"foo".to_string(),
Want::Idle(WantWithState {
want: WantInfo {
partitions: vec![test_partition.into()],
..Default::default()
},
state: WantIdleState {},
}),
)]))
.with_partitions(BTreeMap::from([(
test_partition.to_string(),
PartitionDetail::default().with_ref(Some(test_partition.into())),
)]));
// Should...
let schedulability = state.wants_schedulability();
let ws = schedulability.0.first().unwrap();
assert!(ws.is_schedulable());
}
}
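The schedulability predicate reduces to three emptiness checks. A self-contained restatement, with string refs standing in for the typed `*PartitionRef` wrappers:

```rust
// Mirror of WantSchedulability::is_schedulable above: a want runs only when no
// upstream is pending or tainted and no target partition is already building.
#[derive(Default)]
struct UpstreamStatus {
    not_ready: Vec<String>,
    tainted: Vec<String>,
    building: Vec<String>,
}

fn is_schedulable(s: &UpstreamStatus) -> bool {
    s.not_ready.is_empty() && s.tainted.is_empty() && s.building.is_empty()
}

fn main() {
    let blocked = UpstreamStatus {
        not_ready: vec!["data/alpha".to_string()],
        ..Default::default()
    };
    assert!(!is_schedulable(&blocked));
    assert!(is_schedulable(&UpstreamStatus::default()));
}
```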


@@ -0,0 +1,403 @@
//! Want state transition logic
//!
//! Methods for transitioning wants between states and managing dependencies
//! between wants (derivative wants from dep misses).
use crate::PartitionRef;
use crate::job_run_state::JobRun;
use crate::partition_state::{FailedPartitionRef, LivePartitionRef, Partition};
use crate::want_state::{FailedWantId, SuccessfulWantId, Want};
use super::BuildState;
impl BuildState {
/// Handle creation of a derivative want (created due to job dep miss)
///
/// When a job reports missing dependencies, it returns WantCreateV1 events for those missing partitions.
/// Those events get appended to the BEL and eventually processed by handle_want_create().
///
/// This function is called when we detect a derivative want (has source.job_triggered) and transitions
/// the impacted wants to UpstreamBuilding state, tracking the derivative want ID as an upstream dependency.
///
/// KEY INSIGHT: We must use the actual want_id from the WantCreateV1 event, not synthetic UUIDs generated
/// during event processing. This ensures replay works correctly - the same want IDs are used both during
/// original execution and during replay from the BEL.
pub(crate) fn handle_derivative_want_creation(
&mut self,
derivative_want_id: &str,
derivative_want_partitions: &[PartitionRef],
source_job_run_id: &str,
) {
// Look up the job run that triggered this derivative want
// This job run must be in DepMiss state because it reported missing dependencies
let job_run = self.job_runs.get(source_job_run_id).expect(&format!(
"BUG: Job run {} must exist when derivative want created",
source_job_run_id
));
// Extract the missing deps from the DepMiss job run
let missing_deps = match job_run {
JobRun::DepMiss(dep_miss) => dep_miss.get_missing_deps(),
_ => {
panic!(
"BUG: Job run {} must be in DepMiss state when derivative want created, found {:?}",
source_job_run_id, job_run
);
}
};
// Find which MissingDeps entry corresponds to this derivative want
// The derivative want was created for a specific set of missing partitions,
// and we need to find which downstream partitions are impacted by those missing partitions
for md in missing_deps {
// Check if this derivative want's partitions match the missing partitions in this entry
// We need exact match because one dep miss event can create multiple derivative wants
let partitions_match = md.missing.iter().all(|missing_ref| {
derivative_want_partitions
.iter()
.any(|p| p.r#ref == missing_ref.r#ref)
}) && derivative_want_partitions.len() == md.missing.len();
if partitions_match {
// Now we know which partitions are impacted by this missing dependency
let impacted_partition_refs: Vec<String> =
md.impacted.iter().map(|p| p.r#ref.clone()).collect();
tracing::debug!(
derivative_want_id = %derivative_want_id,
source_job_run_id = %source_job_run_id,
missing_partitions = ?derivative_want_partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
impacted_partitions = ?impacted_partition_refs,
"Processing derivative want creation"
);
// Find all wants that include these impacted partitions
// These are the wants that need to wait for the derivative want to complete
let mut impacted_want_ids: std::collections::HashSet<String> =
std::collections::HashSet::new();
for partition_ref in &impacted_partition_refs {
for want_id in self.get_wants_for_partition(partition_ref) {
impacted_want_ids.insert(want_id.clone());
}
}
// Transition each impacted want to UpstreamBuilding, tracking this derivative want as an upstream
for want_id in impacted_want_ids {
let want = self.wants.remove(&want_id).expect(&format!(
"BUG: Want {} must exist when processing derivative want",
want_id
));
let transitioned = match want {
Want::Building(building) => {
// First dep miss for this want: Building → UpstreamBuilding
tracing::info!(
want_id = %want_id,
derivative_want_id = %derivative_want_id,
"Want: Building → UpstreamBuilding (first missing dep detected)"
);
Want::UpstreamBuilding(
building.detect_missing_deps(vec![derivative_want_id.to_string()]),
)
}
Want::UpstreamBuilding(upstream) => {
// Additional dep miss: UpstreamBuilding → UpstreamBuilding (add another upstream)
// This can happen if multiple jobs report dep misses for different upstreams
tracing::info!(
want_id = %want_id,
derivative_want_id = %derivative_want_id,
"Want: UpstreamBuilding → UpstreamBuilding (additional upstream added)"
);
Want::UpstreamBuilding(
upstream.add_upstreams(vec![derivative_want_id.to_string()]),
)
}
_ => {
panic!(
"BUG: Want {} in invalid state {:?} when processing derivative want. Should be Building or UpstreamBuilding.",
want_id, want
);
}
};
self.wants.insert(want_id, transitioned);
}
}
}
}
/// Complete wants when all their partitions become Live
/// Transitions Building → Successful, returns list of newly successful want IDs
pub(crate) fn complete_successful_wants(
&mut self,
newly_live_partitions: &[LivePartitionRef],
job_run_id: &str,
timestamp: u64,
) -> Vec<SuccessfulWantId> {
let mut newly_successful_wants: Vec<SuccessfulWantId> = Vec::new();
for pref in newly_live_partitions {
let want_ids: Vec<String> = self.get_wants_for_partition(&pref.0.r#ref).to_vec();
for want_id in want_ids {
let want = self.wants.remove(&want_id).expect(&format!(
"BUG: Want {} must exist when referenced by partition",
want_id
));
let transitioned = match want {
Want::Building(building) => {
// Check if ALL partitions for this want are now Live
let all_partitions_live = building.want.partitions.iter().all(|p| {
self.get_canonical_partition(&p.r#ref)
.map(|partition| partition.is_live())
.unwrap_or(false)
});
if all_partitions_live {
let successful_want =
building.complete(job_run_id.to_string(), timestamp);
tracing::info!(
want_id = %want_id,
job_run_id = %job_run_id,
"Want: Building → Successful"
);
newly_successful_wants.push(successful_want.get_id());
Want::Successful(successful_want)
} else {
Want::Building(building) // Still building other partitions
}
}
_ => {
panic!(
"BUG: Want {} in invalid state {:?} when partition {} became Live. Should be Building.",
want_id, want, pref.0.r#ref
);
}
};
self.wants.insert(want_id.clone(), transitioned);
}
}
newly_successful_wants
}
/// Fail wants when their partitions fail
/// Transitions Building → Failed, and adds to already-failed wants
/// Returns list of newly failed want IDs for downstream cascade
pub(crate) fn fail_directly_affected_wants(
&mut self,
failed_partitions: &[FailedPartitionRef],
) -> Vec<FailedWantId> {
let mut newly_failed_wants: Vec<FailedWantId> = Vec::new();
for pref in failed_partitions {
let want_ids: Vec<String> = self.get_wants_for_partition(&pref.0.r#ref).to_vec();
for want_id in want_ids {
let want = self.wants.remove(&want_id).expect(&format!(
"BUG: Want {} must exist when referenced by partition",
want_id
));
let transitioned = match want {
Want::Building(building) => {
let failed = building
.fail(vec![pref.0.clone()], "Partition build failed".to_string());
newly_failed_wants.push(failed.get_id());
Want::Failed(failed)
}
// Failed → Failed: add new failed partition to existing failed state
Want::Failed(failed) => {
Want::Failed(failed.add_failed_partitions(vec![pref.clone()]))
}
_ => {
panic!(
"BUG: Want {} in invalid state {:?} when partition {} failed. Should be Building or Failed.",
want_id, want, pref.0.r#ref
);
}
};
self.wants.insert(want_id.clone(), transitioned);
}
}
newly_failed_wants
}
/// Unblock downstream wants when their upstream dependencies succeed
/// Transitions UpstreamBuilding → Idle (when ready) or Building (when partitions already building)
pub(crate) fn unblock_downstream_wants(
&mut self,
newly_successful_wants: &[SuccessfulWantId],
job_run_id: &str,
timestamp: u64,
) {
tracing::debug!(
newly_successful_wants = ?newly_successful_wants
.iter()
.map(|w| &w.0)
.collect::<Vec<_>>(),
"Checking downstream wants for unblocking"
);
// Find downstream wants that are waiting for any of the newly successful wants
// TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
let downstream_wants_to_check: Vec<String> = self
.wants
.iter()
.filter_map(|(id, want)| {
match want {
Want::UpstreamBuilding(downstream_want) => {
// Is this downstream want waiting for any of the newly successful wants?
let is_affected =
downstream_want.state.upstream_want_ids.iter().any(|up_id| {
newly_successful_wants.iter().any(|swid| &swid.0 == up_id)
});
if is_affected { Some(id.clone()) } else { None }
}
_ => None,
}
})
.collect();
tracing::debug!(
downstream_wants_to_check = ?downstream_wants_to_check,
"Found downstream wants affected by upstream completion"
);
for want_id in downstream_wants_to_check {
let want = self
.wants
.remove(&want_id)
.expect(&format!("BUG: Want {} must exist", want_id));
let transitioned = match want {
Want::UpstreamBuilding(downstream_want) => {
tracing::debug!(
want_id = %want_id,
upstreams = ?downstream_want.state.upstream_want_ids,
"Checking if all upstreams are satisfied"
);
// Check if ALL of this downstream want's upstream dependencies are now Successful
let all_upstreams_successful = downstream_want
.state
.upstream_want_ids
.iter()
.all(|up_want_id| {
self.wants
.get(up_want_id)
.map(|w| matches!(w, Want::Successful(_)))
.unwrap_or(false)
});
tracing::debug!(
want_id = %want_id,
all_upstreams_successful = %all_upstreams_successful,
"Upstream satisfaction check complete"
);
if all_upstreams_successful {
// Check if any of this want's partitions are still being built
// If a job dep-missed, its partitions transitioned to UpstreamBuilding,
// but other jobs might still be building other partitions for this want
let any_partition_building =
downstream_want.want.partitions.iter().any(|p| {
self.get_canonical_partition(&p.r#ref)
.map(|partition| matches!(partition, Partition::Building(_)))
.unwrap_or(false)
});
tracing::debug!(
want_id = %want_id,
any_partition_building = %any_partition_building,
"Partition building status check"
);
if any_partition_building {
// Some partitions still being built, continue in Building state
tracing::info!(
want_id = %want_id,
job_run_id = %job_run_id,
"Want: UpstreamBuilding → Building (upstreams satisfied, partitions building)"
);
Want::Building(
downstream_want
.continue_building(job_run_id.to_string(), timestamp),
)
} else {
// No partitions being built, become schedulable again
tracing::info!(
want_id = %want_id,
"Want: UpstreamBuilding → Idle (upstreams satisfied, ready to schedule)"
);
Want::Idle(downstream_want.upstreams_satisfied())
}
} else {
// Upstreams not all satisfied yet, stay in UpstreamBuilding
tracing::debug!(
want_id = %want_id,
"Want remains in UpstreamBuilding state (upstreams not yet satisfied)"
);
Want::UpstreamBuilding(downstream_want)
}
}
_ => {
panic!("BUG: Want {} should be UpstreamBuilding here", want_id);
}
};
self.wants.insert(want_id, transitioned);
}
}
/// Cascade failures to downstream wants when their upstream dependencies fail
/// Transitions UpstreamBuilding → UpstreamFailed
pub(crate) fn cascade_failures_to_downstream_wants(
&mut self,
newly_failed_wants: &[FailedWantId],
timestamp: u64,
) {
// Find downstream wants that are waiting for any of the newly failed wants
// TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
let downstream_wants_to_fail: Vec<String> = self
.wants
.iter()
.filter_map(|(id, want)| {
match want {
Want::UpstreamBuilding(downstream_want) => {
// Is this downstream want waiting for any of the newly failed wants?
let is_affected =
downstream_want.state.upstream_want_ids.iter().any(|up_id| {
newly_failed_wants.iter().any(|fwid| &fwid.0 == up_id)
});
if is_affected { Some(id.clone()) } else { None }
}
_ => None,
}
})
.collect();
for want_id in downstream_wants_to_fail {
let want = self
.wants
.remove(&want_id)
.expect(&format!("BUG: Want {} must exist", want_id));
let transitioned = match want {
Want::UpstreamBuilding(downstream_want) => Want::UpstreamFailed(
downstream_want.upstream_failed(
newly_failed_wants
.iter()
.map(|fwid| fwid.0.clone())
.collect(),
timestamp,
),
),
_ => {
panic!("BUG: Want {} should be UpstreamBuilding here", want_id);
}
};
self.wants.insert(want_id, transitioned);
}
}
}
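The trickiest piece above is matching a derivative want back to the `MissingDeps` entry that spawned it: the partition sets must match exactly, because one dep-miss event can create several derivative wants. The rule in isolation:

```rust
// Exact set match (assuming no duplicate refs): every missing partition appears
// in the derivative want, and the lengths agree.
fn partitions_match(missing: &[&str], derivative: &[&str]) -> bool {
    missing.iter().all(|m| derivative.contains(m)) && derivative.len() == missing.len()
}

fn main() {
    assert!(partitions_match(&["data/alpha"], &["data/alpha"]));
    // A subset is not enough: this derivative want belongs to another entry.
    assert!(!partitions_match(&["data/alpha", "data/gamma"], &["data/alpha"]));
    // Extra partitions also disqualify the match.
    assert!(!partitions_match(&["data/alpha"], &["data/alpha", "data/gamma"]));
}
```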


@@ -13,8 +13,8 @@ use lib::orchestrator::{Orchestrator, OrchestratorConfig};
#[command(name = "databuild")]
#[command(about = "DataBuild CLI - Build system for data pipelines", long_about = None)]
struct Cli {
/// Server URL (default: http://localhost:3000)
#[arg(long, default_value = "http://localhost:3000", global = true)]
/// Server URL
#[arg(long, default_value = "http://localhost:3538", global = true)]
server: String,
#[command(subcommand)]
@@ -26,7 +26,7 @@ enum Commands {
/// Start the DataBuild HTTP server
Serve {
/// Port to listen on
#[arg(long, default_value = "3000")]
#[arg(long, default_value = "3538")]
port: u16,
/// Database URL (default: :memory: for in-memory SQLite)


@@ -1,17 +1,23 @@
use crate::build_event_log::BELStorage;
use crate::build_state::BuildState;
use crate::commands::Command;
use crate::web::templates::{
BaseContext, HomePage, JobRunDetailPage, JobRunDetailView, JobRunsListPage,
PartitionDetailPage, PartitionDetailView, PartitionsListPage, WantDetailPage, WantDetailView,
WantsListPage,
};
use crate::{
CancelWantRequest, CreateWantRequest, CreateWantResponse, GetWantRequest, GetWantResponse,
ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse,
ListWantsRequest, ListWantsResponse,
ListWantsRequest, ListWantsResponse, PartitionStatusCode,
};
use askama::Template;
use axum::{
Json, Router,
extract::{Path, Query, Request, State},
http::{HeaderValue, Method, StatusCode},
middleware::{self, Next},
response::{IntoResponse, Response},
response::{Html, IntoResponse, Response},
routing::{delete, get, post},
};
use std::sync::{
@@ -80,7 +86,7 @@ async fn update_last_request_time(
pub fn create_router(state: AppState) -> Router {
// Configure CORS for web app development
let cors = CorsLayer::new()
.allow_origin("http://localhost:3000".parse::<HeaderValue>().unwrap())
.allow_origin("http://localhost:3538".parse::<HeaderValue>().unwrap())
.allow_methods([Method::GET, Method::POST, Method::DELETE, Method::OPTIONS])
.allow_headers([
axum::http::header::CONTENT_TYPE,
@@ -90,15 +96,21 @@ pub fn create_router(state: AppState) -> Router {
Router::new()
// Health check
.route("/health", get(health))
// Want endpoints
.route("/api/wants", get(list_wants))
// HTML pages
.route("/", get(home_page))
.route("/wants", get(wants_list_page))
.route("/wants/:id", get(want_detail_page))
.route("/partitions", get(partitions_list_page))
.route("/partitions/*id", get(partition_detail_page))
.route("/job_runs", get(job_runs_list_page))
.route("/job_runs/:id", get(job_run_detail_page))
// JSON API endpoints
.route("/api/wants", get(list_wants_json))
.route("/api/wants", post(create_want))
.route("/api/wants/:id", get(get_want))
.route("/api/wants/:id", get(get_want_json))
.route("/api/wants/:id", delete(cancel_want))
// Partition endpoints
.route("/api/partitions", get(list_partitions))
// Job run endpoints
.route("/api/job_runs", get(list_job_runs))
.route("/api/partitions", get(list_partitions_json))
.route("/api/job_runs", get(list_job_runs_json))
// Add CORS middleware
.layer(cors)
// Add middleware to track request time
@@ -138,7 +150,251 @@ impl ErrorResponse {
}
// ============================================================================
// Handlers
// HTML Page Handlers
// ============================================================================
/// Home page
async fn home_page(State(state): State<AppState>) -> impl IntoResponse {
let build_state = match state.build_state.read() {
Ok(s) => s,
Err(_) => return Html("<h1>Error: state lock poisoned</h1>".to_string()).into_response(),
};
// Count active wants (not successful or canceled)
let active_wants_count = build_state
.list_wants(&ListWantsRequest::default())
.data
.iter()
.filter(|w| {
w.status
.as_ref()
.map(|s| s.name != "Successful" && s.name != "Canceled")
.unwrap_or(true)
})
.count() as u64;
// Count active job runs (running or queued)
let active_job_runs_count = build_state
.list_job_runs(&ListJobRunsRequest::default())
.data
.iter()
.filter(|jr| {
jr.status
.as_ref()
.map(|s| s.name == "Running" || s.name == "Queued")
.unwrap_or(false)
})
.count() as u64;
// Count live partitions
let live_partitions_count = build_state
.list_partitions(&ListPartitionsRequest::default())
.data
.iter()
.filter(|p| {
p.status
.as_ref()
.map(|s| s.code == PartitionStatusCode::PartitionLive as i32)
.unwrap_or(false)
})
.count() as u64;
let template = HomePage {
base: BaseContext::default(),
active_wants_count,
active_job_runs_count,
live_partitions_count,
};
match template.render() {
Ok(html) => Html(html).into_response(),
Err(e) => {
tracing::error!("Template render error: {}", e);
Html(format!("<h1>Template error: {}</h1>", e)).into_response()
}
}
}
/// Wants list page
async fn wants_list_page(
State(state): State<AppState>,
Query(params): Query<ListWantsRequest>,
) -> impl IntoResponse {
let build_state = match state.build_state.read() {
Ok(s) => s,
Err(_) => return Html("<h1>Error: state lock poisoned</h1>".to_string()).into_response(),
};
let response = build_state.list_wants(&params);
let template = WantsListPage {
base: BaseContext::default(),
wants: response
.data
.into_iter()
.map(WantDetailView::from)
.collect(),
page: response.page,
page_size: response.page_size,
total_count: response.match_count,
};
match template.render() {
Ok(html) => Html(html).into_response(),
Err(e) => {
tracing::error!("Template render error: {}", e);
Html(format!("<h1>Template error: {}</h1>", e)).into_response()
}
}
}
/// Want detail page
async fn want_detail_page(
State(state): State<AppState>,
Path(want_id): Path<String>,
) -> impl IntoResponse {
let build_state = match state.build_state.read() {
Ok(s) => s,
Err(_) => return Html("<h1>Error: state lock poisoned</h1>".to_string()).into_response(),
};
match build_state.get_want(&want_id) {
Some(want) => {
let template = WantDetailPage {
base: BaseContext::default(),
want: WantDetailView::from(want),
};
match template.render() {
Ok(html) => Html(html).into_response(),
Err(e) => Html(format!("<h1>Template error: {}</h1>", e)).into_response(),
}
}
None => (
StatusCode::NOT_FOUND,
Html("<h1>Want not found</h1>".to_string()),
)
.into_response(),
}
}
/// Partitions list page
async fn partitions_list_page(
State(state): State<AppState>,
Query(params): Query<ListPartitionsRequest>,
) -> impl IntoResponse {
let build_state = match state.build_state.read() {
Ok(s) => s,
Err(_) => return Html("<h1>Error: state lock poisoned</h1>".to_string()).into_response(),
};
let response = build_state.list_partitions(&params);
let template = PartitionsListPage {
base: BaseContext::default(),
partitions: response
.data
.into_iter()
.map(PartitionDetailView::from)
.collect(),
page: response.page,
page_size: response.page_size,
total_count: response.match_count,
};
match template.render() {
Ok(html) => Html(html).into_response(),
Err(e) => Html(format!("<h1>Template error: {}</h1>", e)).into_response(),
}
}
/// Partition detail page
async fn partition_detail_page(
State(state): State<AppState>,
Path(partition_ref): Path<String>,
) -> impl IntoResponse {
let build_state = match state.build_state.read() {
Ok(s) => s,
Err(_) => return Html("<h1>Error: state lock poisoned</h1>".to_string()).into_response(),
};
// Axum's Path extractor automatically percent-decodes the path parameter
match build_state.get_partition(&partition_ref) {
Some(partition) => {
let template = PartitionDetailPage {
base: BaseContext::default(),
partition: PartitionDetailView::from(partition),
};
match template.render() {
Ok(html) => Html(html).into_response(),
Err(e) => Html(format!("<h1>Template error: {}</h1>", e)).into_response(),
}
}
None => (
StatusCode::NOT_FOUND,
Html("<h1>Partition not found</h1>".to_string()),
)
.into_response(),
}
}
/// Job runs list page
async fn job_runs_list_page(
State(state): State<AppState>,
Query(params): Query<ListJobRunsRequest>,
) -> impl IntoResponse {
let build_state = match state.build_state.read() {
Ok(s) => s,
Err(_) => return Html("<h1>Error: state lock poisoned</h1>".to_string()).into_response(),
};
let response = build_state.list_job_runs(&params);
let template = JobRunsListPage {
base: BaseContext::default(),
job_runs: response
.data
.into_iter()
.map(JobRunDetailView::from)
.collect(),
page: response.page,
page_size: response.page_size,
total_count: response.match_count,
};
match template.render() {
Ok(html) => Html(html).into_response(),
Err(e) => Html(format!("<h1>Template error: {}</h1>", e)).into_response(),
}
}
/// Job run detail page
async fn job_run_detail_page(
State(state): State<AppState>,
Path(job_run_id): Path<String>,
) -> impl IntoResponse {
let build_state = match state.build_state.read() {
Ok(s) => s,
Err(_) => return Html("<h1>Error: state lock poisoned</h1>".to_string()).into_response(),
};
match build_state.get_job_run(&job_run_id) {
Some(job_run) => {
let template = JobRunDetailPage {
base: BaseContext::default(),
job_run: JobRunDetailView::from(job_run),
};
match template.render() {
Ok(html) => Html(html).into_response(),
Err(e) => Html(format!("<h1>Template error: {}</h1>", e)).into_response(),
}
}
None => (
StatusCode::NOT_FOUND,
Html("<h1>Job run not found</h1>".to_string()),
)
.into_response(),
}
}
// ============================================================================
// JSON API Handlers
// ============================================================================
/// Health check endpoint
@@ -146,12 +402,11 @@ async fn health() -> impl IntoResponse {
(StatusCode::OK, "OK")
}
/// List all wants
async fn list_wants(
/// List all wants (JSON)
async fn list_wants_json(
State(state): State<AppState>,
Query(params): Query<ListWantsRequest>,
) -> impl IntoResponse {
// Read from shared build state
let build_state = match state.build_state.read() {
Ok(state) => state,
Err(e) => {
@@ -170,9 +425,11 @@ async fn list_wants(
(StatusCode::OK, Json(response)).into_response()
}
/// Get a specific want by ID
async fn get_want(State(state): State<AppState>, Path(want_id): Path<String>) -> impl IntoResponse {
// Read from shared build state
/// Get a specific want by ID (JSON)
async fn get_want_json(
State(state): State<AppState>,
Path(want_id): Path<String>,
) -> impl IntoResponse {
let build_state = match state.build_state.read() {
Ok(state) => state,
Err(e) => {
@@ -320,12 +577,11 @@ async fn cancel_want(
}
}
/// List all partitions
async fn list_partitions(
/// List all partitions (JSON)
async fn list_partitions_json(
State(state): State<AppState>,
Query(params): Query<ListPartitionsRequest>,
) -> impl IntoResponse {
// Read from shared build state
let build_state = match state.build_state.read() {
Ok(state) => state,
Err(e) => {
@@ -344,12 +600,11 @@ async fn list_partitions(
(StatusCode::OK, Json(response)).into_response()
}
/// List all job runs
async fn list_job_runs(
/// List all job runs (JSON)
async fn list_job_runs_json(
State(state): State<AppState>,
Query(params): Query<ListJobRunsRequest>,
) -> impl IntoResponse {
// Read from shared build state
let build_state = match state.build_state.read() {
Ok(state) => state,
Err(e) => {
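For orientation, a minimal self-contained sketch of the HTML-page/JSON-API split the router above introduces (placeholder handlers, and it assumes an axum 0.7-style `axum::serve`, which may not match the repo's exact version):

```rust
use axum::{response::Html, routing::get, Json, Router};

// Placeholder handlers: the real ones render Askama templates and read BuildState.
async fn wants_page() -> Html<&'static str> {
    Html("<h1>Wants</h1>")
}

async fn list_wants_json() -> Json<Vec<String>> {
    Json(vec!["w-123".to_string()])
}

#[tokio::main]
async fn main() {
    // Same resource, two representations: /wants for browsers, /api/wants for clients.
    let app = Router::new()
        .route("/wants", get(wants_page))
        .route("/api/wants", get(list_wants_json));
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3538").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}
```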


@@ -13,6 +13,7 @@ pub mod orchestrator;
mod partition_state;
mod util;
mod want_state;
pub mod web;
// Include generated protobuf code
include!("databuild.rs");

databuild/web/mod.rs Normal file

@@ -0,0 +1,6 @@
//! Web templates and handlers for the DataBuild dashboard
//!
//! Server-side rendered HTML using Askama templates with CSS View Transitions
//! for smooth navigation. No JavaScript framework required.
pub mod templates;

databuild/web/templates.rs Normal file

File diff suppressed because it is too large.


@@ -9,9 +9,9 @@ The service provides two primary capabilities:
## Correctness Strategy
- Rely on databuild.proto, call same shared code in core
- Fully asserted type safety from core to service to web app
- Core -- databuild.proto --> service -- openapi --> web app
- No magic strings (how? protobuf doesn't have consts. enums values? code gen over yaml?)
- Fully asserted type safety from core to service to web app via Askama compile-time template checking
- `databuild.proto` → prost (Rust structs) → Askama templates (compile-checked field access) → HTML
- No magic strings: enum variants and field names are checked at compile time
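To make the compile-time claim concrete, a hedged illustration (the template source and fields here are invented, not the repo's actual pages): renaming a struct field without updating the template fails the build rather than rendering a broken page.

```rust
use askama::Template;

// Inline template source; the repo embeds its real templates in
// databuild/web/templates.rs. Referencing a field the struct lacks
// is a compile error, not a runtime 500.
#[derive(Template)]
#[template(
    source = "<h1>Want {{ want_id }}</h1><p>{{ partition_count }} partitions</p>",
    ext = "html"
)]
struct WantCard {
    want_id: String,
    partition_count: usize,
}

fn main() {
    let card = WantCard { want_id: "w-123".to_string(), partition_count: 2 };
    println!("{}", card.render().unwrap());
}
```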
## Cross-Graph Coordination
Services expose the `GraphService` API for cross-graph dependency management:
@@ -29,11 +29,39 @@ needed by the [web app](#web-app).
[Notes about details, context, and askama views.](https://claude.ai/share/76622c1c-7489-496e-be81-a64fef24e636)
## Web App
The web app visualizes databuild application state via features like listing past builds, job statistics,
partition liveness, build request status, etc. This section specifies the hierarchy of functions of the web app. Pages
are described in visual order (generally top to bottom).
General requirements:
### Implementation Strategy: Pure MPA with View Transitions
The web app is a traditional multi-page application (MPA) using server-side rendering. No JavaScript framework.
**Stack:**
- **Askama**: Compile-time checked HTML templates in Rust
- **CSS View Transitions API**: Smooth navigation animations between pages
- **Plain HTML forms**: For mutations (create want, cancel job, etc.)
**Why this approach:**
- **Fast**: No JS bundle to download/parse; HTML streams directly from server
- **Simple**: Single Rust codebase; no frontend build pipeline
- **Type-safe**: Askama checks template field access at compile time
- **Easy to change**: Templates are just HTML; Rust compiler catches breaking changes
**View Transitions** (supported in Chrome 126+, Safari 18.2+, Firefox 144+) provide SPA-like smooth
navigation without JavaScript. The browser snapshots the old page, loads the new one, and animates
matching elements:
```css
@view-transition { navigation: auto; }
.want-card { view-transition-name: want-card; }
```
**Trade-off**: No partial page updates. Pagination and navigation reload the full page, but view
transitions make this feel instant. For live updates, users refresh manually or we add
`<meta http-equiv="refresh">` on status pages.
### General Requirements
- Nav at top of page
- DataBuild logo in top left
- Navigation links at the top allowing navigation to each list page:
@@ -43,9 +71,7 @@ General requirements:
- Triggers list page
- Build event log page
- Graph label at top right
- Search box for finding builds, jobs, and partitions (needs a new service API?)
The site is implemented via Askama templating and HTMX for dynamic updates.
- Search box for finding builds, jobs, and partitions
### Home Page
Jumping off point to navigate and build.


@@ -8,26 +8,25 @@ Wants declare intent to have partitions exist. The graph continuously reconciles
## Want Identity
Wants are idempotent through deterministic ID generation:
Wants use UUID-based identity:
```protobuf
message PartitionWant {
string want_id = 1; // Hash(partition_ref + data_timestamp + source)
string root_want_id = 2; // Original user want
string parent_want_id = 3; // Want that triggered this
PartitionRef partition_ref = 4;
uint64 data_timestamp = 5; // Business time (e.g., "2024-01-01" → midnight UTC)
uint64 ttl_seconds = 6; // From data_timestamp
uint64 sla_seconds = 7; // From data_timestamp
WantSource source = 8;
message WantCreateEventV1 {
string want_id = 1; // UUID generated at creation time
repeated PartitionRef partitions = 2; // Partitions this want requests
uint64 data_timestamp = 3; // Business time (e.g., "2024-01-01" → midnight UTC)
uint64 ttl_seconds = 4; // From data_timestamp
uint64 sla_seconds = 5; // From data_timestamp
EventSource source = 6; // Origin: job, API, CLI, web app...
optional string comment = 7;
}
```
Multiple identical want requests produce the same `want_id`, preventing duplication.
Want IDs are UUIDs generated at creation time. Duplicate prevention is handled at the scheduling layer: the orchestrator checks canonical partition state before scheduling jobs, so multiple wants for the same partition simply observe the same build progress rather than triggering redundant work.
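A sketch of that scheduling-layer check under stated assumptions (state names are simplified, and `needs_new_job` is hypothetical, not the orchestrator's actual function):

```rust
use std::collections::BTreeMap;

#[derive(Debug)]
enum PartitionState {
    Building,
    Live,
}

// Before dispatching a job for a want's target partition, consult canonical
// partition state: Building or Live means no new work is needed, so duplicate
// wants converge on the same build.
fn needs_new_job(canonical: &BTreeMap<String, PartitionState>, target: &str) -> bool {
    !matches!(
        canonical.get(target),
        Some(PartitionState::Building) | Some(PartitionState::Live)
    )
}

fn main() {
    let mut canonical = BTreeMap::new();
    canonical.insert("data/alpha".to_string(), PartitionState::Building);
    canonical.insert("data/gamma".to_string(), PartitionState::Live);
    assert!(!needs_new_job(&canonical, "data/alpha")); // already building
    assert!(!needs_new_job(&canonical, "data/gamma")); // already live
    assert!(needs_new_job(&canonical, "data/beta")); // schedule it
}
```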
## Execution Flow
1. **Want Registration**: User/trigger creates wants with deterministic IDs
1. **Want Registration**: User/trigger creates wants (UUIDs assigned at creation)
2. **Immediate Dispatch**: Graph attempts execution without checking dependencies
3. **Runtime Discovery**: Jobs fail with `MissingDependenciesError(partitions)`
4. **Want Propagation**: Graph creates upstream wants from missing dependencies
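Steps 3 and 4 in miniature (types hypothetical; the real flow goes through `WantCreateV1` events on the build event log): a failed probe of dependencies becomes a set of upstream wants.

```rust
// A job run cannot proceed and reports exactly which partitions were missing.
struct MissingDependenciesError {
    missing: Vec<String>,
}

// The graph reacts by declaring intent for the missing partitions; once those
// upstream wants succeed, the original want becomes schedulable again.
fn propagate_wants(err: &MissingDependenciesError) -> Vec<String> {
    err.missing.iter().map(|p| format!("want({p})")).collect()
}

fn main() {
    let err = MissingDependenciesError { missing: vec!["data/alpha".to_string()] };
    assert_eq!(propagate_wants(&err), vec!["want(data/alpha)".to_string()]);
}
```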

scripts/run_multihop_example.sh Executable file

@@ -0,0 +1,74 @@
#!/usr/bin/env bash
set -euo pipefail
# Navigate to repository root
cd "$(dirname "$0")/.."
# Configuration
PORT=3050
DB_PATH="/tmp/databuild_multihop.db"
FLAG_FILE="/tmp/databuild_multihop_alpha_complete"
PID_FILE="/tmp/databuild_multihop.pid"
echo "=== DataBuild Multi-Hop Example ==="
echo
# Clean up previous state
echo "Cleaning up previous state..."
rm -f "$DB_PATH" "$FLAG_FILE" "$PID_FILE"
pkill -f "databuild.*serve.*port $PORT" || true
sleep 1
# Build the binary
echo "Building databuild CLI..."
bazel build //databuild:databuild
# Start the server in background
echo "Starting databuild server on port $PORT..."
./bazel-bin/databuild/databuild serve \
--port $PORT \
--database "$DB_PATH" \
--config examples/multihop/config.json &
SERVER_PID=$!
echo $SERVER_PID > "$PID_FILE"
echo "Server started with PID $SERVER_PID"
# Wait for server to be ready
echo "Waiting for server to start..."
sleep 2
# Test server health
if curl -s http://localhost:$PORT/health > /dev/null 2>&1; then
echo "Server is ready!"
else
echo "WARNING: Server health check failed, but continuing..."
fi
echo
echo "=== Server is running ==="
echo
echo "You can now interact with the server:"
echo
echo " # Create a want for data/beta (triggers dependency chain)"
echo " ./bazel-bin/databuild/databuild --server http://localhost:$PORT want data/beta"
echo
echo " # Monitor wants"
echo " ./bazel-bin/databuild/databuild --server http://localhost:$PORT wants list"
echo
echo " # Monitor job runs"
echo " ./bazel-bin/databuild/databuild --server http://localhost:$PORT job-runs list"
echo
echo " # Monitor partitions"
echo " ./bazel-bin/databuild/databuild --server http://localhost:$PORT partitions list"
echo
echo "To stop the server:"
echo " kill $SERVER_PID"
echo " # or: pkill -f 'databuild.*serve.*port $PORT'"
echo
echo "Server logs will appear below. Press Ctrl+C to stop."
echo "=========================================="
echo
# Wait for the server process
wait $SERVER_PID