Compare commits

5 commits: 31db6a00cb...f531730a6b

| Author | SHA1 | Date |
|---|---|---|
| | f531730a6b | |
| | 14a24ef6d6 | |
| | e7aac32607 | |
| | efed281a5a | |
| | 042526ea8c | |

19 changed files with 4273 additions and 2391 deletions
@@ -109,6 +109,15 @@ crate.spec(
    package = "toml",
    version = "0.8",
)
crate.spec(
    features = ["urlencode"],
    package = "askama",
    version = "0.14",
)
crate.spec(
    package = "urlencoding",
    version = "2.1",
)
crate.from_specs()
use_repo(crate, "crates")
File diff suppressed because one or more lines are too long

6  assets/logo.svg  Normal file

@@ -0,0 +1,6 @@
<svg width="243" height="215" viewBox="0 0 243 215" fill="none" xmlns="http://www.w3.org/2000/svg">
<path d="M123.5 77L149.048 121.25H97.9523L123.5 77Z" fill="#F2994A"/>
<path d="M224.772 125.035L155.772 125.035L109.52 45.3147L86.7722 45.3147L40.2722 124.463L16.7725 124.463" stroke="#333333" stroke-width="20"/>
<path d="M86.6196 5.18886L121.12 64.9444L75.2062 144.86L86.58 164.56L178.375 165.256L190.125 185.608" stroke="#333333" stroke-width="20"/>
<path d="M51.966 184.847L86.4659 125.092L178.632 124.896L190.006 105.196L144.711 25.3514L156.461 5.00002" stroke="#333333" stroke-width="20"/>
</svg>

After | Width: 243 | Height: 215 | Size: 602 B
@@ -17,14 +17,18 @@ rust_binary(
)

# DataBuild library using generated prost code
# Note: Templates are embedded inline in web/templates.rs using Askama's in_doc feature
# because Bazel's sandbox doesn't support file-based Askama templates properly.
rust_library(
    name = "lib",
    srcs = glob(["**/*.rs"]) + [
        ":generate_databuild_rust",
    ],
    crate_root = "lib.rs",
    edition = "2021",
    visibility = ["//visibility:public"],
    deps = [
        "@crates//:askama",
        "@crates//:axum",
        "@crates//:prost",
        "@crates//:prost-types",

@@ -33,13 +37,14 @@ rust_library(
        "@crates//:schemars",
        "@crates//:serde",
        "@crates//:serde_json",
        "@crates//:sha2",
        "@crates//:tokio",
        "@crates//:toml",
        "@crates//:tower",
        "@crates//:tower-http",
        "@crates//:tracing",
        "@crates//:urlencoding",
        "@crates//:uuid",
        "@crates//:sha2",
    ],
)
File diff suppressed because it is too large

1199  databuild/build_state/event_handlers.rs  Normal file

File diff suppressed because it is too large

188  databuild/build_state/mod.rs  Normal file

@@ -0,0 +1,188 @@
//! Build State - the heart of databuild's orchestration system
//!
//! The BuildState struct tracks all application state, defines valid state transitions,
//! and manages cross-state machine state transitions (e.g. job run success resulting
//! in partition going from Building to Live).
//!
//! See docs/design/build-state-semantics.md for the full conceptual model.

mod event_handlers;
mod partition_transitions;
mod queries;
mod schedulability;
mod want_transitions;

use crate::job_run_state::JobRun;
use crate::partition_state::Partition;
use crate::want_state::Want;
use crate::{PartitionRef, TaintDetail};
use std::collections::BTreeMap;
use uuid::Uuid;

// Re-export public types
pub use schedulability::{WantSchedulability, WantUpstreamStatus, WantsSchedulability};

/**
Design Notes

The build state struct is the heart of the service and orchestrator, adapting low-level build
events into answers to higher-level questions about build state. One temptation is to implement
the build state as a set of hierarchically defined reducers, to get information hiding and a
simple factoring of system capabilities and state tracking. Unfortunately, updating state in
response to an event requires a mutable borrow of some part of the build state (the part the
reducer controls, for instance) together with an immutable borrow of the whole state for
read/query purposes; the whole state has to be available to answer questions like "what are the
currently active job runs?" while handling a job run event. Put simply, this isn't possible
without introducing some locking of the whole state and the mutable subset, since the two borrows
conflict (the mutable subset has already been borrowed, so it can't also be borrowed immutably as
part of the whole-state borrow). You could instead define a "query" phase in which reducers read
the state before applying the received event, but that only increases complexity.

Instead, databuild opts for an entity-component system (ECS) approach that provides the whole
build state mutably to all state-update functionality, trusting that we know how to use it
responsibly. This means no boxing and no "query phase", and all state updates happen as map
lookups and updates, which is exceptionally fast. The states of the different entities are
managed by state machines, in a pseudo-colored-Petri-net style (only "pseudo" because we haven't
formalized it). It is critical that these state machines, their states, and their transitions are
type-safe.
*/
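// Illustrative sketch of the borrow conflict described above (toy types only,
// added for clarity; `ToyState` and `partition_reducer` are not part of the
// real design). A reducer that mutably owns one slice of the state cannot also
// receive the whole state immutably, because both are reached through the same
// value:
#[allow(dead_code)]
mod reducer_borrow_conflict_sketch {
    struct ToyState {
        partitions: Vec<String>,
        job_runs: Vec<String>,
    }

    // Hypothetical per-entity reducer: mutates its own slice of state while
    // reading everything else.
    fn partition_reducer(partitions: &mut Vec<String>, whole: &ToyState) {
        partitions.push(format!("{} active job runs", whole.job_runs.len()));
    }

    fn apply_event(state: &mut ToyState) {
        // Does not compile: `state.partitions` is borrowed mutably, so `state`
        // cannot simultaneously be borrowed immutably for the read side.
        // partition_reducer(&mut state.partitions, &*state);

        // Handing the whole state to one `&mut self` handler (as the impl
        // blocks below do) sidesteps the split borrow without any locking.
        let active = state.job_runs.len();
        state.partitions.push(format!("{active} active job runs"));
    }
}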
/// Tracks all application state, defines valid state transitions, and manages cross-state machine
/// state transitions (e.g. job run success resulting in partition going from Building to Live)
#[derive(Debug, Clone, Default)]
pub struct BuildState {
    // Core entity storage
    pub(crate) wants: BTreeMap<String, Want>,
    pub(crate) taints: BTreeMap<String, TaintDetail>,
    pub(crate) job_runs: BTreeMap<String, JobRun>,

    // UUID-based partition indexing
    pub(crate) partitions_by_uuid: BTreeMap<Uuid, Partition>,
    pub(crate) canonical_partitions: BTreeMap<String, Uuid>, // partition ref → current UUID

    // Inverted indexes
    pub(crate) wants_for_partition: BTreeMap<String, Vec<String>>, // partition ref → want_ids
    pub(crate) downstream_waiting: BTreeMap<String, Vec<Uuid>>, // upstream ref → partition UUIDs waiting for it
}

impl BuildState {
    /// Reconstruct BuildState from a sequence of events (for read path in web server)
    /// This allows the web server to rebuild state from BEL storage without holding a lock
    pub fn from_events(events: &[crate::DataBuildEvent]) -> Self {
        let mut state = BuildState::default();
        for event in events {
            if let Some(ref inner_event) = event.event {
                // handle_event returns Vec<Event> for cascading events, but we ignore them
                // since we're replaying from a complete event log
                state.handle_event(inner_event);
            }
        }
        state
    }

pub fn count_job_runs(&self) -> usize {
|
||||
self.job_runs.len()
|
||||
}
|
||||
|
||||
// ===== UUID-based partition access methods =====
|
||||
|
||||
/// Get the canonical partition for a ref (the current/active partition instance)
|
||||
pub fn get_canonical_partition(&self, partition_ref: &str) -> Option<&Partition> {
|
||||
self.canonical_partitions
|
||||
.get(partition_ref)
|
||||
.and_then(|uuid| self.partitions_by_uuid.get(uuid))
|
||||
}
|
||||
|
||||
/// Get the canonical partition UUID for a ref
|
||||
pub fn get_canonical_partition_uuid(&self, partition_ref: &str) -> Option<Uuid> {
|
||||
self.canonical_partitions.get(partition_ref).copied()
|
||||
}
|
||||
|
||||
/// Get a partition by its UUID
|
||||
pub fn get_partition_by_uuid(&self, uuid: Uuid) -> Option<&Partition> {
|
||||
self.partitions_by_uuid.get(&uuid)
|
||||
}
|
||||
|
||||
/// Take the canonical partition for a ref (removes from partitions_by_uuid for state transition)
|
||||
/// The canonical_partitions mapping is NOT removed - caller must update it if creating a new partition
|
||||
pub(crate) fn take_canonical_partition(&mut self, partition_ref: &str) -> Option<Partition> {
|
||||
self.canonical_partitions
|
||||
.get(partition_ref)
|
||||
.copied()
|
||||
.and_then(|uuid| self.partitions_by_uuid.remove(&uuid))
|
||||
}
|
||||
|
||||
/// Get want IDs for a partition ref (from inverted index)
|
||||
pub fn get_wants_for_partition(&self, partition_ref: &str) -> &[String] {
|
||||
self.wants_for_partition
|
||||
.get(partition_ref)
|
||||
.map(|v| v.as_slice())
|
||||
.unwrap_or(&[])
|
||||
}
|
||||
|
||||
/// Register a want in the wants_for_partition inverted index
|
||||
pub(crate) fn register_want_for_partitions(
|
||||
&mut self,
|
||||
want_id: &str,
|
||||
partition_refs: &[PartitionRef],
|
||||
) {
|
||||
for pref in partition_refs {
|
||||
let want_ids = self
|
||||
.wants_for_partition
|
||||
.entry(pref.r#ref.clone())
|
||||
.or_insert_with(Vec::new);
|
||||
if !want_ids.contains(&want_id.to_string()) {
|
||||
want_ids.push(want_id.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Update a partition in the indexes (after state transition)
|
||||
pub(crate) fn update_partition(&mut self, partition: Partition) {
|
||||
let uuid = partition.uuid();
|
||||
self.partitions_by_uuid.insert(uuid, partition);
|
||||
}
|
||||
|
||||
// Test helpers
|
||||
pub(crate) fn with_wants(self, wants: BTreeMap<String, Want>) -> Self {
|
||||
Self { wants, ..self }
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn with_partitions(
|
||||
self,
|
||||
old_partitions: BTreeMap<String, crate::PartitionDetail>,
|
||||
) -> Self {
|
||||
use crate::partition_state::PartitionWithState;
|
||||
|
||||
let mut canonical_partitions: BTreeMap<String, Uuid> = BTreeMap::new();
|
||||
let mut partitions_by_uuid: BTreeMap<Uuid, Partition> = BTreeMap::new();
|
||||
|
||||
// Convert PartitionDetail to Live partitions for testing
|
||||
for (key, detail) in old_partitions {
|
||||
let partition_ref = detail.r#ref.clone().unwrap_or_default();
|
||||
// Create a deterministic UUID for test data
|
||||
let uuid =
|
||||
crate::partition_state::derive_partition_uuid("test_job_run", &partition_ref.r#ref);
|
||||
let live_partition = Partition::Live(PartitionWithState {
|
||||
uuid,
|
||||
partition_ref,
|
||||
state: crate::partition_state::LiveState {
|
||||
built_at: 0,
|
||||
built_by: "test_job_run".to_string(),
|
||||
},
|
||||
});
|
||||
|
||||
canonical_partitions.insert(key, uuid);
|
||||
partitions_by_uuid.insert(uuid, live_partition);
|
||||
}
|
||||
|
||||
Self {
|
||||
canonical_partitions,
|
||||
partitions_by_uuid,
|
||||
..self
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) mod consts {
|
||||
pub const DEFAULT_PAGE_SIZE: u64 = 100;
|
||||
}
|
||||
345  databuild/build_state/partition_transitions.rs  Normal file

@@ -0,0 +1,345 @@
|
|||
//! Partition state transition logic
|
||||
//!
|
||||
//! Methods for transitioning partitions between states (Building, Live, Failed,
|
||||
//! UpstreamBuilding, UpForRetry, UpstreamFailed) and managing downstream dependencies.
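// Illustrative typestate sketch (toy types, not the crate's real Partition
// machinery; added only to show the pattern these transition methods rely on).
// Encoding the lifecycle phase in the type makes an invalid transition such as
// Failed -> Live a compile-time error rather than a runtime panic.
#[allow(dead_code)]
mod typestate_sketch {
    pub struct Building;
    pub struct Live {
        pub built_at: u64,
    }

    pub struct ToyPartition<S> {
        pub partition_ref: String,
        pub state: S,
    }

    impl ToyPartition<Building> {
        // Only a Building partition exposes `complete`; the method consumes
        // the value, so the old state cannot be reused after the transition.
        pub fn complete(self, built_at: u64) -> ToyPartition<Live> {
            ToyPartition {
                partition_ref: self.partition_ref,
                state: Live { built_at },
            }
        }
    }
}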
|
||||
|
||||
use crate::PartitionRef;
|
||||
use crate::partition_state::{
|
||||
BuildingPartitionRef, BuildingState, FailedPartitionRef, LivePartitionRef, Partition,
|
||||
PartitionWithState,
|
||||
};
|
||||
use crate::util::current_timestamp;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::BuildState;
|
||||
|
||||
impl BuildState {
|
||||
/// Create a new partition in Building state and update indexes
|
||||
pub(crate) fn create_partition_building(
|
||||
&mut self,
|
||||
job_run_id: &str,
|
||||
partition_ref: PartitionRef,
|
||||
) -> Uuid {
|
||||
let partition =
|
||||
PartitionWithState::<BuildingState>::new(job_run_id.to_string(), partition_ref.clone());
|
||||
let uuid = partition.uuid;
|
||||
|
||||
// Update indexes
|
||||
self.partitions_by_uuid
|
||||
.insert(uuid, Partition::Building(partition));
|
||||
self.canonical_partitions
|
||||
.insert(partition_ref.r#ref.clone(), uuid);
|
||||
|
||||
tracing::info!(
|
||||
partition = %partition_ref.r#ref,
|
||||
uuid = %uuid,
|
||||
job_run_id = %job_run_id,
|
||||
"Partition: Created in Building state"
|
||||
);
|
||||
|
||||
uuid
|
||||
}
|
||||
|
||||
/// Create partitions in Building state
|
||||
/// Used when a job run starts building partitions.
|
||||
/// Note: Partitions no longer have a Missing state - they start directly as Building.
|
||||
pub(crate) fn transition_partitions_to_building(
|
||||
&mut self,
|
||||
partition_refs: &[BuildingPartitionRef],
|
||||
job_run_id: &str,
|
||||
) {
|
||||
for building_ref in partition_refs {
|
||||
if let Some(partition) = self.get_canonical_partition(&building_ref.0.r#ref).cloned() {
|
||||
// Partition already exists - this is an error unless we're retrying from UpForRetry
|
||||
match partition {
|
||||
Partition::UpForRetry(_) => {
|
||||
// Valid: UpForRetry -> Building (retry after deps satisfied)
|
||||
// Old partition stays in partitions_by_uuid as historical record
|
||||
// Create new Building partition with fresh UUID
|
||||
let uuid =
|
||||
self.create_partition_building(job_run_id, building_ref.0.clone());
|
||||
tracing::info!(
|
||||
partition = %building_ref.0.r#ref,
|
||||
job_run_id = %job_run_id,
|
||||
uuid = %uuid,
|
||||
"Partition: UpForRetry → Building (retry)"
|
||||
);
|
||||
}
|
||||
_ => {
|
||||
panic!(
|
||||
"BUG: Invalid state - partition {} cannot start building from state {:?}",
|
||||
building_ref.0.r#ref, partition
|
||||
)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Partition doesn't exist yet - create directly in Building state
|
||||
let uuid = self.create_partition_building(job_run_id, building_ref.0.clone());
|
||||
tracing::info!(
|
||||
partition = %building_ref.0.r#ref,
|
||||
job_run_id = %job_run_id,
|
||||
uuid = %uuid,
|
||||
"Partition: (new) → Building"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Transition partitions from Building to Live state
|
||||
/// Used when a job run successfully completes
|
||||
pub(crate) fn transition_partitions_to_live(
|
||||
&mut self,
|
||||
partition_refs: &[LivePartitionRef],
|
||||
job_run_id: &str,
|
||||
timestamp: u64,
|
||||
) {
|
||||
for pref in partition_refs {
|
||||
let partition = self
|
||||
.take_canonical_partition(&pref.0.r#ref)
|
||||
.expect(&format!(
|
||||
"BUG: Partition {} must exist and be in Building state before completion",
|
||||
pref.0.r#ref
|
||||
));
|
||||
|
||||
// ONLY valid transition: Building -> Live
|
||||
let transitioned = match partition {
|
||||
Partition::Building(building) => {
|
||||
tracing::info!(
|
||||
partition = %pref.0.r#ref,
|
||||
job_run_id = %job_run_id,
|
||||
"Partition: Building → Live"
|
||||
);
|
||||
Partition::Live(building.complete(timestamp))
|
||||
}
|
||||
// All other states are invalid
|
||||
_ => {
|
||||
panic!(
|
||||
"BUG: Invalid state - partition {} must be Building to transition to Live, found {:?}",
|
||||
pref.0.r#ref, partition
|
||||
)
|
||||
}
|
||||
};
|
||||
self.update_partition(transitioned);
|
||||
}
|
||||
}
|
||||
|
||||
/// Transition partitions from Building to Failed state
|
||||
/// Used when a job run fails
|
||||
pub(crate) fn transition_partitions_to_failed(
|
||||
&mut self,
|
||||
partition_refs: &[FailedPartitionRef],
|
||||
job_run_id: &str,
|
||||
timestamp: u64,
|
||||
) {
|
||||
for pref in partition_refs {
|
||||
let partition = self
|
||||
.take_canonical_partition(&pref.0.r#ref)
|
||||
.expect(&format!(
|
||||
"BUG: Partition {} must exist and be in Building state before failure",
|
||||
pref.0.r#ref
|
||||
));
|
||||
|
||||
// ONLY valid transition: Building -> Failed
|
||||
let transitioned = match partition {
|
||||
Partition::Building(building) => {
|
||||
tracing::info!(
|
||||
partition = %pref.0.r#ref,
|
||||
job_run_id = %job_run_id,
|
||||
"Partition: Building → Failed"
|
||||
);
|
||||
Partition::Failed(building.fail(timestamp))
|
||||
}
|
||||
// All other states are invalid
|
||||
_ => {
|
||||
panic!(
|
||||
"BUG: Invalid state - partition {} must be Building to transition to Failed, found {:?}",
|
||||
pref.0.r#ref, partition
|
||||
)
|
||||
}
|
||||
};
|
||||
self.update_partition(transitioned);
|
||||
}
|
||||
}
|
||||
|
||||
/// Transition partitions from Building to UpstreamBuilding state
|
||||
/// Used when a job run encounters missing dependencies and cannot proceed.
|
||||
/// The partition waits for its upstream deps to be built before becoming UpForRetry.
|
||||
pub(crate) fn transition_partitions_to_upstream_building(
|
||||
&mut self,
|
||||
partition_refs: &[BuildingPartitionRef],
|
||||
missing_deps: Vec<PartitionRef>,
|
||||
) {
|
||||
for building_ref in partition_refs {
|
||||
let partition = self
|
||||
.take_canonical_partition(&building_ref.0.r#ref)
|
||||
.expect(&format!(
|
||||
"BUG: Partition {} must exist and be in Building state during dep_miss",
|
||||
building_ref.0.r#ref
|
||||
));
|
||||
|
||||
// Only valid transition: Building -> UpstreamBuilding
|
||||
let transitioned = match partition {
|
||||
Partition::Building(building) => {
|
||||
let partition_uuid = building.uuid;
|
||||
tracing::info!(
|
||||
partition = %building_ref.0.r#ref,
|
||||
uuid = %partition_uuid,
|
||||
missing_deps = ?missing_deps.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
|
||||
"Partition: Building → UpstreamBuilding (dep miss)"
|
||||
);
|
||||
|
||||
// Update downstream_waiting index: for each missing dep, record that this partition is waiting
|
||||
for missing_dep in &missing_deps {
|
||||
self.downstream_waiting
|
||||
.entry(missing_dep.r#ref.clone())
|
||||
.or_default()
|
||||
.push(partition_uuid);
|
||||
}
|
||||
|
||||
Partition::UpstreamBuilding(building.dep_miss(missing_deps.clone()))
|
||||
}
|
||||
// All other states are invalid
|
||||
_ => {
|
||||
panic!(
|
||||
"BUG: Invalid state - partition {} must be Building during dep_miss, found {:?}",
|
||||
building_ref.0.r#ref, partition
|
||||
)
|
||||
}
|
||||
};
|
||||
self.update_partition(transitioned);
|
||||
}
|
||||
}
|
||||
|
||||
/// Transition partitions from UpstreamBuilding to UpForRetry when their upstream deps become Live.
|
||||
/// This should be called when partitions become Live to check if any downstream partitions can now retry.
|
||||
/// Uses the `downstream_waiting` index for O(1) lookup of affected partitions.
|
||||
pub(crate) fn unblock_downstream_partitions(
|
||||
&mut self,
|
||||
newly_live_partition_refs: &[LivePartitionRef],
|
||||
) {
|
||||
// Collect UUIDs of partitions that might be unblocked using the inverted index
|
||||
let mut uuids_to_check: Vec<Uuid> = Vec::new();
|
||||
for live_ref in newly_live_partition_refs {
|
||||
if let Some(waiting_uuids) = self.downstream_waiting.get(&live_ref.0.r#ref) {
|
||||
uuids_to_check.extend(waiting_uuids.iter().cloned());
|
||||
}
|
||||
}
|
||||
|
||||
// Deduplicate UUIDs (a partition might be waiting for multiple deps that all became live)
|
||||
uuids_to_check.sort();
|
||||
uuids_to_check.dedup();
|
||||
|
||||
for uuid in uuids_to_check {
|
||||
// Get partition by UUID - it might have been transitioned already or no longer exist
|
||||
let Some(partition) = self.partitions_by_uuid.get(&uuid).cloned() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let partition_ref = partition.partition_ref().r#ref.clone();
|
||||
|
||||
// Only process UpstreamBuilding partitions
|
||||
if let Partition::UpstreamBuilding(mut upstream_building) = partition {
|
||||
// Remove satisfied deps from missing_deps
|
||||
for live_ref in newly_live_partition_refs {
|
||||
upstream_building
|
||||
.state
|
||||
.missing_deps
|
||||
.retain(|d| d.r#ref != live_ref.0.r#ref);
|
||||
// Also remove from downstream_waiting index
|
||||
if let Some(waiting) = self.downstream_waiting.get_mut(&live_ref.0.r#ref) {
|
||||
waiting.retain(|u| *u != uuid);
|
||||
}
|
||||
}
|
||||
|
||||
let transitioned = if upstream_building.state.missing_deps.is_empty() {
|
||||
// All deps satisfied, transition to UpForRetry
|
||||
tracing::info!(
|
||||
partition = %partition_ref,
|
||||
uuid = %uuid,
|
||||
"Partition: UpstreamBuilding → UpForRetry (all upstreams satisfied)"
|
||||
);
|
||||
Partition::UpForRetry(upstream_building.upstreams_satisfied())
|
||||
} else {
|
||||
// Still waiting for more deps
|
||||
tracing::debug!(
|
||||
partition = %partition_ref,
|
||||
uuid = %uuid,
|
||||
remaining_deps = ?upstream_building.state.missing_deps.iter().map(|d| &d.r#ref).collect::<Vec<_>>(),
|
||||
"Partition remains in UpstreamBuilding (still waiting for deps)"
|
||||
);
|
||||
Partition::UpstreamBuilding(upstream_building)
|
||||
};
|
||||
|
||||
self.update_partition(transitioned);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Cascade failures to downstream partitions when their upstream dependencies fail.
|
||||
/// Transitions UpstreamBuilding → UpstreamFailed for partitions waiting on failed upstreams.
|
||||
/// Uses the `downstream_waiting` index for O(1) lookup of affected partitions.
|
||||
pub(crate) fn cascade_failures_to_downstream_partitions(
|
||||
&mut self,
|
||||
failed_partition_refs: &[FailedPartitionRef],
|
||||
) {
|
||||
// Collect UUIDs of partitions that are waiting for the failed partitions
|
||||
let mut uuids_to_fail: Vec<Uuid> = Vec::new();
|
||||
for failed_ref in failed_partition_refs {
|
||||
if let Some(waiting_uuids) = self.downstream_waiting.get(&failed_ref.0.r#ref) {
|
||||
uuids_to_fail.extend(waiting_uuids.iter().cloned());
|
||||
}
|
||||
}
|
||||
|
||||
// Deduplicate UUIDs
|
||||
uuids_to_fail.sort();
|
||||
uuids_to_fail.dedup();
|
||||
|
||||
for uuid in uuids_to_fail {
|
||||
// Get partition by UUID
|
||||
let Some(partition) = self.partitions_by_uuid.get(&uuid).cloned() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let partition_ref = partition.partition_ref().r#ref.clone();
|
||||
|
||||
// Only process UpstreamBuilding partitions
|
||||
if let Partition::UpstreamBuilding(upstream_building) = partition {
|
||||
// Collect which upstream refs failed
|
||||
let failed_upstream_refs: Vec<PartitionRef> = failed_partition_refs
|
||||
.iter()
|
||||
.filter(|f| {
|
||||
upstream_building
|
||||
.state
|
||||
.missing_deps
|
||||
.iter()
|
||||
.any(|d| d.r#ref == f.0.r#ref)
|
||||
})
|
||||
.map(|f| f.0.clone())
|
||||
.collect();
|
||||
|
||||
if !failed_upstream_refs.is_empty() {
|
||||
tracing::info!(
|
||||
partition = %partition_ref,
|
||||
uuid = %uuid,
|
||||
failed_upstreams = ?failed_upstream_refs.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
|
||||
"Partition: UpstreamBuilding → UpstreamFailed (upstream failed)"
|
||||
);
|
||||
|
||||
// Remove from downstream_waiting index for all deps
|
||||
for dep in &upstream_building.state.missing_deps {
|
||||
if let Some(waiting) = self.downstream_waiting.get_mut(&dep.r#ref) {
|
||||
waiting.retain(|u| *u != uuid);
|
||||
}
|
||||
}
|
||||
|
||||
// Transition to UpstreamFailed
|
||||
let transitioned = Partition::UpstreamFailed(
|
||||
upstream_building
|
||||
.upstream_failed(failed_upstream_refs, current_timestamp()),
|
||||
);
|
||||
self.update_partition(transitioned);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
118  databuild/build_state/queries.rs  Normal file

@@ -0,0 +1,118 @@
|
|||
//! Query methods for BuildState
|
||||
//!
|
||||
//! Read-only methods for accessing state (get_*, list_*) used by the API layer.
|
||||
|
||||
use crate::{
|
||||
JobRunDetail, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest,
|
||||
ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest,
|
||||
ListWantsResponse, PartitionDetail, TaintDetail, WantDetail,
|
||||
};
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use super::{BuildState, consts};
|
||||
|
||||
impl BuildState {
|
||||
pub fn get_want(&self, want_id: &str) -> Option<WantDetail> {
|
||||
self.wants.get(want_id).map(|w| w.to_detail())
|
||||
}
|
||||
|
||||
pub fn get_taint(&self, taint_id: &str) -> Option<TaintDetail> {
|
||||
self.taints.get(taint_id).cloned()
|
||||
}
|
||||
|
||||
pub fn get_partition(&self, partition_id: &str) -> Option<PartitionDetail> {
|
||||
self.get_canonical_partition(partition_id)
|
||||
.map(|p| p.to_detail())
|
||||
}
|
||||
|
||||
pub fn get_job_run(&self, job_run_id: &str) -> Option<JobRunDetail> {
|
||||
self.job_runs.get(job_run_id).map(|jr| jr.to_detail())
|
||||
}
|
||||
|
||||
pub fn list_wants(&self, request: &ListWantsRequest) -> ListWantsResponse {
|
||||
let page = request.page.unwrap_or(0);
|
||||
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
|
||||
|
||||
let start = page * page_size;
|
||||
|
||||
// Paginate first, then convert only the needed wants to WantDetail
|
||||
let data: Vec<WantDetail> = self
|
||||
.wants
|
||||
.values()
|
||||
.skip(start as usize)
|
||||
.take(page_size as usize)
|
||||
.map(|w| w.to_detail())
|
||||
.collect();
|
||||
|
||||
ListWantsResponse {
|
||||
data,
|
||||
match_count: self.wants.len() as u64,
|
||||
page,
|
||||
page_size,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn list_taints(&self, request: &ListTaintsRequest) -> ListTaintsResponse {
|
||||
let page = request.page.unwrap_or(0);
|
||||
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
|
||||
ListTaintsResponse {
|
||||
data: list_state_items(&self.taints, page, page_size),
|
||||
match_count: self.taints.len() as u64,
|
||||
page,
|
||||
page_size,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn list_partitions(&self, request: &ListPartitionsRequest) -> ListPartitionsResponse {
|
||||
let page = request.page.unwrap_or(0);
|
||||
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
|
||||
// Convert canonical partitions to PartitionDetail for API
|
||||
let partition_details: BTreeMap<String, PartitionDetail> = self
|
||||
.canonical_partitions
|
||||
.iter()
|
||||
.filter_map(|(k, uuid)| {
|
||||
self.partitions_by_uuid
|
||||
.get(uuid)
|
||||
.map(|p| (k.clone(), p.to_detail()))
|
||||
})
|
||||
.collect();
|
||||
ListPartitionsResponse {
|
||||
data: list_state_items(&partition_details, page, page_size),
|
||||
match_count: self.canonical_partitions.len() as u64,
|
||||
page,
|
||||
page_size,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn list_job_runs(&self, request: &ListJobRunsRequest) -> ListJobRunsResponse {
|
||||
let page = request.page.unwrap_or(0);
|
||||
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
|
||||
|
||||
let start = page * page_size;
|
||||
let data: Vec<JobRunDetail> = self
|
||||
.job_runs
|
||||
.values()
|
||||
.skip(start as usize)
|
||||
.take(page_size as usize)
|
||||
.map(|jr| jr.to_detail())
|
||||
.collect();
|
||||
|
||||
ListJobRunsResponse {
|
||||
data,
|
||||
match_count: self.job_runs.len() as u64,
|
||||
page,
|
||||
page_size,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn list_state_items<T: Clone>(map: &BTreeMap<String, T>, page: u64, page_size: u64) -> Vec<T> {
|
||||
// TODO when we add filtering, can we add it generically via some trait or filter object that can be provided?
|
||||
let start = page * page_size;
|
||||
map.values()
.skip(start as usize)
.take(page_size as usize)
|
||||
.cloned()
|
||||
.collect()
|
||||
}
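// Illustrative check of the pagination helper above (sketch added for clarity,
// not part of the original change): page 1 with page_size 2 over five entries
// should yield the third and fourth values.
#[cfg(test)]
mod list_state_items_sketch {
    use super::*;
    use std::collections::BTreeMap;

    #[test]
    fn pages_are_offset_by_page_and_capped_at_page_size() {
        let map: BTreeMap<String, u32> = (1..=5).map(|i| (format!("k{i}"), i)).collect();
        assert_eq!(list_state_items(&map, 1, 2), vec![3, 4]);
    }
}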
|
||||
176  databuild/build_state/schedulability.rs  Normal file

@@ -0,0 +1,176 @@
|
|||
//! Want schedulability logic
|
||||
//!
|
||||
//! Types and methods for determining whether wants are schedulable based on
|
||||
//! upstream partition states and target partition build status.
|
||||
|
||||
use crate::partition_state::{
|
||||
BuildingPartitionRef, LivePartitionRef, Partition, TaintedPartitionRef,
|
||||
};
|
||||
use crate::{PartitionRef, WantDetail};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::BuildState;
|
||||
|
||||
/// The status of partitions required by a want to build (sensed from dep miss job run)
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||
pub struct WantUpstreamStatus {
|
||||
pub live: Vec<LivePartitionRef>,
|
||||
pub tainted: Vec<TaintedPartitionRef>,
|
||||
/// Upstream partitions that are not ready (don't exist, or are in Building/UpstreamBuilding/UpForRetry/Failed/UpstreamFailed states)
|
||||
pub not_ready: Vec<PartitionRef>,
|
||||
/// Target partitions that are currently being built by another job
|
||||
pub building: Vec<BuildingPartitionRef>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||
pub struct WantSchedulability {
|
||||
pub want: WantDetail,
|
||||
pub status: WantUpstreamStatus,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||
pub struct WantsSchedulability(pub Vec<WantSchedulability>);
|
||||
|
||||
impl WantsSchedulability {
|
||||
pub fn schedulable_wants(self) -> Vec<WantDetail> {
|
||||
self.0
|
||||
.iter()
|
||||
.filter_map(|ws| match ws.is_schedulable() {
|
||||
false => None,
|
||||
true => Some(ws.want.clone()),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl WantSchedulability {
|
||||
pub fn is_schedulable(&self) -> bool {
|
||||
// Want is schedulable if:
|
||||
// - No not-ready upstream dependencies (must all be Live or Tainted)
|
||||
// - No tainted upstream dependencies
|
||||
// - No target partitions currently being built by another job
|
||||
self.status.not_ready.is_empty()
|
||||
&& self.status.tainted.is_empty()
|
||||
&& self.status.building.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
impl BuildState {
|
||||
/// Wants are schedulable when their upstreams are ready and target partitions are not already building
|
||||
pub fn want_schedulability(&self, want: &WantDetail) -> WantSchedulability {
|
||||
// Check upstream partition statuses (dependencies)
|
||||
let mut live: Vec<LivePartitionRef> = Vec::new();
|
||||
let mut tainted: Vec<TaintedPartitionRef> = Vec::new();
|
||||
let mut not_ready: Vec<PartitionRef> = Vec::new(); // Partitions that don't exist or aren't Live
|
||||
|
||||
for upstream_ref in &want.upstreams {
|
||||
match self.get_canonical_partition(&upstream_ref.r#ref) {
|
||||
Some(partition) => {
|
||||
match partition {
|
||||
Partition::Live(p) => live.push(p.get_ref()),
|
||||
Partition::Tainted(p) => tainted.push(p.get_ref()),
|
||||
// All other states (Building, UpstreamBuilding, UpForRetry, Failed, UpstreamFailed) mean upstream is not ready
|
||||
_ => not_ready.push(upstream_ref.clone()),
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// Partition doesn't exist yet - it's not ready
|
||||
not_ready.push(upstream_ref.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check target partition statuses (what this want is trying to build)
|
||||
// If any target partition is already Building, this want should wait
|
||||
let mut building: Vec<BuildingPartitionRef> = Vec::new();
|
||||
for target_ref in &want.partitions {
|
||||
if let Some(partition) = self.get_canonical_partition(&target_ref.r#ref) {
|
||||
if let Partition::Building(p) = partition {
|
||||
building.push(p.get_ref());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
WantSchedulability {
|
||||
want: want.clone(),
|
||||
status: WantUpstreamStatus {
|
||||
live,
|
||||
tainted,
|
||||
not_ready,
|
||||
building,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn wants_schedulability(&self) -> WantsSchedulability {
|
||||
WantsSchedulability(
|
||||
self.wants
|
||||
.values()
|
||||
// Use type-safe is_schedulable() - only Idle wants are schedulable
|
||||
.filter(|w| w.is_schedulable())
|
||||
.map(|w| self.want_schedulability(&w.to_detail()))
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::want_state::{IdleState as WantIdleState, Want, WantInfo, WantWithState};
|
||||
use crate::{PartitionDetail, PartitionRef, PartitionStatus, WantStatus};
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
impl WantDetail {
|
||||
fn with_partitions(self, partitions: Vec<PartitionRef>) -> Self {
|
||||
Self { partitions, ..self }
|
||||
}
|
||||
fn with_upstreams(self, upstreams: Vec<PartitionRef>) -> Self {
|
||||
Self { upstreams, ..self }
|
||||
}
|
||||
fn with_status(self, status: Option<WantStatus>) -> Self {
|
||||
Self { status, ..self }
|
||||
}
|
||||
}
|
||||
|
||||
impl PartitionDetail {
|
||||
fn with_status(self, status: Option<PartitionStatus>) -> Self {
|
||||
Self { status, ..self }
|
||||
}
|
||||
fn with_ref(self, r#ref: Option<PartitionRef>) -> Self {
|
||||
Self { r#ref, ..self }
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_empty_wants_noop() {
|
||||
assert_eq!(BuildState::default().wants_schedulability().0.len(), 0);
|
||||
}
|
||||
|
||||
// A want with satisfied upstreams (incl "none") should be schedulable
|
||||
#[test]
|
||||
fn test_simple_want_with_live_upstream_is_schedulable() {
|
||||
// Given...
|
||||
let test_partition = "test_partition";
|
||||
let state = BuildState::default()
|
||||
.with_wants(BTreeMap::from([(
|
||||
"foo".to_string(),
|
||||
Want::Idle(WantWithState {
|
||||
want: WantInfo {
|
||||
partitions: vec![test_partition.into()],
|
||||
..Default::default()
|
||||
},
|
||||
state: WantIdleState {},
|
||||
}),
|
||||
)]))
|
||||
.with_partitions(BTreeMap::from([(
|
||||
test_partition.to_string(),
|
||||
PartitionDetail::default().with_ref(Some(test_partition.into())),
|
||||
)]));
|
||||
|
||||
// Should...
|
||||
let schedulability = state.wants_schedulability();
|
||||
let ws = schedulability.0.first().unwrap();
|
||||
assert!(ws.is_schedulable());
|
||||
}
|
||||
}
|
||||
403  databuild/build_state/want_transitions.rs  Normal file

@@ -0,0 +1,403 @@
|
|||
//! Want state transition logic
|
||||
//!
|
||||
//! Methods for transitioning wants between states and managing dependencies
|
||||
//! between wants (derivative wants from dep misses).
|
||||
|
||||
use crate::PartitionRef;
|
||||
use crate::job_run_state::JobRun;
|
||||
use crate::partition_state::{FailedPartitionRef, LivePartitionRef, Partition};
|
||||
use crate::want_state::{FailedWantId, SuccessfulWantId, Want};
|
||||
|
||||
use super::BuildState;
|
||||
|
||||
impl BuildState {
|
||||
/// Handle creation of a derivative want (created due to job dep miss)
|
||||
///
|
||||
/// When a job reports missing dependencies, it returns WantCreateV1 events for those missing partitions.
|
||||
/// Those events get appended to the BEL and eventually processed by handle_want_create().
|
||||
///
|
||||
/// This function is called when we detect a derivative want (has source.job_triggered) and transitions
|
||||
/// the impacted wants to UpstreamBuilding state, tracking the derivative want ID as an upstream dependency.
|
||||
///
|
||||
/// KEY INSIGHT: We must use the actual want_id from the WantCreateV1 event, not synthetic UUIDs generated
|
||||
/// during event processing. This ensures replay works correctly - the same want IDs are used both during
|
||||
/// original execution and during replay from the BEL.
|
||||
pub(crate) fn handle_derivative_want_creation(
|
||||
&mut self,
|
||||
derivative_want_id: &str,
|
||||
derivative_want_partitions: &[PartitionRef],
|
||||
source_job_run_id: &str,
|
||||
) {
|
||||
// Look up the job run that triggered this derivative want
|
||||
// This job run must be in DepMiss state because it reported missing dependencies
|
||||
let job_run = self.job_runs.get(source_job_run_id).expect(&format!(
|
||||
"BUG: Job run {} must exist when derivative want created",
|
||||
source_job_run_id
|
||||
));
|
||||
|
||||
// Extract the missing deps from the DepMiss job run
|
||||
let missing_deps = match job_run {
|
||||
JobRun::DepMiss(dep_miss) => dep_miss.get_missing_deps(),
|
||||
_ => {
|
||||
panic!(
|
||||
"BUG: Job run {} must be in DepMiss state when derivative want created, found {:?}",
|
||||
source_job_run_id, job_run
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
// Find which MissingDeps entry corresponds to this derivative want
|
||||
// The derivative want was created for a specific set of missing partitions,
|
||||
// and we need to find which downstream partitions are impacted by those missing partitions
|
||||
for md in missing_deps {
|
||||
// Check if this derivative want's partitions match the missing partitions in this entry
|
||||
// We need exact match because one dep miss event can create multiple derivative wants
|
||||
let partitions_match = md.missing.iter().all(|missing_ref| {
|
||||
derivative_want_partitions
|
||||
.iter()
|
||||
.any(|p| p.r#ref == missing_ref.r#ref)
|
||||
}) && derivative_want_partitions.len() == md.missing.len();
|
||||
|
||||
if partitions_match {
|
||||
// Now we know which partitions are impacted by this missing dependency
|
||||
let impacted_partition_refs: Vec<String> =
|
||||
md.impacted.iter().map(|p| p.r#ref.clone()).collect();
|
||||
|
||||
tracing::debug!(
|
||||
derivative_want_id = %derivative_want_id,
|
||||
source_job_run_id = %source_job_run_id,
|
||||
missing_partitions = ?derivative_want_partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
|
||||
impacted_partitions = ?impacted_partition_refs,
|
||||
"Processing derivative want creation"
|
||||
);
|
||||
|
||||
// Find all wants that include these impacted partitions
|
||||
// These are the wants that need to wait for the derivative want to complete
|
||||
let mut impacted_want_ids: std::collections::HashSet<String> =
|
||||
std::collections::HashSet::new();
|
||||
for partition_ref in &impacted_partition_refs {
|
||||
for want_id in self.get_wants_for_partition(partition_ref) {
|
||||
impacted_want_ids.insert(want_id.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Transition each impacted want to UpstreamBuilding, tracking this derivative want as an upstream
|
||||
for want_id in impacted_want_ids {
|
||||
let want = self.wants.remove(&want_id).expect(&format!(
|
||||
"BUG: Want {} must exist when processing derivative want",
|
||||
want_id
|
||||
));
|
||||
|
||||
let transitioned = match want {
|
||||
Want::Building(building) => {
|
||||
// First dep miss for this want: Building → UpstreamBuilding
|
||||
tracing::info!(
|
||||
want_id = %want_id,
|
||||
derivative_want_id = %derivative_want_id,
|
||||
"Want: Building → UpstreamBuilding (first missing dep detected)"
|
||||
);
|
||||
Want::UpstreamBuilding(
|
||||
building.detect_missing_deps(vec![derivative_want_id.to_string()]),
|
||||
)
|
||||
}
|
||||
Want::UpstreamBuilding(upstream) => {
|
||||
// Additional dep miss: UpstreamBuilding → UpstreamBuilding (add another upstream)
|
||||
// This can happen if multiple jobs report dep misses for different upstreams
|
||||
tracing::info!(
|
||||
want_id = %want_id,
|
||||
derivative_want_id = %derivative_want_id,
|
||||
"Want: UpstreamBuilding → UpstreamBuilding (additional upstream added)"
|
||||
);
|
||||
Want::UpstreamBuilding(
|
||||
upstream.add_upstreams(vec![derivative_want_id.to_string()]),
|
||||
)
|
||||
}
|
||||
_ => {
|
||||
panic!(
|
||||
"BUG: Want {} in invalid state {:?} when processing derivative want. Should be Building or UpstreamBuilding.",
|
||||
want_id, want
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
self.wants.insert(want_id, transitioned);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Complete wants when all their partitions become Live
|
||||
/// Transitions Building → Successful, returns list of newly successful want IDs
|
||||
pub(crate) fn complete_successful_wants(
|
||||
&mut self,
|
||||
newly_live_partitions: &[LivePartitionRef],
|
||||
job_run_id: &str,
|
||||
timestamp: u64,
|
||||
) -> Vec<SuccessfulWantId> {
|
||||
let mut newly_successful_wants: Vec<SuccessfulWantId> = Vec::new();
|
||||
|
||||
for pref in newly_live_partitions {
|
||||
let want_ids: Vec<String> = self.get_wants_for_partition(&pref.0.r#ref).to_vec();
|
||||
|
||||
for want_id in want_ids {
|
||||
let want = self.wants.remove(&want_id).expect(&format!(
|
||||
"BUG: Want {} must exist when referenced by partition",
|
||||
want_id
|
||||
));
|
||||
|
||||
let transitioned = match want {
|
||||
Want::Building(building) => {
|
||||
// Check if ALL partitions for this want are now Live
|
||||
let all_partitions_live = building.want.partitions.iter().all(|p| {
|
||||
self.get_canonical_partition(&p.r#ref)
|
||||
.map(|partition| partition.is_live())
|
||||
.unwrap_or(false)
|
||||
});
|
||||
|
||||
if all_partitions_live {
|
||||
let successful_want =
|
||||
building.complete(job_run_id.to_string(), timestamp);
|
||||
tracing::info!(
|
||||
want_id = %want_id,
|
||||
job_run_id = %job_run_id,
|
||||
"Want: Building → Successful"
|
||||
);
|
||||
newly_successful_wants.push(successful_want.get_id());
|
||||
Want::Successful(successful_want)
|
||||
} else {
|
||||
Want::Building(building) // Still building other partitions
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
panic!(
|
||||
"BUG: Want {} in invalid state {:?} when partition {} became Live. Should be Building.",
|
||||
want_id, want, pref.0.r#ref
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
self.wants.insert(want_id.clone(), transitioned);
|
||||
}
|
||||
}
|
||||
|
||||
newly_successful_wants
|
||||
}
|
||||
|
||||
/// Fail wants when their partitions fail
|
||||
/// Transitions Building → Failed, and adds to already-failed wants
|
||||
/// Returns list of newly failed want IDs for downstream cascade
|
||||
pub(crate) fn fail_directly_affected_wants(
|
||||
&mut self,
|
||||
failed_partitions: &[FailedPartitionRef],
|
||||
) -> Vec<FailedWantId> {
|
||||
let mut newly_failed_wants: Vec<FailedWantId> = Vec::new();
|
||||
|
||||
for pref in failed_partitions {
|
||||
let want_ids: Vec<String> = self.get_wants_for_partition(&pref.0.r#ref).to_vec();
|
||||
|
||||
for want_id in want_ids {
|
||||
let want = self.wants.remove(&want_id).expect(&format!(
|
||||
"BUG: Want {} must exist when referenced by partition",
|
||||
want_id
|
||||
));
|
||||
|
||||
let transitioned = match want {
|
||||
Want::Building(building) => {
|
||||
let failed = building
|
||||
.fail(vec![pref.0.clone()], "Partition build failed".to_string());
|
||||
newly_failed_wants.push(failed.get_id());
|
||||
Want::Failed(failed)
|
||||
}
|
||||
// Failed → Failed: add new failed partition to existing failed state
|
||||
Want::Failed(failed) => {
|
||||
Want::Failed(failed.add_failed_partitions(vec![pref.clone()]))
|
||||
}
|
||||
_ => {
|
||||
panic!(
|
||||
"BUG: Want {} in invalid state {:?} when partition {} failed. Should be Building or Failed.",
|
||||
want_id, want, pref.0.r#ref
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
self.wants.insert(want_id.clone(), transitioned);
|
||||
}
|
||||
}
|
||||
|
||||
newly_failed_wants
|
||||
}
|
||||
|
||||
/// Unblock downstream wants when their upstream dependencies succeed
|
||||
/// Transitions UpstreamBuilding → Idle (when ready) or Building (when partitions already building)
|
||||
pub(crate) fn unblock_downstream_wants(
|
||||
&mut self,
|
||||
newly_successful_wants: &[SuccessfulWantId],
|
||||
job_run_id: &str,
|
||||
timestamp: u64,
|
||||
) {
|
||||
tracing::debug!(
|
||||
newly_successful_wants = ?newly_successful_wants
|
||||
.iter()
|
||||
.map(|w| &w.0)
|
||||
.collect::<Vec<_>>(),
|
||||
"Checking downstream wants for unblocking"
|
||||
);
|
||||
// Find downstream wants that are waiting for any of the newly successful wants
|
||||
// TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
|
||||
let downstream_wants_to_check: Vec<String> = self
|
||||
.wants
|
||||
.iter()
|
||||
.filter_map(|(id, want)| {
|
||||
match want {
|
||||
Want::UpstreamBuilding(downstream_want) => {
|
||||
// Is this downstream want waiting for any of the newly successful wants?
|
||||
let is_affected =
|
||||
downstream_want.state.upstream_want_ids.iter().any(|up_id| {
|
||||
newly_successful_wants.iter().any(|swid| &swid.0 == up_id)
|
||||
});
|
||||
if is_affected { Some(id.clone()) } else { None }
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
tracing::debug!(
|
||||
downstream_wants_to_check = ?downstream_wants_to_check,
|
||||
"Found downstream wants affected by upstream completion"
|
||||
);
|
||||
|
||||
for want_id in downstream_wants_to_check {
|
||||
let want = self
|
||||
.wants
|
||||
.remove(&want_id)
|
||||
.expect(&format!("BUG: Want {} must exist", want_id));
|
||||
|
||||
let transitioned = match want {
|
||||
Want::UpstreamBuilding(downstream_want) => {
|
||||
tracing::debug!(
|
||||
want_id = %want_id,
|
||||
upstreams = ?downstream_want.state.upstream_want_ids,
|
||||
"Checking if all upstreams are satisfied"
|
||||
);
|
||||
// Check if ALL of this downstream want's upstream dependencies are now Successful
|
||||
let all_upstreams_successful = downstream_want
|
||||
.state
|
||||
.upstream_want_ids
|
||||
.iter()
|
||||
.all(|up_want_id| {
|
||||
self.wants
|
||||
.get(up_want_id)
|
||||
.map(|w| matches!(w, Want::Successful(_)))
|
||||
.unwrap_or(false)
|
||||
});
|
||||
tracing::debug!(
|
||||
want_id = %want_id,
|
||||
all_upstreams_successful = %all_upstreams_successful,
|
||||
"Upstream satisfaction check complete"
|
||||
);
|
||||
|
||||
if all_upstreams_successful {
|
||||
// Check if any of this want's partitions are still being built
|
||||
// If a job dep-missed, its partitions transitioned to UpstreamBuilding
|
||||
// But other jobs might still be building other partitions for this want
|
||||
let any_partition_building =
|
||||
downstream_want.want.partitions.iter().any(|p| {
|
||||
self.get_canonical_partition(&p.r#ref)
|
||||
.map(|partition| matches!(partition, Partition::Building(_)))
|
||||
.unwrap_or(false)
|
||||
});
|
||||
tracing::debug!(
|
||||
want_id = %want_id,
|
||||
any_partition_building = %any_partition_building,
|
||||
"Partition building status check"
|
||||
);
|
||||
|
||||
if any_partition_building {
|
||||
// Some partitions still being built, continue in Building state
|
||||
tracing::info!(
|
||||
want_id = %want_id,
|
||||
job_run_id = %job_run_id,
|
||||
"Want: UpstreamBuilding → Building (upstreams satisfied, partitions building)"
|
||||
);
|
||||
Want::Building(
|
||||
downstream_want
|
||||
.continue_building(job_run_id.to_string(), timestamp),
|
||||
)
|
||||
} else {
|
||||
// No partitions being built, become schedulable again
|
||||
tracing::info!(
|
||||
want_id = %want_id,
|
||||
"Want: UpstreamBuilding → Idle (upstreams satisfied, ready to schedule)"
|
||||
);
|
||||
Want::Idle(downstream_want.upstreams_satisfied())
|
||||
}
|
||||
} else {
|
||||
// Upstreams not all satisfied yet, stay in UpstreamBuilding
|
||||
tracing::debug!(
|
||||
want_id = %want_id,
|
||||
"Want remains in UpstreamBuilding state (upstreams not yet satisfied)"
|
||||
);
|
||||
Want::UpstreamBuilding(downstream_want)
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
panic!("BUG: Want {} should be UpstreamBuilding here", want_id);
|
||||
}
|
||||
};
|
||||
|
||||
self.wants.insert(want_id, transitioned);
|
||||
}
|
||||
}
|
||||
|
||||
/// Cascade failures to downstream wants when their upstream dependencies fail
|
||||
/// Transitions UpstreamBuilding → UpstreamFailed
|
||||
pub(crate) fn cascade_failures_to_downstream_wants(
|
||||
&mut self,
|
||||
newly_failed_wants: &[FailedWantId],
|
||||
timestamp: u64,
|
||||
) {
|
||||
// Find downstream wants that are waiting for any of the newly failed wants
|
||||
// TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
|
||||
let downstream_wants_to_fail: Vec<String> = self
|
||||
.wants
|
||||
.iter()
|
||||
.filter_map(|(id, want)| {
|
||||
match want {
|
||||
Want::UpstreamBuilding(downstream_want) => {
|
||||
// Is this downstream want waiting for any of the newly failed wants?
|
||||
let is_affected =
|
||||
downstream_want.state.upstream_want_ids.iter().any(|up_id| {
|
||||
newly_failed_wants.iter().any(|fwid| &fwid.0 == up_id)
|
||||
});
|
||||
if is_affected { Some(id.clone()) } else { None }
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
for want_id in downstream_wants_to_fail {
|
||||
let want = self
|
||||
.wants
|
||||
.remove(&want_id)
|
||||
.expect(&format!("BUG: Want {} must exist", want_id));
|
||||
|
||||
let transitioned = match want {
|
||||
Want::UpstreamBuilding(downstream_want) => Want::UpstreamFailed(
|
||||
downstream_want.upstream_failed(
|
||||
newly_failed_wants
|
||||
.iter()
|
||||
.map(|fwid| fwid.0.clone())
|
||||
.collect(),
|
||||
timestamp,
|
||||
),
|
||||
),
|
||||
_ => {
|
||||
panic!("BUG: Want {} should be UpstreamBuilding here", want_id);
|
||||
}
|
||||
};
|
||||
|
||||
self.wants.insert(want_id, transitioned);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@@ -13,8 +13,8 @@ use lib::orchestrator::{Orchestrator, OrchestratorConfig};
|
|||
#[command(name = "databuild")]
|
||||
#[command(about = "DataBuild CLI - Build system for data pipelines", long_about = None)]
|
||||
struct Cli {
|
||||
/// Server URL (default: http://localhost:3000)
|
||||
#[arg(long, default_value = "http://localhost:3000", global = true)]
|
||||
/// Server URL
|
||||
#[arg(long, default_value = "http://localhost:3538", global = true)]
|
||||
server: String,
|
||||
|
||||
#[command(subcommand)]
|
||||
|
|
@@ -26,7 +26,7 @@ enum Commands {
|
|||
/// Start the DataBuild HTTP server
|
||||
Serve {
|
||||
/// Port to listen on
|
||||
#[arg(long, default_value = "3000")]
|
||||
#[arg(long, default_value = "3538")]
|
||||
port: u16,
|
||||
|
||||
/// Database URL (default: :memory: for in-memory SQLite)
|
||||
|
|
|
|||
|
|
@@ -1,17 +1,23 @@
|
|||
use crate::build_event_log::BELStorage;
|
||||
use crate::build_state::BuildState;
|
||||
use crate::commands::Command;
|
||||
use crate::web::templates::{
|
||||
BaseContext, HomePage, JobRunDetailPage, JobRunDetailView, JobRunsListPage,
|
||||
PartitionDetailPage, PartitionDetailView, PartitionsListPage, WantDetailPage, WantDetailView,
|
||||
WantsListPage,
|
||||
};
|
||||
use crate::{
|
||||
CancelWantRequest, CreateWantRequest, CreateWantResponse, GetWantRequest, GetWantResponse,
|
||||
ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse,
|
||||
ListWantsRequest, ListWantsResponse,
|
||||
ListWantsRequest, ListWantsResponse, PartitionStatusCode,
|
||||
};
|
||||
use askama::Template;
|
||||
use axum::{
|
||||
Json, Router,
|
||||
extract::{Path, Query, Request, State},
|
||||
http::{HeaderValue, Method, StatusCode},
|
||||
middleware::{self, Next},
|
||||
response::{IntoResponse, Response},
|
||||
response::{Html, IntoResponse, Response},
|
||||
routing::{delete, get, post},
|
||||
};
|
||||
use std::sync::{
|
||||
|
|
@@ -80,7 +86,7 @@ async fn update_last_request_time(
|
|||
pub fn create_router(state: AppState) -> Router {
|
||||
// Configure CORS for web app development
|
||||
let cors = CorsLayer::new()
|
||||
.allow_origin("http://localhost:3000".parse::<HeaderValue>().unwrap())
|
||||
.allow_origin("http://localhost:3538".parse::<HeaderValue>().unwrap())
|
||||
.allow_methods([Method::GET, Method::POST, Method::DELETE, Method::OPTIONS])
|
||||
.allow_headers([
|
||||
axum::http::header::CONTENT_TYPE,
|
||||
|
|
@@ -90,15 +96,21 @@ pub fn create_router(state: AppState) -> Router {
|
|||
Router::new()
|
||||
// Health check
|
||||
.route("/health", get(health))
|
||||
// Want endpoints
|
||||
.route("/api/wants", get(list_wants))
|
||||
// HTML pages
|
||||
.route("/", get(home_page))
|
||||
.route("/wants", get(wants_list_page))
|
||||
.route("/wants/:id", get(want_detail_page))
|
||||
.route("/partitions", get(partitions_list_page))
|
||||
.route("/partitions/*id", get(partition_detail_page))
|
||||
.route("/job_runs", get(job_runs_list_page))
|
||||
.route("/job_runs/:id", get(job_run_detail_page))
|
||||
// JSON API endpoints
|
||||
.route("/api/wants", get(list_wants_json))
|
||||
.route("/api/wants", post(create_want))
|
||||
.route("/api/wants/:id", get(get_want))
|
||||
.route("/api/wants/:id", get(get_want_json))
|
||||
.route("/api/wants/:id", delete(cancel_want))
|
||||
// Partition endpoints
|
||||
.route("/api/partitions", get(list_partitions))
|
||||
// Job run endpoints
|
||||
.route("/api/job_runs", get(list_job_runs))
|
||||
.route("/api/partitions", get(list_partitions_json))
|
||||
.route("/api/job_runs", get(list_job_runs_json))
|
||||
// Add CORS middleware
|
||||
.layer(cors)
|
||||
// Add middleware to track request time
|
||||
|
|
@@ -138,7 +150,251 @@ impl ErrorResponse {
|
|||
}
|
||||
|
||||
// ============================================================================
|
||||
// Handlers
|
||||
// HTML Page Handlers
|
||||
// ============================================================================
|
||||
|
||||
/// Home page
|
||||
async fn home_page(State(state): State<AppState>) -> impl IntoResponse {
|
||||
let build_state = match state.build_state.read() {
|
||||
Ok(s) => s,
|
||||
Err(_) => return Html("<h1>Error: state lock poisoned</h1>".to_string()).into_response(),
|
||||
};
|
||||
|
||||
// Count active wants (not successful or canceled)
|
||||
let active_wants_count = build_state
|
||||
.list_wants(&ListWantsRequest::default())
|
||||
.data
|
||||
.iter()
|
||||
.filter(|w| {
|
||||
w.status
|
||||
.as_ref()
|
||||
.map(|s| s.name != "Successful" && s.name != "Canceled")
|
||||
.unwrap_or(true)
|
||||
})
|
||||
.count() as u64;
|
||||
|
||||
// Count active job runs (running or queued)
|
||||
let active_job_runs_count = build_state
|
||||
.list_job_runs(&ListJobRunsRequest::default())
|
||||
.data
|
||||
.iter()
|
||||
.filter(|jr| {
|
||||
jr.status
|
||||
.as_ref()
|
||||
.map(|s| s.name == "Running" || s.name == "Queued")
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.count() as u64;
|
||||
|
||||
// Count live partitions
|
||||
let live_partitions_count = build_state
|
||||
.list_partitions(&ListPartitionsRequest::default())
|
||||
.data
|
||||
.iter()
|
||||
.filter(|p| {
|
||||
p.status
|
||||
.as_ref()
|
||||
.map(|s| s.code == PartitionStatusCode::PartitionLive as i32)
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.count() as u64;
|
||||
|
||||
let template = HomePage {
|
||||
base: BaseContext::default(),
|
||||
active_wants_count,
|
||||
active_job_runs_count,
|
||||
live_partitions_count,
|
||||
};
|
||||
|
||||
match template.render() {
|
||||
Ok(html) => Html(html).into_response(),
|
||||
Err(e) => {
|
||||
tracing::error!("Template render error: {}", e);
|
||||
Html(format!("<h1>Template error: {}</h1>", e)).into_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Wants list page
|
||||
async fn wants_list_page(
|
||||
State(state): State<AppState>,
|
||||
Query(params): Query<ListWantsRequest>,
|
||||
) -> impl IntoResponse {
|
||||
let build_state = match state.build_state.read() {
|
||||
Ok(s) => s,
|
||||
Err(_) => return Html("<h1>Error: state lock poisoned</h1>".to_string()).into_response(),
|
||||
};
|
||||
|
||||
let response = build_state.list_wants(¶ms);
|
||||
let template = WantsListPage {
|
||||
base: BaseContext::default(),
|
||||
wants: response
|
||||
.data
|
||||
.into_iter()
|
||||
.map(WantDetailView::from)
|
||||
.collect(),
|
||||
page: response.page,
|
||||
page_size: response.page_size,
|
||||
total_count: response.match_count,
|
||||
};
|
||||
|
||||
match template.render() {
|
||||
Ok(html) => Html(html).into_response(),
|
||||
Err(e) => {
|
||||
tracing::error!("Template render error: {}", e);
|
||||
Html(format!("<h1>Template error: {}</h1>", e)).into_response()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Want detail page
|
||||
async fn want_detail_page(
|
||||
State(state): State<AppState>,
|
||||
Path(want_id): Path<String>,
|
||||
) -> impl IntoResponse {
|
||||
let build_state = match state.build_state.read() {
|
||||
Ok(s) => s,
|
||||
Err(_) => return Html("<h1>Error: state lock poisoned</h1>".to_string()).into_response(),
|
||||
};
|
||||
|
||||
match build_state.get_want(&want_id) {
|
||||
Some(want) => {
|
||||
let template = WantDetailPage {
|
||||
base: BaseContext::default(),
|
||||
want: WantDetailView::from(want),
|
||||
};
|
||||
match template.render() {
|
||||
Ok(html) => Html(html).into_response(),
|
||||
Err(e) => Html(format!("<h1>Template error: {}</h1>", e)).into_response(),
|
||||
}
|
||||
}
|
||||
None => (
|
||||
StatusCode::NOT_FOUND,
|
||||
Html("<h1>Want not found</h1>".to_string()),
|
||||
)
|
||||
.into_response(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Partitions list page
async fn partitions_list_page(
    State(state): State<AppState>,
    Query(params): Query<ListPartitionsRequest>,
) -> impl IntoResponse {
    let build_state = match state.build_state.read() {
        Ok(s) => s,
        Err(_) => return Html("<h1>Error: state lock poisoned</h1>".to_string()).into_response(),
    };

    let response = build_state.list_partitions(&params);
    let template = PartitionsListPage {
        base: BaseContext::default(),
        partitions: response
            .data
            .into_iter()
            .map(PartitionDetailView::from)
            .collect(),
        page: response.page,
        page_size: response.page_size,
        total_count: response.match_count,
    };

    match template.render() {
        Ok(html) => Html(html).into_response(),
        Err(e) => Html(format!("<h1>Template error: {}</h1>", e)).into_response(),
    }
}

/// Partition detail page
async fn partition_detail_page(
    State(state): State<AppState>,
    Path(partition_ref): Path<String>,
) -> impl IntoResponse {
    let build_state = match state.build_state.read() {
        Ok(s) => s,
        Err(_) => return Html("<h1>Error: state lock poisoned</h1>".to_string()).into_response(),
    };

    // Axum's Path extractor automatically percent-decodes the path parameter
    match build_state.get_partition(&partition_ref) {
        Some(partition) => {
            let template = PartitionDetailPage {
                base: BaseContext::default(),
                partition: PartitionDetailView::from(partition),
            };
            match template.render() {
                Ok(html) => Html(html).into_response(),
                Err(e) => Html(format!("<h1>Template error: {}</h1>", e)).into_response(),
            }
        }
        None => (
            StatusCode::NOT_FOUND,
            Html("<h1>Partition not found</h1>".to_string()),
        )
            .into_response(),
    }
}

/// Job runs list page
async fn job_runs_list_page(
    State(state): State<AppState>,
    Query(params): Query<ListJobRunsRequest>,
) -> impl IntoResponse {
    let build_state = match state.build_state.read() {
        Ok(s) => s,
        Err(_) => return Html("<h1>Error: state lock poisoned</h1>".to_string()).into_response(),
    };

    let response = build_state.list_job_runs(&params);
    let template = JobRunsListPage {
        base: BaseContext::default(),
        job_runs: response
            .data
            .into_iter()
            .map(JobRunDetailView::from)
            .collect(),
        page: response.page,
        page_size: response.page_size,
        total_count: response.match_count,
    };

    match template.render() {
        Ok(html) => Html(html).into_response(),
        Err(e) => Html(format!("<h1>Template error: {}</h1>", e)).into_response(),
    }
}

/// Job run detail page
async fn job_run_detail_page(
    State(state): State<AppState>,
    Path(job_run_id): Path<String>,
) -> impl IntoResponse {
    let build_state = match state.build_state.read() {
        Ok(s) => s,
        Err(_) => return Html("<h1>Error: state lock poisoned</h1>".to_string()).into_response(),
    };

    match build_state.get_job_run(&job_run_id) {
        Some(job_run) => {
            let template = JobRunDetailPage {
                base: BaseContext::default(),
                job_run: JobRunDetailView::from(job_run),
            };
            match template.render() {
                Ok(html) => Html(html).into_response(),
                Err(e) => Html(format!("<h1>Template error: {}</h1>", e)).into_response(),
            }
        }
        None => (
            StatusCode::NOT_FOUND,
            Html("<h1>Job run not found</h1>".to_string()),
        )
            .into_response(),
    }
}

// ============================================================================
// JSON API Handlers
// ============================================================================

/// Health check endpoint

@ -146,12 +402,11 @@ async fn health() -> impl IntoResponse {
    (StatusCode::OK, "OK")
}

/// List all wants
async fn list_wants(
/// List all wants (JSON)
async fn list_wants_json(
    State(state): State<AppState>,
    Query(params): Query<ListWantsRequest>,
) -> impl IntoResponse {
    // Read from shared build state
    let build_state = match state.build_state.read() {
        Ok(state) => state,
        Err(e) => {

@ -170,9 +425,11 @@ async fn list_wants(
    (StatusCode::OK, Json(response)).into_response()
}

/// Get a specific want by ID
async fn get_want(State(state): State<AppState>, Path(want_id): Path<String>) -> impl IntoResponse {
    // Read from shared build state
/// Get a specific want by ID (JSON)
async fn get_want_json(
    State(state): State<AppState>,
    Path(want_id): Path<String>,
) -> impl IntoResponse {
    let build_state = match state.build_state.read() {
        Ok(state) => state,
        Err(e) => {

@ -320,12 +577,11 @@ async fn cancel_want(
    }
}

/// List all partitions
async fn list_partitions(
/// List all partitions (JSON)
async fn list_partitions_json(
    State(state): State<AppState>,
    Query(params): Query<ListPartitionsRequest>,
) -> impl IntoResponse {
    // Read from shared build state
    let build_state = match state.build_state.read() {
        Ok(state) => state,
        Err(e) => {

@ -344,12 +600,11 @@ async fn list_partitions(
    (StatusCode::OK, Json(response)).into_response()
}

/// List all job runs
async fn list_job_runs(
/// List all job runs (JSON)
async fn list_job_runs_json(
    State(state): State<AppState>,
    Query(params): Query<ListJobRunsRequest>,
) -> impl IntoResponse {
    // Read from shared build state
    let build_state = match state.build_state.read() {
        Ok(state) => state,
        Err(e) => {

@ -13,6 +13,7 @@ pub mod orchestrator;
mod partition_state;
mod util;
mod want_state;
pub mod web;

// Include generated protobuf code
include!("databuild.rs");

6
databuild/web/mod.rs
Normal file

@ -0,0 +1,6 @@
//! Web templates and handlers for the DataBuild dashboard
//!
//! Server-side rendered HTML using Askama templates with CSS View Transitions
//! for smooth navigation. No JavaScript framework required.

pub mod templates;

1316
databuild/web/templates.rs
Normal file
File diff suppressed because it is too large

@ -9,9 +9,9 @@ The service provides two primary capabilities:

## Correctness Strategy
- Rely on databuild.proto, call same shared code in core
- Fully asserted type safety from core to service to web app
- Core -- databuild.proto --> service -- openapi --> web app
- No magic strings (how? protobuf doesn't have consts. enums values? code gen over yaml?)
- Fully asserted type safety from core to service to web app via Askama compile-time template checking
- `databuild.proto` → prost (Rust structs) → Askama templates (compile-checked field access) → HTML
- No magic strings: enum variants and field names are checked at compile time
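
For illustration, a minimal sketch of how this pipeline composes; the `WantSummary` struct and template body below are hypothetical stand-ins, not the actual generated types or page markup:

```rust
use askama::Template;

// Hypothetical stand-in for a prost-generated struct (normally produced from databuild.proto).
pub struct WantSummary {
    pub want_id: String,
    pub status: String,
}

// Field access like `want.want_id` is checked when the crate compiles,
// so renaming a proto field breaks the build instead of the rendered page.
#[derive(Template)]
#[template(
    source = "<li>{{ want.want_id }}: {{ want.status }}</li>",
    ext = "html"
)]
struct WantRow {
    want: WantSummary,
}
```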

## Cross-Graph Coordination
Services expose the `GraphService` API for cross-graph dependency management:
@ -29,11 +29,39 @@ needed by the [web app](#web-app).
[Notes about details, context, and askama views.](https://claude.ai/share/76622c1c-7489-496e-be81-a64fef24e636)

## Web App
The web app visualizes databuild application state via features like listing past builds, job statistics,
partition liveness, build request status, etc. This section specifies the hierarchy of functions of the web app. Pages
The web app visualizes databuild application state via features like listing past builds, job statistics,
partition liveness, build request status, etc. This section specifies the hierarchy of functions of the web app. Pages
are described in visual order (generally top to bottom).

General requirements:
### Implementation Strategy: Pure MPA with View Transitions

The web app is a traditional multi-page application (MPA) using server-side rendering. No JavaScript framework.

**Stack:**
- **Askama**: Compile-time checked HTML templates in Rust
- **CSS View Transitions API**: Smooth navigation animations between pages
- **Plain HTML forms**: For mutations (create want, cancel job, etc.)

**Why this approach:**
- **Fast**: No JS bundle to download/parse; HTML streams directly from server
- **Simple**: Single Rust codebase; no frontend build pipeline
- **Type-safe**: Askama checks template field access at compile time
- **Easy to change**: Templates are just HTML; Rust compiler catches breaking changes

**View Transitions** (supported in Chrome 126+, Safari 18.2+, Firefox 144+) provide SPA-like smooth
navigation without JavaScript. The browser snapshots the old page, loads the new one, and animates
matching elements:

```css
@view-transition { navigation: auto; }
.want-card { view-transition-name: want-card; }
```

**Trade-off**: No partial page updates. Pagination and navigation reload the full page, but view
transitions make this feel instant. For live updates, users refresh manually or we add
`<meta http-equiv="refresh">` on status pages.
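
A sketch of what a form-based mutation could look like under this model; the route, handler, and field names here are assumptions, not the actual API. The browser submits a plain POST, the server mutates build state and redirects back to a list page, and the full-page reload is smoothed by the view transition:

```rust
use axum::{response::Redirect, Form};
use serde::Deserialize;

#[derive(Deserialize)]
struct CreateWantForm {
    partition_ref: String, // hypothetical form field name
}

// Plain HTML form POST; no JavaScript involved. After recording the mutation,
// redirect to the wants list and let the browser animate the navigation.
async fn create_want_form(Form(form): Form<CreateWantForm>) -> Redirect {
    // ... record the want in build state (omitted) ...
    let _ = form.partition_ref;
    Redirect::to("/wants")
}
```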

### General Requirements
- Nav at top of page
- DataBuild logo in top left
- Navigation links at the top allowing navigation to each list page:
@ -43,9 +71,7 @@ General requirements:
- Triggers list page
- Build event log page
- Graph label at top right
- Search box for finding builds, jobs, and partitions (needs a new service API?)

The site is implemented via Aksama templating and HTMX for dynamic updates.
- Search box for finding builds, jobs, and partitions

### Home Page
Jumping off point to navigate and build.
@ -8,26 +8,25 @@ Wants declare intent to have partitions exist. The graph continuously reconciles

## Want Identity

Wants are idempotent through deterministic ID generation:
Wants use UUID-based identity:

```protobuf
message PartitionWant {
  string want_id = 1; // Hash(partition_ref + data_timestamp + source)
  string root_want_id = 2; // Original user want
  string parent_want_id = 3; // Want that triggered this
  PartitionRef partition_ref = 4;
  uint64 data_timestamp = 5; // Business time (e.g., "2024-01-01" → midnight UTC)
  uint64 ttl_seconds = 6; // From data_timestamp
  uint64 sla_seconds = 7; // From data_timestamp
  WantSource source = 8;
message WantCreateEventV1 {
  string want_id = 1; // UUID generated at creation time
  repeated PartitionRef partitions = 2; // Partitions this want requests
  uint64 data_timestamp = 3; // Business time (e.g., "2024-01-01" → midnight UTC)
  uint64 ttl_seconds = 4; // From data_timestamp
  uint64 sla_seconds = 5; // From data_timestamp
  EventSource source = 6; // Origin: job, API, CLI, web app...
  optional string comment = 7;
}
```

Multiple identical want requests produce the same `want_id`, preventing duplication.
Want IDs are UUIDs generated at creation time. Duplicate prevention is handled at the scheduling layer: the orchestrator checks canonical partition state before scheduling jobs, so multiple wants for the same partition simply observe the same build progress rather than triggering redundant work.
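
A rough sketch of the check this implies at scheduling time; the status enum and function names are illustrative, not the real orchestrator API:

```rust
// Illustrative only; the real orchestrator consults canonical partition state.
#[derive(Clone, Copy, PartialEq)]
enum PartitionStatus {
    Missing,
    Building,
    Live,
}

// A new want only triggers a job when no partition exists and no build is in
// flight; otherwise the want simply attaches to the work already happening.
fn should_schedule(status: PartitionStatus) -> bool {
    matches!(status, PartitionStatus::Missing)
}
```

Under this check, two wants for the same partition converge on one job run instead of two.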
## Execution Flow

1. **Want Registration**: User/trigger creates wants with deterministic IDs
1. **Want Registration**: User/trigger creates wants (UUIDs assigned at creation)
2. **Immediate Dispatch**: Graph attempts execution without checking dependencies
3. **Runtime Discovery**: Jobs fail with `MissingDependenciesError(partitions)`
4. **Want Propagation**: Graph creates upstream wants from missing dependencies
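
A condensed sketch of steps 2 through 4 above; the outcome and error types are placeholders, not the real job protocol:

```rust
// Placeholder types: the actual job protocol and orchestrator APIs differ.
struct MissingDependenciesError(Vec<String>);

enum JobOutcome {
    Success,
    MissingDeps(MissingDependenciesError),
}

fn reconcile(partition: &str, run_job: impl Fn(&str) -> JobOutcome, create_want: impl Fn(&str)) {
    // 2. Immediate dispatch: try the job without checking dependencies first.
    match run_job(partition) {
        JobOutcome::Success => {}
        // 3. Runtime discovery: the failed job reports which inputs were missing.
        JobOutcome::MissingDeps(MissingDependenciesError(missing)) => {
            // 4. Want propagation: register upstream wants so the missing
            //    partitions get built; this partition is retried later.
            for upstream in missing {
                create_want(&upstream);
            }
        }
    }
}
```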

74
scripts/run_multihop_example.sh
Executable file

@ -0,0 +1,74 @@
#!/usr/bin/env bash
set -euo pipefail

# Navigate to repository root
cd "$(dirname "$0")/.."

# Configuration
PORT=3050
DB_PATH="/tmp/databuild_multihop.db"
FLAG_FILE="/tmp/databuild_multihop_alpha_complete"
PID_FILE="/tmp/databuild_multihop.pid"

echo "=== DataBuild Multi-Hop Example ==="
echo

# Clean up previous state
echo "Cleaning up previous state..."
rm -f "$DB_PATH" "$FLAG_FILE" "$PID_FILE"
pkill -f "databuild.*serve.*port $PORT" || true
sleep 1

# Build the binary
echo "Building databuild CLI..."
bazel build //databuild:databuild

# Start the server in background
echo "Starting databuild server on port $PORT..."
./bazel-bin/databuild/databuild serve \
    --port $PORT \
    --database "$DB_PATH" \
    --config examples/multihop/config.json &

SERVER_PID=$!
echo $SERVER_PID > "$PID_FILE"
echo "Server started with PID $SERVER_PID"

# Wait for server to be ready
echo "Waiting for server to start..."
sleep 2

# Test server health
if curl -s http://localhost:$PORT/health > /dev/null 2>&1; then
    echo "Server is ready!"
else
    echo "WARNING: Server health check failed, but continuing..."
fi

echo
echo "=== Server is running ==="
echo
echo "You can now interact with the server:"
echo
echo " # Create a want for data/beta (triggers dependency chain)"
echo " ./bazel-bin/databuild/databuild --server http://localhost:$PORT want data/beta"
echo
echo " # Monitor wants"
echo " ./bazel-bin/databuild/databuild --server http://localhost:$PORT wants list"
echo
echo " # Monitor job runs"
echo " ./bazel-bin/databuild/databuild --server http://localhost:$PORT job-runs list"
echo
echo " # Monitor partitions"
echo " ./bazel-bin/databuild/databuild --server http://localhost:$PORT partitions list"
echo
echo "To stop the server:"
echo " kill $SERVER_PID"
echo " # or: pkill -f 'databuild.*serve.*port $PORT'"
echo
echo "Server logs will appear below. Press Ctrl+C to stop."
echo "=========================================="
echo

# Wait for the server process
wait $SERVER_PID