Compare commits

..

11 commits

19 changed files with 5073 additions and 284 deletions

View file

@ -1,24 +1,18 @@
# Python Deps
load("@rules_python//python:pip.bzl", "compile_pip_requirements")
filegroup(
name = "jq",
srcs = ["//databuild/runtime:jq"],
visibility = ["//visibility:public"],
)
# Export the E2E test runner script
exports_files(["run_e2e_tests.sh"])
# End-to-End Test Runner
sh_binary(
name = "run_e2e_tests",
srcs = ["run_e2e_tests.sh"],
data = [
"//tests/end_to_end:test_utils",
],
visibility = ["//visibility:public"],
)
## Export the E2E test runner script
#exports_files(["run_e2e_tests.sh"])
#
## End-to-End Test Runner
#sh_binary(
# name = "run_e2e_tests",
# srcs = ["run_e2e_tests.sh"],
# data = [
# "//tests/end_to_end:test_utils",
# ],
# visibility = ["//visibility:public"],
#)
# `bazel run //:requirements.update` will regenerate the requirements_txt file
compile_pip_requirements(

View file

@ -65,6 +65,46 @@ crate.spec(
package = "regex",
version = "1.10",
)
crate.spec(
features = ["full"],
package = "tokio",
version = "1.0",
)
crate.spec(
package = "axum",
version = "0.7",
)
crate.spec(
package = "tower",
version = "0.4",
)
crate.spec(
features = ["trace", "cors"],
package = "tower-http",
version = "0.5",
)
crate.spec(
package = "tracing",
version = "0.1",
)
crate.spec(
package = "tracing-subscriber",
version = "0.3",
)
crate.spec(
features = ["derive"],
package = "clap",
version = "4.0",
)
crate.spec(
features = ["blocking", "json"],
package = "reqwest",
version = "0.11",
)
crate.spec(
package = "toml",
version = "0.8",
)
crate.from_specs()
use_repo(crate, "crates")

File diff suppressed because one or more lines are too long

View file

@ -18,13 +18,14 @@ rust_binary(
# DataBuild library using generated prost code
rust_library(
name = "databuild",
name = "lib",
srcs = glob(["**/*.rs"]) + [
":generate_databuild_rust",
],
edition = "2021",
visibility = ["//visibility:public"],
deps = [
"@crates//:axum",
"@crates//:prost",
"@crates//:prost-types",
"@crates//:regex",
@ -32,17 +33,40 @@ rust_library(
"@crates//:schemars",
"@crates//:serde",
"@crates//:serde_json",
"@crates//:tokio",
"@crates//:toml",
"@crates//:tower",
"@crates//:tower-http",
"@crates//:tracing",
"@crates//:uuid",
],
)
rust_test(
name = "databuild_test",
crate = ":databuild",
crate = ":lib",
data = ["//databuild/test:test_job_helper"],
env = {"RUST_BACKTRACE": "1"},
)
# DataBuild CLI binary
rust_binary(
name = "databuild",
srcs = ["cli_main.rs"],
edition = "2021",
visibility = ["//visibility:public"],
deps = [
":lib",
"@crates//:axum",
"@crates//:clap",
"@crates//:reqwest",
"@crates//:serde_json",
"@crates//:tokio",
"@crates//:tracing",
"@crates//:tracing-subscriber",
],
)
# Legacy filegroup for backwards compatibility
filegroup(
name = "proto",

View file

@ -11,15 +11,18 @@ use crate::{
use prost::Message;
use rusqlite::Connection;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
pub trait BELStorage {
pub trait BELStorage: Send + Sync {
fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError>;
fn list_events(
&self,
since_idx: u64,
limit: u64,
) -> Result<Vec<DataBuildEvent>, DatabuildError>;
fn get_event(&self, event_id: u64) -> Result<Option<DataBuildEvent>, DatabuildError>;
fn latest_event_id(&self) -> Result<u64, DatabuildError>;
}
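With get_event and latest_event_id added to the trait, a reader such as the HTTP layer can poll storage for new events without going through the orchestrator. A minimal sketch built only on the trait methods above; the cursor handling is illustrative and not part of this diff:

// Illustrative polling helper; assumes list_events(since_idx, limit) semantics as named.
fn drain_new_events(
    storage: &dyn BELStorage,
    cursor: &mut u64,
) -> Result<Vec<DataBuildEvent>, DatabuildError> {
    let latest = storage.latest_event_id()?;
    if latest <= *cursor {
        return Ok(Vec::new());
    }
    let events = storage.list_events(*cursor, latest - *cursor)?;
    *cursor = latest;
    Ok(events)
}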
#[derive(Debug, Clone)]
@ -64,15 +67,23 @@ impl BELStorage for MemoryBELStorage {
.take(limit as usize)
.collect())
}
fn get_event(&self, event_id: u64) -> Result<Option<DataBuildEvent>, DatabuildError> {
Ok(self.events.iter().find(|e| e.event_id == event_id).cloned())
}
fn latest_event_id(&self) -> Result<u64, DatabuildError> {
Ok(self.events.len().saturating_sub(1) as u64)
}
}
#[derive(Debug)]
struct SqliteBELStorage {
connection: Connection,
#[derive(Debug, Clone)]
pub struct SqliteBELStorage {
connection: Arc<Mutex<Connection>>,
}
impl SqliteBELStorage {
fn create(database_url: &str) -> Result<SqliteBELStorage, DatabuildError> {
pub fn create(database_url: &str) -> Result<SqliteBELStorage, DatabuildError> {
let connection = Connection::open(database_url)?;
// Create the events table
@ -85,7 +96,9 @@ impl SqliteBELStorage {
(),
)?;
Ok(SqliteBELStorage { connection })
Ok(SqliteBELStorage {
connection: Arc::new(Mutex::new(connection)),
})
}
}
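Wrapping the connection in Arc<Mutex<...>> is what lets SqliteBELStorage satisfy the new Send + Sync bound (a bare rusqlite::Connection is Send but not Sync) and be cloned across the orchestrator and HTTP server threads. A rough sketch of the sharing this enables, assuming the types above:

// Clones share one underlying SQLite handle; each trait call takes the Mutex.
fn share_across_threads(storage: SqliteBELStorage) -> Result<(), DatabuildError> {
    let reader = storage.clone();
    let handle = std::thread::spawn(move || reader.latest_event_id());
    let _latest_from_reader_thread = handle.join().expect("reader thread panicked")?;
    let _latest_here = storage.latest_event_id()?;
    Ok(())
}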
@ -105,12 +118,17 @@ impl BELStorage for SqliteBELStorage {
let mut buf = Vec::new();
prost::Message::encode(&dbe, &mut buf)?;
self.connection.execute(
let connection = self
.connection
.lock()
.map_err(|e| format!("Failed to acquire lock: {}", e))?;
connection.execute(
"INSERT INTO events (timestamp, event_data) VALUES (?1, ?2)",
(&timestamp, &buf),
)?;
let event_id = self.connection.last_insert_rowid() as u64;
let event_id = connection.last_insert_rowid() as u64;
Ok(event_id)
}
@ -119,7 +137,12 @@ impl BELStorage for SqliteBELStorage {
since_idx: u64,
limit: u64,
) -> Result<Vec<DataBuildEvent>, DatabuildError> {
let mut stmt = self.connection.prepare(
let connection = self
.connection
.lock()
.map_err(|e| format!("Failed to acquire lock: {}", e))?;
let mut stmt = connection.prepare(
"SELECT event_id, timestamp, event_data FROM events
WHERE timestamp > ?1
ORDER BY event_id
@ -156,22 +179,93 @@ impl BELStorage for SqliteBELStorage {
Ok(events)
}
fn get_event(&self, event_id: u64) -> Result<Option<DataBuildEvent>, DatabuildError> {
let connection = self
.connection
.lock()
.map_err(|e| format!("Failed to acquire lock: {}", e))?;
let mut stmt = connection
.prepare("SELECT event_id, timestamp, event_data FROM events WHERE event_id = ?1")?;
let result = stmt.query_row([event_id], |row| {
let event_id: u64 = row.get(0)?;
let timestamp: u64 = row.get(1)?;
let event_data: Vec<u8> = row.get(2)?;
// Deserialize the event using prost
let mut dbe = DataBuildEvent::decode(event_data.as_slice()).map_err(|_e| {
rusqlite::Error::InvalidColumnType(
0,
"event_data".to_string(),
rusqlite::types::Type::Blob,
)
})?;
// Update the event_id from the database
dbe.event_id = event_id;
dbe.timestamp = timestamp;
Ok(dbe)
});
match result {
Ok(event) => Ok(Some(event)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
}
fn latest_event_id(&self) -> Result<u64, DatabuildError> {
let connection = self
.connection
.lock()
.map_err(|e| format!("Failed to acquire lock: {}", e))?;
let result: Result<u64, rusqlite::Error> =
connection.query_row("SELECT MAX(event_id) FROM events", [], |row| row.get(0));
match result {
Ok(id) => Ok(id),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0),
Err(e) => Err(e.into()),
}
}
}
#[derive(Debug, Default)]
pub struct BuildEventLog<S: BELStorage + Debug> {
pub storage: S,
pub state: BuildState,
/// Optional event broadcaster for HTTP server mirroring
#[cfg_attr(not(feature = "server"), allow(dead_code))]
pub event_broadcaster: Option<tokio::sync::broadcast::Sender<Event>>,
}
impl<S: BELStorage + Debug> BuildEventLog<S> {
pub fn new(storage: S, state: BuildState) -> BuildEventLog<S> {
BuildEventLog { storage, state }
BuildEventLog {
storage,
state,
event_broadcaster: None,
}
}
pub fn with_broadcaster(mut self, broadcaster: tokio::sync::broadcast::Sender<Event>) -> Self {
self.event_broadcaster = Some(broadcaster);
self
}
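The builder-style with_broadcaster is how the orchestrator's log gets wired to the HTTP server's mirror task (see new_with_channels and cmd_serve later in this diff). A condensed, illustrative fragment; the channel capacity is an arbitrary example value:

// Every appended event is fanned out to broadcast subscribers.
let (event_tx, mut event_rx) = tokio::sync::broadcast::channel::<Event>(1000);
let mut log = BuildEventLog::new(MemoryBELStorage::default(), BuildState::default())
    .with_broadcaster(event_tx);
log.append_event(&Event::WantCreateV1(WantCreateEventV1::default()))
    .expect("append failed");

// A subscriber (e.g. the state-mirror task) folds events into its own BuildState.
let mut mirror = BuildState::default();
while let Ok(event) = event_rx.try_recv() {
    mirror.handle_event(&event);
}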
pub fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError> {
let events = self.state.handle_event(&event);
let idx = self.storage.append_event(event)?;
// Broadcast event to HTTP server (if configured)
if let Some(ref tx) = self.event_broadcaster {
let _ = tx.send(event.clone());
}
// Recursion here might be dangerous, but in theory the event propagation always terminates
for event in events {
self.append_event(&event)?;
@ -179,6 +273,13 @@ impl<S: BELStorage + Debug> BuildEventLog<S> {
Ok(idx)
}
pub fn append_event_no_recurse(&mut self, event: &Event) -> Result<u64, DatabuildError> {
self.state.handle_event(&event);
let idx = self.storage.append_event(event)?;
// Unlike append_event, cascading events returned by handle_event are intentionally not re-appended here
Ok(idx)
}
// API methods
pub fn api_handle_list_wants(&self, req: ListWantsRequest) -> ListWantsResponse {
self.state.list_wants(&req)
@ -244,6 +345,7 @@ impl Clone for BuildEventLog<MemoryBELStorage> {
Self {
storage: self.storage.clone(),
state: self.state.clone(),
event_broadcaster: self.event_broadcaster.clone(),
}
}
}
@ -261,7 +363,11 @@ mod tests {
let storage =
SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
let state = BuildState::default();
let mut log = BuildEventLog { storage, state };
let mut log = BuildEventLog {
storage,
state,
event_broadcaster: None,
};
let want_id = "sqlite_test_1234".to_string();

View file

@ -1,5 +1,6 @@
use crate::data_build_event::Event;
use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
use crate::event_source::Source as EventSourceVariant;
use crate::job_run_state::{JobRun, JobRunWithState, QueuedState as JobQueuedState};
use crate::partition_state::{
BuildingPartitionRef, FailedPartitionRef, LivePartitionRef, MissingPartitionRef, MissingState,
@ -19,6 +20,7 @@ use crate::{
};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use tracing;
/**
Design Notes
@ -45,7 +47,7 @@ critical that these state machines, their states, and their transitions are type
/// Tracks all application state, defines valid state transitions, and manages cross-state machine
/// state transitions (e.g. job run success resulting in partition going from Building to Live)
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct BuildState {
wants: BTreeMap<String, Want>, // Type-safe want storage
taints: BTreeMap<String, TaintDetail>,
@ -53,18 +55,21 @@ pub struct BuildState {
job_runs: BTreeMap<String, JobRun>, // Type-safe job run storage
}
impl Default for BuildState {
fn default() -> Self {
Self {
wants: Default::default(),
taints: Default::default(),
partitions: Default::default(),
job_runs: Default::default(),
}
}
}
impl BuildState {
/// Reconstruct BuildState from a sequence of events (for read path in web server)
/// This allows the web server to rebuild state from BEL storage without holding a lock
pub fn from_events(events: &[crate::DataBuildEvent]) -> Self {
let mut state = BuildState::default();
for event in events {
if let Some(ref inner_event) = event.event {
// handle_event returns Vec<Event> for cascading events, but we ignore them
// since we're replaying from a complete event log
state.handle_event(inner_event);
}
}
state
}
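from_events is the read-path counterpart of the write-side log: a server can page the raw events out of storage and fold them into a fresh BuildState. A minimal sketch, assuming a storage handle implementing BELStorage:

// Rebuild state for read-only queries without holding the orchestrator's lock.
let events = storage.list_events(0, u64::MAX)?;
let state = BuildState::from_events(&events);
let _wants = state.list_wants(&ListWantsRequest::default());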
pub fn count_job_runs(&self) -> usize {
self.job_runs.len()
}
@ -86,6 +91,123 @@ impl BuildState {
}
}
/// Handle creation of a derivative want (created due to job dep miss)
///
/// When a job reports missing dependencies, it returns WantCreateV1 events for those missing partitions.
/// Those events get appended to the BEL and eventually processed by handle_want_create().
///
/// This function is called when we detect a derivative want (one whose source is job_triggered); it transitions
/// the impacted wants to UpstreamBuilding state, tracking the derivative want ID as an upstream dependency.
///
/// KEY INSIGHT: We must use the actual want_id from the WantCreateV1 event, not synthetic UUIDs generated
/// during event processing. This ensures replay works correctly - the same want IDs are used both during
/// original execution and during replay from the BEL.
fn handle_derivative_want_creation(
&mut self,
derivative_want_id: &str,
derivative_want_partitions: &[PartitionRef],
source_job_run_id: &str,
) {
// Look up the job run that triggered this derivative want
// This job run must be in DepMiss state because it reported missing dependencies
let job_run = self.job_runs.get(source_job_run_id).expect(&format!(
"BUG: Job run {} must exist when derivative want created",
source_job_run_id
));
// Extract the missing deps from the DepMiss job run
let missing_deps = match job_run {
JobRun::DepMiss(dep_miss) => dep_miss.get_missing_deps(),
_ => {
panic!(
"BUG: Job run {} must be in DepMiss state when derivative want created, found {:?}",
source_job_run_id, job_run
);
}
};
// Find which MissingDeps entry corresponds to this derivative want
// The derivative want was created for a specific set of missing partitions,
// and we need to find which downstream partitions are impacted by those missing partitions
for md in missing_deps {
// Check if this derivative want's partitions match the missing partitions in this entry
// We need exact match because one dep miss event can create multiple derivative wants
let partitions_match = md.missing.iter().all(|missing_ref| {
derivative_want_partitions
.iter()
.any(|p| p.r#ref == missing_ref.r#ref)
}) && derivative_want_partitions.len() == md.missing.len();
if partitions_match {
// Now we know which partitions are impacted by this missing dependency
let impacted_partition_refs: Vec<String> =
md.impacted.iter().map(|p| p.r#ref.clone()).collect();
tracing::debug!(
derivative_want_id = %derivative_want_id,
source_job_run_id = %source_job_run_id,
missing_partitions = ?derivative_want_partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
impacted_partitions = ?impacted_partition_refs,
"Processing derivative want creation"
);
// Find all wants that include these impacted partitions
// These are the wants that need to wait for the derivative want to complete
let mut impacted_want_ids: std::collections::HashSet<String> =
std::collections::HashSet::new();
for partition_ref in &impacted_partition_refs {
if let Some(partition) = self.partitions.get(partition_ref) {
for want_id in partition.want_ids() {
impacted_want_ids.insert(want_id.clone());
}
}
}
// Transition each impacted want to UpstreamBuilding, tracking this derivative want as an upstream
for want_id in impacted_want_ids {
let want = self.wants.remove(&want_id).expect(&format!(
"BUG: Want {} must exist when processing derivative want",
want_id
));
let transitioned = match want {
Want::Building(building) => {
// First dep miss for this want: Building → UpstreamBuilding
tracing::info!(
want_id = %want_id,
derivative_want_id = %derivative_want_id,
"Want: Building → UpstreamBuilding (first missing dep detected)"
);
Want::UpstreamBuilding(
building.detect_missing_deps(vec![derivative_want_id.to_string()]),
)
}
Want::UpstreamBuilding(upstream) => {
// Additional dep miss: UpstreamBuilding → UpstreamBuilding (add another upstream)
// This can happen if multiple jobs report dep misses for different upstreams
tracing::info!(
want_id = %want_id,
derivative_want_id = %derivative_want_id,
"Want: UpstreamBuilding → UpstreamBuilding (additional upstream added)"
);
Want::UpstreamBuilding(
upstream.add_upstreams(vec![derivative_want_id.to_string()]),
)
}
_ => {
panic!(
"BUG: Want {} in invalid state {:?} when processing derivative want. Should be Building or UpstreamBuilding.",
want_id, want
);
}
};
self.wants.insert(want_id, transitioned);
}
}
}
}
/// Transition partitions from Missing to Building state
/// Used when a job run starts building partitions
fn transition_partitions_to_building(
@ -99,6 +221,11 @@ impl BuildState {
let transitioned = match partition {
// Valid: Missing -> Building
Partition::Missing(missing) => {
tracing::info!(
partition = %building_ref.0.r#ref,
job_run_id = %job_run_id,
"Partition: Missing → Building"
);
Partition::Building(missing.start_building(job_run_id.to_string()))
}
// Invalid state: partition should not already be Building, Live, Failed, or Tainted
@ -116,6 +243,11 @@ impl BuildState {
let missing = Partition::new_missing(building_ref.0.clone());
if let Partition::Missing(m) = missing {
let building = m.start_building(job_run_id.to_string());
tracing::info!(
partition = %building_ref.0.r#ref,
job_run_id = %job_run_id,
"Partition: Missing → Building (created)"
);
self.partitions
.insert(building_ref.0.r#ref.clone(), Partition::Building(building));
}
@ -140,6 +272,11 @@ impl BuildState {
// ONLY valid transition: Building -> Live
let transitioned = match partition {
Partition::Building(building) => {
tracing::info!(
partition = %pref.0.r#ref,
job_run_id = %job_run_id,
"Partition: Building → Live"
);
Partition::Live(building.complete(job_run_id.to_string(), timestamp))
}
// All other states are invalid
@ -171,6 +308,11 @@ impl BuildState {
// ONLY valid transition: Building -> Failed
let transitioned = match partition {
Partition::Building(building) => {
tracing::info!(
partition = %pref.0.r#ref,
job_run_id = %job_run_id,
"Partition: Building → Failed"
);
Partition::Failed(building.fail(job_run_id.to_string(), timestamp))
}
// All other states are invalid
@ -199,7 +341,13 @@ impl BuildState {
// Only valid transition: Building -> Missing
let transitioned = match partition {
Partition::Building(building) => Partition::Missing(building.reset_to_missing()),
Partition::Building(building) => {
tracing::info!(
partition = %building_ref.0.r#ref,
"Partition: Building → Missing (dep miss)"
);
Partition::Missing(building.reset_to_missing())
}
// All other states are invalid
_ => {
panic!(
@ -249,6 +397,11 @@ impl BuildState {
if all_partitions_live {
let successful_want =
building.complete(job_run_id.to_string(), timestamp);
tracing::info!(
want_id = %want_id,
job_run_id = %job_run_id,
"Want: Building → Successful"
);
newly_successful_wants.push(successful_want.get_id());
Want::Successful(successful_want)
} else {
@ -326,6 +479,13 @@ impl BuildState {
job_run_id: &str,
timestamp: u64,
) {
tracing::debug!(
newly_successful_wants = ?newly_successful_wants
.iter()
.map(|w| &w.0)
.collect::<Vec<_>>(),
"Checking downstream wants for unblocking"
);
// Find downstream wants that are waiting for any of the newly successful wants
// TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
let downstream_wants_to_check: Vec<String> = self
@ -345,6 +505,10 @@ impl BuildState {
}
})
.collect();
tracing::debug!(
downstream_wants_to_check = ?downstream_wants_to_check,
"Found downstream wants affected by upstream completion"
);
for want_id in downstream_wants_to_check {
let want = self
@ -354,6 +518,11 @@ impl BuildState {
let transitioned = match want {
Want::UpstreamBuilding(downstream_want) => {
tracing::debug!(
want_id = %want_id,
upstreams = ?downstream_want.state.upstream_want_ids,
"Checking if all upstreams are satisfied"
);
// Check if ALL of this downstream want's upstream dependencies are now Successful
let all_upstreams_successful = downstream_want
.state
@ -365,6 +534,11 @@ impl BuildState {
.map(|w| matches!(w, Want::Successful(_)))
.unwrap_or(false)
});
tracing::debug!(
want_id = %want_id,
all_upstreams_successful = %all_upstreams_successful,
"Upstream satisfaction check complete"
);
if all_upstreams_successful {
// Check if any of this want's partitions are still being built
@ -377,19 +551,37 @@ impl BuildState {
.map(|partition| matches!(partition, Partition::Building(_)))
.unwrap_or(false)
});
tracing::debug!(
want_id = %want_id,
any_partition_building = %any_partition_building,
"Partition building status check"
);
if any_partition_building {
// Some partitions still being built, continue in Building state
tracing::info!(
want_id = %want_id,
job_run_id = %job_run_id,
"Want: UpstreamBuilding → Building (upstreams satisfied, partitions building)"
);
Want::Building(
downstream_want
.continue_building(job_run_id.to_string(), timestamp),
)
} else {
// No partitions being built, become schedulable again
tracing::info!(
want_id = %want_id,
"Want: UpstreamBuilding → Idle (upstreams satisfied, ready to schedule)"
);
Want::Idle(downstream_want.upstreams_satisfied())
}
} else {
// Upstreams not all satisfied yet, stay in UpstreamBuilding
tracing::debug!(
want_id = %want_id,
"Want remains in UpstreamBuilding state (upstreams not yet satisfied)"
);
Want::UpstreamBuilding(downstream_want)
}
}
@ -454,109 +646,6 @@ impl BuildState {
}
}
/// Build a mapping from partition references to the want IDs that will build them
/// Used to track which upstream wants a downstream want depends on after a dep miss
fn build_partition_to_want_mapping(
&self,
want_events: &[Event],
) -> std::collections::HashMap<String, String> {
let mut partition_to_want_map: std::collections::HashMap<String, String> =
std::collections::HashMap::new();
for event_item in want_events {
if let Event::WantCreateV1(want_create) = event_item {
for pref in &want_create.partitions {
partition_to_want_map.insert(pref.r#ref.clone(), want_create.want_id.clone());
}
}
}
partition_to_want_map
}
/// Collect upstream want IDs that a servicing want now depends on based on dep misses
/// Returns a deduplicated, sorted list of upstream want IDs
fn collect_upstream_want_ids(
&self,
servicing_want: &crate::WantAttributedPartitions,
missing_deps: &[crate::MissingDeps],
partition_to_want_map: &std::collections::HashMap<String, String>,
) -> Vec<String> {
let mut new_upstream_want_ids = Vec::new();
for missing_dep in missing_deps {
// Only process if this want contains an impacted partition
let is_impacted = missing_dep.impacted.iter().any(|imp| {
servicing_want
.partitions
.iter()
.any(|p| p.r#ref == imp.r#ref)
});
if is_impacted {
// For each missing partition, find the want ID that will build it
for missing_partition in &missing_dep.missing {
if let Some(want_id) = partition_to_want_map.get(&missing_partition.r#ref) {
new_upstream_want_ids.push(want_id.clone());
}
}
}
}
// Dedupe upstream want IDs (one job might report same dep multiple times)
new_upstream_want_ids.sort();
new_upstream_want_ids.dedup();
new_upstream_want_ids
}
/// Transition wants to UpstreamBuilding when they have missing dependencies
/// Handles Building → UpstreamBuilding and UpstreamBuilding → UpstreamBuilding (add upstreams)
fn transition_wants_to_upstream_building(
&mut self,
servicing_wants: &[crate::WantAttributedPartitions],
missing_deps: &[crate::MissingDeps],
partition_to_want_map: &std::collections::HashMap<String, String>,
) {
// For each want serviced by this job run, check if it was impacted by missing deps
for servicing_want in servicing_wants {
let want = self.wants.remove(&servicing_want.want_id).expect(&format!(
"BUG: Want {} must exist when serviced by job run",
servicing_want.want_id
));
// Collect the upstream want IDs that this want now depends on
let new_upstream_want_ids =
self.collect_upstream_want_ids(servicing_want, missing_deps, partition_to_want_map);
let transitioned = if !new_upstream_want_ids.is_empty() {
match want {
Want::Building(building) => {
// First dep miss for this want: Building → UpstreamBuilding
Want::UpstreamBuilding(building.detect_missing_deps(new_upstream_want_ids))
}
Want::UpstreamBuilding(upstream) => {
// Already in UpstreamBuilding, add more upstreams (self-transition)
// This can happen if multiple jobs report dep misses, or one job reports multiple dep misses
Want::UpstreamBuilding(upstream.add_upstreams(new_upstream_want_ids))
}
_ => {
panic!(
"BUG: Want {} in invalid state {:?} when job run had dep miss. Should be Building or UpstreamBuilding.",
servicing_want.want_id, want
);
}
}
} else {
// No new upstreams for this want (it wasn't impacted), keep current state
want
};
self.wants
.insert(servicing_want.want_id.clone(), transitioned);
}
}
/// Handles reacting to events, updating state, and erroring if it's an invalid state transition
/// Event handlers can return vecs of events that will then be appended to the BEL
pub fn handle_event(&mut self, event: &Event) -> Vec<Event> {
@ -582,6 +671,25 @@ impl BuildState {
fn handle_want_create(&mut self, event: &WantCreateEventV1) -> Vec<Event> {
// Use From impl to create want in Idle state
let want_idle: WantWithState<WantIdleState> = event.clone().into();
// Log creation with derivative vs user-created distinction
if let Some(source) = &event.source {
if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source {
tracing::info!(
want_id = %event.want_id,
partitions = ?event.partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
source_job_run_id = %job_triggered.job_run_id,
"Want created (derivative - auto-created due to missing dependency)"
);
}
} else {
tracing::info!(
want_id = %event.want_id,
partitions = ?event.partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
"Want created (user-requested)"
);
}
self.wants
.insert(event.want_id.clone(), Want::Idle(want_idle));
@ -590,6 +698,17 @@ impl BuildState {
self.add_want_to_partition(pref, &event.want_id);
}
// If this is a derivative want (triggered by a job's dep miss), transition impacted wants to UpstreamBuilding
if let Some(source) = &event.source {
if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source {
self.handle_derivative_want_creation(
&event.want_id,
&event.partitions,
&job_triggered.job_run_id,
);
}
}
vec![]
}
@ -657,6 +776,11 @@ impl BuildState {
let transitioned = match want {
Want::Idle(idle) => {
// First job starting for this want
tracing::info!(
want_id = %wap.want_id,
job_run_id = %event.job_run_id,
"Want: Idle → Building (job scheduled)"
);
Want::Building(idle.start_building(current_timestamp()))
}
Want::Building(building) => {
@ -698,7 +822,13 @@ impl BuildState {
let running = match job_run {
// First heartbeat: Queued -> Running
JobRun::Queued(queued) => queued.start_running(current_timestamp()),
JobRun::Queued(queued) => {
tracing::info!(
job_run_id = %event.job_run_id,
"JobRun: Queued → Running"
);
queued.start_running(current_timestamp())
}
// Subsequent heartbeat: update timestamp
JobRun::Running(running) => running.heartbeat(current_timestamp()),
_ => {
@ -721,7 +851,13 @@ impl BuildState {
));
let succeeded = match job_run {
JobRun::Running(running) => running.succeed(current_timestamp()),
JobRun::Running(running) => {
tracing::info!(
job_run_id = %event.job_run_id,
"JobRun: Running → Succeeded"
);
running.succeed(current_timestamp())
}
_ => {
panic!(
"BUG: Success event received for job run {} in invalid state {:?}. Job must be Running to succeed.",
@ -766,7 +902,14 @@ impl BuildState {
));
let failed = match job_run {
JobRun::Running(running) => running.fail(current_timestamp(), event.reason.clone()),
JobRun::Running(running) => {
tracing::info!(
job_run_id = %event.job_run_id,
reason = %event.reason,
"JobRun: Running → Failed"
);
running.fail(current_timestamp(), event.reason.clone())
}
_ => {
panic!(
"BUG: Failure event received for job run {} in invalid state {:?}. Job must be Running to fail.",
@ -838,11 +981,20 @@ impl BuildState {
));
let dep_miss = match job_run {
JobRun::Running(running) => running.dep_miss(
current_timestamp(),
event.missing_deps.clone(),
event.read_deps.clone(),
),
JobRun::Running(running) => {
tracing::info!(
job_run_id = %event.job_run_id,
missing_deps = ?event.missing_deps.iter()
.flat_map(|md| md.missing.iter().map(|p| &p.r#ref))
.collect::<Vec<_>>(),
"JobRun: Running → DepMiss (missing dependencies detected)"
);
running.dep_miss(
current_timestamp(),
event.missing_deps.clone(),
event.read_deps.clone(),
)
}
_ => {
panic!(
"BUG: DepMiss event received for job run {} in invalid state {:?}. Job must be Running to hit dep miss.",
@ -864,32 +1016,26 @@ impl BuildState {
let building_refs_to_reset = dep_miss.get_building_partitions_to_reset();
self.reset_partitions_to_missing(&building_refs_to_reset);
// Create wants from dep misses
// Generate WantCreateV1 events for the missing dependencies
// These events will be returned and appended to the BEL by BuildEventLog.append_event()
let want_events = missing_deps_to_want_events(
dep_miss.get_missing_deps().to_vec(),
&event.job_run_id,
want_timestamps,
);
// Building → UpstreamBuilding OR UpstreamBuilding → UpstreamBuilding (add upstreams)
// Store the job run in DepMiss state so we can access the missing_deps later
// When the derivative WantCreateV1 events get processed by handle_want_create(),
// they will look up this job run and use handle_derivative_want_creation() to
// transition impacted wants to UpstreamBuilding with the correct want IDs.
//
// When a job reports missing dependencies, we need to:
// 1. Create new wants for the missing partitions (done above via want_events)
// 2. Transition the current want to UpstreamBuilding, tracking which upstream wants it's waiting for
// Build a map: partition_ref -> want_id that will build it
let partition_to_want_map = self.build_partition_to_want_mapping(&want_events);
// Transition servicing wants to UpstreamBuilding when they have missing dependencies
self.transition_wants_to_upstream_building(
&dep_miss.info.servicing_wants,
dep_miss.get_missing_deps(),
&partition_to_want_map,
);
// KEY: We do NOT transition wants here because the want_events have randomly generated UUIDs
// that won't match during replay. Instead, we transition wants when processing the actual
// WantCreateV1 events that get written to and read from the BEL.
self.job_runs
.insert(event.job_run_id.clone(), JobRun::DepMiss(dep_miss));
// Return derivative want events to be appended to the BEL
want_events
}
@ -1169,7 +1315,8 @@ mod tests {
mod sqlite_build_state {
mod want {
use crate::build_state::BuildState;
use crate::{WantCancelEventV1, WantCreateEventV1, WantDetail};
use crate::data_build_event::Event;
use crate::{MissingDeps, WantCancelEventV1, WantCreateEventV1, WantDetail};
#[test]
fn test_should_create_want() {
@ -1206,6 +1353,143 @@ mod tests {
Some(crate::WantStatusCode::WantCanceled.into())
);
}
#[test]
fn test_multihop_dependency_replay() {
use crate::data_build_event::Event;
use crate::{
JobRunBufferEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1,
JobRunSuccessEventV1, MissingDeps, PartitionRef, WantAttributedPartitions,
WantCreateEventV1,
};
let mut state = BuildState::default();
let mut events = vec![];
// 1. Create want for data/beta
let beta_want_id = "beta-want".to_string();
let mut create_beta = WantCreateEventV1::default();
create_beta.want_id = beta_want_id.clone();
create_beta.partitions = vec![PartitionRef {
r#ref: "data/beta".to_string(),
}];
events.push(Event::WantCreateV1(create_beta));
// 2. Queue beta job (first attempt)
let beta_job_1_id = "beta-job-1".to_string();
let mut buffer_beta_1 = JobRunBufferEventV1::default();
buffer_beta_1.job_run_id = beta_job_1_id.clone();
buffer_beta_1.job_label = "//job_beta".to_string();
buffer_beta_1.want_attributed_partitions = vec![WantAttributedPartitions {
want_id: beta_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
}];
buffer_beta_1.building_partitions = vec![PartitionRef {
r#ref: "data/beta".to_string(),
}];
events.push(Event::JobRunBufferV1(buffer_beta_1));
// 3. Beta job starts running
let mut heartbeat_beta_1 = JobRunHeartbeatEventV1::default();
heartbeat_beta_1.job_run_id = beta_job_1_id.clone();
events.push(Event::JobRunHeartbeatV1(heartbeat_beta_1));
// 4. Beta job reports missing dependency on data/alpha
let mut dep_miss_beta_1 = JobRunMissingDepsEventV1::default();
dep_miss_beta_1.job_run_id = beta_job_1_id.clone();
dep_miss_beta_1.missing_deps = vec![MissingDeps {
impacted: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
missing: vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}],
}];
events.push(Event::JobRunMissingDepsV1(dep_miss_beta_1));
// 5. Create derivative want for data/alpha
let alpha_want_id = "alpha-want".to_string();
let mut create_alpha = WantCreateEventV1::default();
create_alpha.want_id = alpha_want_id.clone();
create_alpha.partitions = vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}];
events.push(Event::WantCreateV1(create_alpha));
// 6. Queue alpha job
let alpha_job_id = "alpha-job".to_string();
let mut buffer_alpha = JobRunBufferEventV1::default();
buffer_alpha.job_run_id = alpha_job_id.clone();
buffer_alpha.job_label = "//job_alpha".to_string();
buffer_alpha.want_attributed_partitions = vec![WantAttributedPartitions {
want_id: alpha_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}],
}];
buffer_alpha.building_partitions = vec![PartitionRef {
r#ref: "data/alpha".to_string(),
}];
events.push(Event::JobRunBufferV1(buffer_alpha));
// 7. Alpha job starts running
let mut heartbeat_alpha = JobRunHeartbeatEventV1::default();
heartbeat_alpha.job_run_id = alpha_job_id.clone();
events.push(Event::JobRunHeartbeatV1(heartbeat_alpha));
// 8. Alpha job succeeds
let mut success_alpha = JobRunSuccessEventV1::default();
success_alpha.job_run_id = alpha_job_id.clone();
events.push(Event::JobRunSuccessV1(success_alpha));
// 9. Queue beta job again (second attempt) - THIS IS THE CRITICAL MOMENT
let beta_job_2_id = "beta-job-2".to_string();
let mut buffer_beta_2 = JobRunBufferEventV1::default();
buffer_beta_2.job_run_id = beta_job_2_id.clone();
buffer_beta_2.job_label = "//job_beta".to_string();
buffer_beta_2.want_attributed_partitions = vec![WantAttributedPartitions {
want_id: beta_want_id.clone(),
partitions: vec![PartitionRef {
r#ref: "data/beta".to_string(),
}],
}];
buffer_beta_2.building_partitions = vec![PartitionRef {
r#ref: "data/beta".to_string(),
}];
events.push(Event::JobRunBufferV1(buffer_beta_2));
// 10. Beta job starts running
let mut heartbeat_beta_2 = JobRunHeartbeatEventV1::default();
heartbeat_beta_2.job_run_id = beta_job_2_id.clone();
events.push(Event::JobRunHeartbeatV1(heartbeat_beta_2));
// 11. Beta job succeeds
let mut success_beta_2 = JobRunSuccessEventV1::default();
success_beta_2.job_run_id = beta_job_2_id.clone();
events.push(Event::JobRunSuccessV1(success_beta_2));
// Process all events - this simulates replay
for event in &events {
state.handle_event(event);
}
// Verify final state
let beta_want = state.get_want(&beta_want_id).unwrap();
assert_eq!(
beta_want.status,
Some(crate::WantStatusCode::WantSuccessful.into()),
"Beta want should be successful after multi-hop dependency resolution"
);
let alpha_want = state.get_want(&alpha_want_id).unwrap();
assert_eq!(
alpha_want.status,
Some(crate::WantStatusCode::WantSuccessful.into()),
"Alpha want should be successful"
);
}
}
}
}

databuild/cli_main.rs (new file, 402 lines)
View file

@ -0,0 +1,402 @@
use clap::{Parser, Subcommand};
use reqwest::blocking::Client;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::sync::{broadcast, mpsc};
use lib::build_event_log::SqliteBELStorage;
use lib::build_state::BuildState;
use lib::config::DatabuildConfig;
use lib::http_server::{create_router, AppState};
use lib::orchestrator::{Orchestrator, OrchestratorConfig};
#[derive(Parser)]
#[command(name = "databuild")]
#[command(about = "DataBuild CLI - Build system for data pipelines", long_about = None)]
struct Cli {
/// Server URL (default: http://localhost:3000)
#[arg(long, default_value = "http://localhost:3000", global = true)]
server: String,
#[command(subcommand)]
command: Commands,
}
#[derive(Subcommand)]
enum Commands {
/// Start the DataBuild HTTP server
Serve {
/// Port to listen on
#[arg(long, default_value = "3000")]
port: u16,
/// Database URL (default: :memory: for in-memory SQLite)
#[arg(long, default_value = ":memory:")]
database: String,
/// Path to configuration file (JSON or TOML)
#[arg(long)]
config: Option<String>,
},
/// Create a new want (trigger partition builds)
Want {
/// Partition references to build (e.g., "data/users", "metrics/daily")
partitions: Vec<String>,
},
/// List and manage wants
Wants {
#[command(subcommand)]
command: WantsCommands,
},
/// List and inspect partitions
Partitions {
#[command(subcommand)]
command: Option<PartitionsCommands>,
},
/// List and inspect job runs
JobRuns {
#[command(subcommand)]
command: Option<JobRunsCommands>,
},
}
#[derive(Subcommand)]
enum WantsCommands {
/// List all wants
List,
}
#[derive(Subcommand)]
enum PartitionsCommands {
/// List all partitions
List,
}
#[derive(Subcommand)]
enum JobRunsCommands {
/// List all job runs
List,
}
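Taken together, the clap definitions above correspond to invocations along the lines of: databuild serve --port 3000 --database databuild.db --config jobs.toml; databuild want data/users metrics/daily; databuild wants list; and databuild --server http://localhost:4000 partitions list. The flag values shown here are illustrative rather than the defaults baked into the code.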
fn main() {
let cli = Cli::parse();
match cli.command {
Commands::Serve { port, database, config } => {
cmd_serve(port, &database, config.as_deref());
}
Commands::Want { partitions } => {
cmd_want(&cli.server, partitions);
}
Commands::Wants { command } => match command {
WantsCommands::List => {
cmd_wants_list(&cli.server);
}
},
Commands::Partitions { command } => match command {
Some(PartitionsCommands::List) | None => {
cmd_partitions_list(&cli.server);
}
},
Commands::JobRuns { command } => match command {
Some(JobRunsCommands::List) | None => {
cmd_job_runs_list(&cli.server);
}
},
}
}
// ============================================================================
// Command Implementations
// ============================================================================
#[tokio::main]
async fn cmd_serve(port: u16, database: &str, config_path: Option<&str>) {
// Initialize logging
tracing_subscriber::fmt::init();
// Load configuration if provided
let jobs = if let Some(path) = config_path {
match DatabuildConfig::from_file(path) {
Ok(config) => {
println!("Loaded configuration from: {}", path);
println!(" Jobs: {}", config.jobs.len());
config.into_job_configurations()
}
Err(e) => {
eprintln!("Failed to load configuration from {}: {}", path, e);
std::process::exit(1);
}
}
} else {
Vec::new()
};
// Create SQLite BEL storage (shared between orchestrator and HTTP server)
let bel_storage = Arc::new(
SqliteBELStorage::create(database).expect("Failed to create BEL storage"),
);
// Create command channel for orchestrator communication
let (command_tx, command_rx) = mpsc::channel(100);
// Create event broadcast channel (orchestrator -> HTTP server)
let (event_tx, _event_rx) = broadcast::channel(1000);
// Create shutdown broadcast channel
let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);
// Create shared mirrored build state for HTTP server
let mirrored_state = Arc::new(RwLock::new(BuildState::default()));
// Spawn state-mirror task to keep HTTP server's build state in sync
let mirror_clone = mirrored_state.clone();
let mut mirror_rx = event_tx.subscribe();
tokio::spawn(async move {
while let Ok(event) = mirror_rx.recv().await {
match mirror_clone.write() {
Ok(mut state) => {
state.handle_event(&event);
}
Err(e) => {
eprintln!("State mirror task: RwLock poisoned, cannot update state: {}", e);
break;
}
}
}
});
// Spawn orchestrator in background thread
// Note: Orchestrator needs its own BEL storage instance for writes
let orch_bel_storage = SqliteBELStorage::create(database).expect("Failed to create BEL storage");
let orch_shutdown_rx = shutdown_tx.subscribe();
let orch_handle = std::thread::spawn(move || {
// Create orchestrator with both channels and jobs from config
let config = OrchestratorConfig { jobs };
let mut orchestrator = Orchestrator::new_with_channels(
orch_bel_storage,
config,
command_rx,
event_tx,
);
let mut shutdown_rx = orch_shutdown_rx;
// Run orchestrator loop
loop {
// Check for shutdown signal
if shutdown_rx.try_recv().is_ok() {
println!("Orchestrator received shutdown signal");
break;
}
if let Err(e) = orchestrator.step() {
eprintln!("Orchestrator error: {}", e);
}
// Small sleep to avoid busy-waiting
std::thread::sleep(std::time::Duration::from_millis(10));
}
});
// Create app state with mirrored state, shared storage, command sender, and shutdown channel
let state = AppState::new(mirrored_state, bel_storage, command_tx, shutdown_tx.clone());
// Spawn idle timeout checker task
let idle_state = state.clone();
let idle_shutdown_tx = shutdown_tx.clone();
tokio::spawn(async move {
let idle_timeout = Duration::from_secs(3 * 60 * 60); // 3 hours
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
let last_request = idle_state.last_request_time.load(std::sync::atomic::Ordering::Relaxed);
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
if now - last_request > idle_timeout.as_millis() as u64 {
eprintln!(
"Server idle for {} hours, shutting down",
idle_timeout.as_secs() / 3600
);
let _ = idle_shutdown_tx.send(());
break;
}
}
});
// Create router
let app = create_router(state);
// Bind to specified port
let addr = format!("127.0.0.1:{}", port);
let listener = tokio::net::TcpListener::bind(&addr)
.await
.unwrap_or_else(|_| panic!("Failed to bind to {}", addr));
println!("DataBuild server listening on http://{}", addr);
println!(" GET /health");
println!(" GET /api/wants");
println!(" POST /api/wants");
println!(" GET /api/wants/:id");
println!(" GET /api/partitions");
println!(" GET /api/job_runs");
// Subscribe to shutdown signal for graceful shutdown
let mut server_shutdown_rx = shutdown_tx.subscribe();
// Run the server with graceful shutdown
axum::serve(listener, app)
.with_graceful_shutdown(async move {
let _ = server_shutdown_rx.recv().await;
println!("HTTP server received shutdown signal");
})
.await
.expect("Server error");
// Wait for orchestrator to finish
let _ = orch_handle.join();
println!("Shutdown complete");
}
fn cmd_want(server_url: &str, partitions: Vec<String>) {
let client = Client::new();
// Convert partition strings to PartitionRef objects
let partition_refs: Vec<serde_json::Value> = partitions
.iter()
.map(|p| serde_json::json!({"ref": p}))
.collect();
// Get current timestamp (milliseconds since epoch)
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let request = serde_json::json!({
"partitions": partition_refs,
"data_timestamp": now,
"ttl_seconds": 3600, // 1 hour default
"sla_seconds": 300 // 5 minutes default
});
let url = format!("{}/api/wants", server_url);
match client.post(&url)
.json(&request)
.send()
{
Ok(response) => {
if response.status().is_success() {
println!("Want created successfully");
if let Ok(body) = response.text() {
println!("{}", body);
}
} else {
eprintln!("Failed to create want: {}", response.status());
if let Ok(body) = response.text() {
eprintln!("{}", body);
}
std::process::exit(1);
}
}
Err(e) => {
eprintln!("Failed to connect to server: {}", e);
eprintln!("Is the server running? Try: databuild serve");
std::process::exit(1);
}
}
}
fn cmd_wants_list(server_url: &str) {
let client = Client::new();
let url = format!("{}/api/wants", server_url);
match client.get(&url).send() {
Ok(response) => {
if response.status().is_success() {
match response.json::<serde_json::Value>() {
Ok(json) => {
println!("{}", serde_json::to_string_pretty(&json).unwrap());
}
Err(e) => {
eprintln!("Failed to parse response: {}", e);
std::process::exit(1);
}
}
} else {
eprintln!("Request failed: {}", response.status());
std::process::exit(1);
}
}
Err(e) => {
eprintln!("Failed to connect to server: {}", e);
eprintln!("Is the server running? Try: databuild serve");
std::process::exit(1);
}
}
}
fn cmd_partitions_list(server_url: &str) {
let client = Client::new();
let url = format!("{}/api/partitions", server_url);
match client.get(&url).send() {
Ok(response) => {
if response.status().is_success() {
match response.json::<serde_json::Value>() {
Ok(json) => {
println!("{}", serde_json::to_string_pretty(&json).unwrap());
}
Err(e) => {
eprintln!("Failed to parse response: {}", e);
std::process::exit(1);
}
}
} else {
eprintln!("Request failed: {}", response.status());
std::process::exit(1);
}
}
Err(e) => {
eprintln!("Failed to connect to server: {}", e);
eprintln!("Is the server running? Try: databuild serve");
std::process::exit(1);
}
}
}
fn cmd_job_runs_list(server_url: &str) {
let client = Client::new();
let url = format!("{}/api/job_runs", server_url);
match client.get(&url).send() {
Ok(response) => {
if response.status().is_success() {
match response.json::<serde_json::Value>() {
Ok(json) => {
println!("{}", serde_json::to_string_pretty(&json).unwrap());
}
Err(e) => {
eprintln!("Failed to parse response: {}", e);
std::process::exit(1);
}
}
} else {
eprintln!("Request failed: {}", response.status());
std::process::exit(1);
}
}
Err(e) => {
eprintln!("Failed to connect to server: {}", e);
eprintln!("Is the server running? Try: databuild serve");
std::process::exit(1);
}
}
}

databuild/commands.rs (new file, 19 lines)
View file

@ -0,0 +1,19 @@
use crate::util::DatabuildError;
use crate::{CancelWantRequest, CancelWantResponse, CreateWantRequest, CreateWantResponse};
use tokio::sync::oneshot;
/// Commands that can be sent to the orchestrator via the command channel.
/// Only write operations need commands; reads go directly to BEL storage.
pub enum Command {
/// Create a new want
CreateWant {
request: CreateWantRequest,
reply: oneshot::Sender<Result<CreateWantResponse, DatabuildError>>,
},
/// Cancel an existing want
CancelWant {
request: CancelWantRequest,
reply: oneshot::Sender<Result<CancelWantResponse, DatabuildError>>,
},
}
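On the orchestrator side of the channel, the expectation is that commands are drained inside the step loop and answered over the oneshot sender. The handler names below are placeholders (that code is not part of this hunk); this sketch only shows the request/reply shape:

// Hypothetical drain loop; handle_create_want / handle_cancel_want are
// placeholder names, not methods shown in this diff.
fn drain_commands(rx: &mut tokio::sync::mpsc::Receiver<Command>) {
    while let Ok(command) = rx.try_recv() {
        match command {
            Command::CreateWant { request, reply } => {
                // A dropped receiver (e.g. the HTTP handler timed out) is not an error.
                let _ = reply.send(handle_create_want(request));
            }
            Command::CancelWant { request, reply } => {
                let _ = reply.send(handle_cancel_want(request));
            }
        }
    }
}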

databuild/config.rs (new file, 92 lines)
View file

@ -0,0 +1,92 @@
use crate::JobConfig;
use crate::job::JobConfiguration;
use crate::util::DatabuildError;
use std::fs;
use std::path::Path;
/// Configuration file format for DataBuild application
#[derive(Debug, serde::Deserialize)]
pub struct DatabuildConfig {
/// List of job configurations
pub jobs: Vec<JobConfig>,
}
impl DatabuildConfig {
/// Load configuration from a file, auto-detecting format from extension
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self, DatabuildError> {
let path = path.as_ref();
let contents = fs::read_to_string(path)
.map_err(|e| DatabuildError::from(format!("Failed to read config file: {}", e)))?;
// Determine format from file extension
let extension = path.extension().and_then(|s| s.to_str()).unwrap_or("");
match extension {
"json" => Self::from_json(&contents),
"toml" => Self::from_toml(&contents),
_ => Err(DatabuildError::from(format!(
"Unknown config file extension: {}. Use .json or .toml",
extension
))),
}
}
/// Parse configuration from JSON string
pub fn from_json(s: &str) -> Result<Self, DatabuildError> {
serde_json::from_str(s)
.map_err(|e| DatabuildError::from(format!("Failed to parse JSON config: {}", e)))
}
/// Parse configuration from TOML string
pub fn from_toml(s: &str) -> Result<Self, DatabuildError> {
toml::from_str(s)
.map_err(|e| DatabuildError::from(format!("Failed to parse TOML config: {}", e)))
}
/// Convert to a list of JobConfiguration
pub fn into_job_configurations(self) -> Vec<JobConfiguration> {
self.jobs.into_iter().map(|jc| jc.into()).collect()
}
}
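Typical use is a one-liner at startup, as cmd_serve does above; a minimal sketch (the path is an example, and OrchestratorConfig is the orchestrator-side struct shown later in this diff):

// Load job definitions from disk and hand them to the orchestrator.
let jobs = DatabuildConfig::from_file("databuild.toml")?.into_job_configurations();
let config = OrchestratorConfig { jobs };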
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_json_config() {
let json = r#"
{
"jobs": [
{
"label": "//test:job_alpha",
"entrypoint": "/usr/bin/python3",
"environment": {"FOO": "bar"},
"partition_patterns": ["data/alpha/.*"]
}
]
}
"#;
let config = DatabuildConfig::from_json(json).unwrap();
assert_eq!(config.jobs.len(), 1);
assert_eq!(config.jobs[0].label, "//test:job_alpha");
}
#[test]
fn test_parse_toml_config() {
let toml = r#"
[[jobs]]
label = "//test:job_alpha"
entrypoint = "/usr/bin/python3"
partition_patterns = ["data/alpha/.*"]
[jobs.environment]
FOO = "bar"
"#;
let config = DatabuildConfig::from_toml(toml).unwrap();
assert_eq!(config.jobs.len(), 1);
assert_eq!(config.jobs[0].label, "//test:job_alpha");
}
}

databuild/http_server.rs (new file, 369 lines)
View file

@ -0,0 +1,369 @@
use crate::build_event_log::BELStorage;
use crate::build_state::BuildState;
use crate::commands::Command;
use crate::{
CancelWantRequest, CreateWantRequest, CreateWantResponse, GetWantRequest, GetWantResponse,
ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse,
ListWantsRequest, ListWantsResponse,
};
use axum::{
Json, Router,
extract::{Path, Query, Request, State},
http::{HeaderValue, Method, StatusCode},
middleware::{self, Next},
response::{IntoResponse, Response},
routing::{delete, get, post},
};
use std::sync::{
Arc, RwLock,
atomic::{AtomicU64, Ordering},
};
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::{broadcast, mpsc, oneshot};
use tower_http::cors::CorsLayer;
/// Shared application state for HTTP handlers
#[derive(Clone)]
pub struct AppState {
/// Mirrored build state (updated via event broadcast from orchestrator)
pub build_state: Arc<RwLock<BuildState>>,
/// Shared read-only access to BEL storage (for event log queries if needed)
pub bel_storage: Arc<dyn BELStorage>,
/// Command sender for write operations (sends to orchestrator)
pub command_tx: mpsc::Sender<Command>,
/// For idle timeout tracking (epoch millis)
pub last_request_time: Arc<AtomicU64>,
/// Broadcast channel for shutdown signal
pub shutdown_tx: broadcast::Sender<()>,
}
impl AppState {
pub fn new(
build_state: Arc<RwLock<BuildState>>,
bel_storage: Arc<dyn BELStorage>,
command_tx: mpsc::Sender<Command>,
shutdown_tx: broadcast::Sender<()>,
) -> Self {
// Initialize last_request_time to current time
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
Self {
build_state,
bel_storage,
command_tx,
last_request_time: Arc::new(AtomicU64::new(now)),
shutdown_tx,
}
}
}
/// Middleware to update last request time
async fn update_last_request_time(
State(state): State<AppState>,
req: Request,
next: Next,
) -> Response {
state.last_request_time.store(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);
next.run(req).await
}
/// Create the Axum router with all endpoints
pub fn create_router(state: AppState) -> Router {
// Configure CORS for web app development
let cors = CorsLayer::new()
.allow_origin("http://localhost:3000".parse::<HeaderValue>().unwrap())
.allow_methods([Method::GET, Method::POST, Method::DELETE, Method::OPTIONS])
.allow_headers([
axum::http::header::CONTENT_TYPE,
axum::http::header::AUTHORIZATION,
]);
Router::new()
// Health check
.route("/health", get(health))
// Want endpoints
.route("/api/wants", get(list_wants))
.route("/api/wants", post(create_want))
.route("/api/wants/:id", get(get_want))
.route("/api/wants/:id", delete(cancel_want))
// Partition endpoints
.route("/api/partitions", get(list_partitions))
// Job run endpoints
.route("/api/job_runs", get(list_job_runs))
// Add CORS middleware
.layer(cors)
// Add middleware to track request time
.layer(middleware::from_fn_with_state(
state.clone(),
update_last_request_time,
))
.with_state(state)
}
// ============================================================================
// Error Handling
// ============================================================================
/// Standard error response structure
#[derive(serde::Serialize)]
struct ErrorResponse {
error: String,
#[serde(skip_serializing_if = "Option::is_none")]
details: Option<serde_json::Value>,
}
impl ErrorResponse {
fn new(error: impl Into<String>) -> Self {
Self {
error: error.into(),
details: None,
}
}
fn with_details(error: impl Into<String>, details: serde_json::Value) -> Self {
Self {
error: error.into(),
details: Some(details),
}
}
}
// ============================================================================
// Handlers
// ============================================================================
/// Health check endpoint
async fn health() -> impl IntoResponse {
(StatusCode::OK, "OK")
}
/// List all wants
async fn list_wants(
State(state): State<AppState>,
Query(params): Query<ListWantsRequest>,
) -> impl IntoResponse {
// Read from shared build state
let build_state = match state.build_state.read() {
Ok(state) => state,
Err(e) => {
tracing::error!("Failed to acquire read lock on build state: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse::new(
"Internal server error: state lock poisoned",
)),
)
.into_response();
}
};
let response = build_state.list_wants(&params);
(StatusCode::OK, Json(response)).into_response()
}
/// Get a specific want by ID
async fn get_want(State(state): State<AppState>, Path(want_id): Path<String>) -> impl IntoResponse {
// Read from shared build state
let build_state = match state.build_state.read() {
Ok(state) => state,
Err(e) => {
tracing::error!("Failed to acquire read lock on build state: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse::new(
"Internal server error: state lock poisoned",
)),
)
.into_response();
}
};
let response = build_state.get_want(&want_id);
match response {
Some(want) => (StatusCode::OK, Json(GetWantResponse { data: Some(want) })).into_response(),
None => {
tracing::debug!("Want not found: {}", want_id);
(
StatusCode::NOT_FOUND,
Json(ErrorResponse::with_details(
"Want not found",
serde_json::json!({"want_id": want_id}),
)),
)
.into_response()
}
}
}
/// Create a new want
async fn create_want(
State(state): State<AppState>,
Json(req): Json<CreateWantRequest>,
) -> impl IntoResponse {
// Create oneshot channel for reply
let (reply_tx, reply_rx) = oneshot::channel();
// Send command to orchestrator
let command = Command::CreateWant {
request: req,
reply: reply_tx,
};
if let Err(_) = state.command_tx.send(command).await {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"error": "Failed to send command to orchestrator"
})),
)
.into_response();
}
// Wait for orchestrator reply
match reply_rx.await {
Ok(Ok(response)) => {
tracing::info!(
"Created want: {}",
response
.data
.as_ref()
.map(|w| &w.want_id)
.unwrap_or(&"unknown".to_string())
);
(StatusCode::OK, Json(response)).into_response()
}
Ok(Err(e)) => {
tracing::error!("Failed to create want: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse::new(format!("Failed to create want: {}", e))),
)
.into_response()
}
Err(_) => {
tracing::error!("Orchestrator did not respond to create want command");
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse::new("Orchestrator did not respond")),
)
.into_response()
}
}
}
/// Cancel a want
async fn cancel_want(
State(state): State<AppState>,
Path(want_id): Path<String>,
) -> impl IntoResponse {
// Create oneshot channel for reply
let (reply_tx, reply_rx) = oneshot::channel();
// Send command to orchestrator
let command = Command::CancelWant {
request: CancelWantRequest {
want_id,
source: None, // HTTP requests don't have a source
comment: None,
},
reply: reply_tx,
};
if let Err(_) = state.command_tx.send(command).await {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"error": "Failed to send command to orchestrator"
})),
)
.into_response();
}
// Wait for orchestrator reply
match reply_rx.await {
Ok(Ok(response)) => {
tracing::info!(
"Cancelled want: {}",
response
.data
.as_ref()
.map(|w| &w.want_id)
.unwrap_or(&"unknown".to_string())
);
(StatusCode::OK, Json(response)).into_response()
}
Ok(Err(e)) => {
tracing::error!("Failed to cancel want: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse::new(format!("Failed to cancel want: {}", e))),
)
.into_response()
}
Err(_) => {
tracing::error!("Orchestrator did not respond to cancel want command");
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse::new("Orchestrator did not respond")),
)
.into_response()
}
}
}
/// List all partitions
async fn list_partitions(
State(state): State<AppState>,
Query(params): Query<ListPartitionsRequest>,
) -> impl IntoResponse {
// Read from shared build state
let build_state = match state.build_state.read() {
Ok(state) => state,
Err(e) => {
tracing::error!("Failed to acquire read lock on build state: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse::new(
"Internal server error: state lock poisoned",
)),
)
.into_response();
}
};
let response = build_state.list_partitions(&params);
(StatusCode::OK, Json(response)).into_response()
}
/// List all job runs
async fn list_job_runs(
State(state): State<AppState>,
Query(params): Query<ListJobRunsRequest>,
) -> impl IntoResponse {
// Read from shared build state
let build_state = match state.build_state.read() {
Ok(state) => state,
Err(e) => {
tracing::error!("Failed to acquire read lock on build state: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse::new(
"Internal server error: state lock poisoned",
)),
)
.into_response();
}
};
let response = build_state.list_job_runs(&params);
(StatusCode::OK, Json(response)).into_response()
}

View file

@ -2,12 +2,14 @@ use crate::job_run::{JobRunHandle, SubProcessBackend};
use crate::util::DatabuildError;
use crate::{JobConfig, PartitionRef, WantDetail};
use regex::Regex;
use std::collections::HashMap;
#[derive(Debug, Clone)]
pub struct JobConfiguration {
pub label: String,
pub patterns: Vec<String>,
pub entry_point: String,
pub environment: HashMap<String, String>,
}
impl JobConfiguration {
@ -38,6 +40,7 @@ impl From<JobConfig> for JobConfiguration {
label: config.label,
patterns: config.partition_patterns,
entry_point: config.entrypoint,
environment: config.environment,
}
}
}
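For reference, a fully populated JobConfiguration now carries per-job environment variables alongside the label, patterns, and entrypoint; the values below are illustrative, echoing the config tests above:

use std::collections::HashMap;

// The orchestrator later hands `environment` to the spawned job run.
let job = JobConfiguration {
    label: "//test:job_alpha".to_string(),
    patterns: vec!["data/alpha/.*".to_string()],
    entry_point: "/usr/bin/python3".to_string(),
    environment: HashMap::from([("FOO".to_string(), "bar".to_string())]),
};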

View file

@ -1,12 +1,15 @@
mod build_event_log;
mod build_state;
pub mod build_event_log;
pub mod build_state;
pub mod commands;
pub mod config;
mod data_deps;
mod event_transforms;
pub mod http_server;
mod job;
mod job_run;
mod job_run_state;
mod mock_job_run;
mod orchestrator;
pub mod orchestrator;
mod partition_state;
mod util;
mod want_state;

View file

@ -1,4 +1,5 @@
use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage};
use crate::commands::Command;
use crate::data_build_event::Event;
use crate::job::JobConfiguration;
use crate::job_run::SubProcessBackend;
@ -6,6 +7,8 @@ use crate::util::DatabuildError;
use crate::{JobRunBufferEventV1, PartitionRef, WantDetail};
use std::collections::HashMap;
use std::fmt::Debug;
use tokio::sync::{broadcast, mpsc};
use uuid::Uuid;
/**
Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lightweight threads +
@ -17,10 +20,16 @@ JTBDs:
- Orchestrator polls queued and active job runs, keeping track of their state, and scheduling queued
jobs when possible
*/
struct Orchestrator<S: BELStorage + Debug> {
pub struct Orchestrator<S: BELStorage + Debug> {
pub bel: BuildEventLog<S>,
pub config: OrchestratorConfig,
pub job_runs: Vec<crate::job_run::JobRunHandle<SubProcessBackend>>,
/// Optional command receiver for write operations from HTTP server
pub command_rx: Option<mpsc::Receiver<Command>>,
/// Optional event broadcaster for state updates to HTTP server
pub event_tx: Option<broadcast::Sender<Event>>,
/// Environment variables for each job run (keyed by job_run_id)
pub job_environments: HashMap<Uuid, HashMap<String, String>>,
}
impl Default for Orchestrator<MemoryBELStorage> {
@ -29,6 +38,9 @@ impl Default for Orchestrator<MemoryBELStorage> {
bel: Default::default(),
config: Default::default(),
job_runs: Default::default(),
command_rx: None,
event_tx: None,
job_environments: HashMap::new(),
}
}
}
@ -39,6 +51,9 @@ impl Orchestrator<MemoryBELStorage> {
bel: self.bel.clone(),
config: self.config.clone(),
job_runs: Default::default(),
command_rx: None,
event_tx: None,
job_environments: HashMap::new(),
}
}
}
@ -61,8 +76,8 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
}
#[derive(Debug, Clone)]
struct OrchestratorConfig {
jobs: Vec<JobConfiguration>,
pub struct OrchestratorConfig {
pub jobs: Vec<JobConfiguration>,
}
impl Default for OrchestratorConfig {
@ -121,26 +136,68 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
bel: BuildEventLog::new(storage, Default::default()),
config,
job_runs: Vec::new(),
command_rx: None,
event_tx: None,
job_environments: HashMap::new(),
}
}
pub fn new_with_commands(
storage: S,
config: OrchestratorConfig,
command_rx: mpsc::Receiver<Command>,
) -> Self {
Self {
bel: BuildEventLog::new(storage, Default::default()),
config,
job_runs: Vec::new(),
command_rx: Some(command_rx),
event_tx: None,
job_environments: HashMap::new(),
}
}
pub fn new_with_channels(
storage: S,
config: OrchestratorConfig,
command_rx: mpsc::Receiver<Command>,
event_tx: broadcast::Sender<Event>,
) -> Self {
Self {
bel: BuildEventLog::new(storage, Default::default()).with_broadcaster(event_tx.clone()),
config,
job_runs: Vec::new(),
command_rx: Some(command_rx),
event_tx: Some(event_tx),
job_environments: HashMap::new(),
}
}
/// Append event to BEL (which handles state mutation, storage, broadcasting, and cascading events)
fn append_and_broadcast(&mut self, event: &Event) -> Result<u64, DatabuildError> {
self.bel.append_event(event)
}
fn schedule_queued_jobs(&mut self) -> Result<(), DatabuildError> {
use crate::JobRunHeartbeatEventV1;
use crate::data_build_event::Event;
use crate::job_run::JobRunHandle;
let mut new_jobs = Vec::new();
let mut events_to_append = Vec::new();
for job in self.job_runs.drain(..) {
let transitioned = match job {
JobRunHandle::NotStarted(not_started) => {
let job_run_id = not_started.job_run_id.clone();
let running = not_started.run(None)?;
// Look up environment for this job run
let env = self.job_environments.get(&job_run_id).cloned();
let running = not_started.run(env)?;
// Emit heartbeat event to notify BuildState that job is now running
// Collect heartbeat event to emit after drain completes
let heartbeat_event = Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 {
job_run_id: job_run_id.to_string(),
});
self.bel.append_event(&heartbeat_event)?;
events_to_append.push(heartbeat_event);
JobRunHandle::Running(running)
}
@ -149,12 +206,19 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
new_jobs.push(transitioned);
}
self.job_runs = new_jobs;
// Now append all collected events
for event in events_to_append {
self.append_and_broadcast(&event)?;
}
Ok(())
}
/// Visits individual job runs, appending resulting events, and moving runs between run status
/// containers. Either jobs are still running, or they are moved to terminal states.
fn poll_job_runs(&mut self) -> Result<(), DatabuildError> {
use crate::data_build_event::Event;
use crate::job_run::{JobRunHandle, VisitResult};
self.schedule_queued_jobs()?;
@ -165,30 +229,41 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
// Need to decide on heartbeat frequency (e.g., every N polls or based on time elapsed)
// Visit all running jobs using type-safe transitions
// Collect events first to avoid borrow checker issues
let mut new_jobs = Vec::new();
let mut events_to_append = Vec::new();
for job in self.job_runs.drain(..) {
let transitioned = match job {
JobRunHandle::Running(running) => match running.visit()? {
VisitResult::StillRunning(still_running) => {
println!("Still running job: {:?}", still_running.job_run_id);
tracing::debug!(
job_run_id = still_running.job_run_id.to_string(),
"Job still running"
);
JobRunHandle::Running(still_running)
}
VisitResult::Completed(completed) => {
println!("Completed job: {:?}", completed.job_run_id);
tracing::debug!(
job_run_id = completed.job_run_id.to_string(),
"Completed job"
);
let event = completed.state.to_event(&completed.job_run_id);
self.bel.append_event(&event)?;
events_to_append.push(event);
JobRunHandle::Completed(completed)
}
VisitResult::Failed(failed) => {
println!("Failed job: {:?}", failed.job_run_id);
tracing::debug!(job_run_id = failed.job_run_id.to_string(), "Failed job");
let event = failed.state.to_event(&failed.job_run_id);
self.bel.append_event(&event)?;
events_to_append.push(event);
JobRunHandle::Failed(failed)
}
VisitResult::DepMiss(dep_miss) => {
println!("Dep miss job: {:?}", dep_miss.job_run_id);
tracing::debug!(
job_run_id = dep_miss.job_run_id.to_string(),
"Job dep miss"
);
let event = dep_miss.state.to_event(&dep_miss.job_run_id);
self.bel.append_event(&event)?;
events_to_append.push(event);
JobRunHandle::DepMiss(dep_miss)
}
},
@ -198,6 +273,11 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
}
self.job_runs = new_jobs;
// Now append all collected events
for event in events_to_append {
self.append_and_broadcast(&event)?;
}
Ok(())
}
@ -254,9 +334,16 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
let job_run = JobRunHandle::spawn(wg.job.entry_point.clone(), args);
// Store environment for this job run
let job_run_id = job_run.job_run_id().clone();
if !wg.job.environment.is_empty() {
self.job_environments
.insert(job_run_id, wg.job.environment.clone());
}
// Create job run buffer event
let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 {
job_run_id: job_run.job_run_id().to_string(),
job_run_id: job_run_id.to_string(),
job_label: wg.job.label,
building_partitions: wg
.wants
@ -266,13 +353,55 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
.collect(),
want_attributed_partitions: wg.wants.iter().map(|w| w.into()).collect(),
});
self.bel.append_event(&job_buffer_event)?;
self.append_and_broadcast(&job_buffer_event)?;
self.job_runs.push(job_run);
Ok(())
}
fn step(&mut self) -> Result<(), DatabuildError> {
/// Process all pending commands from the HTTP server
fn process_commands(&mut self) -> Result<(), DatabuildError> {
// TODO: Handle command failure gracefully - what's the story there?
// Collect all pending commands first (to avoid borrow checker issues)
let mut commands = Vec::new();
if let Some(ref mut rx) = self.command_rx {
while let Ok(cmd) = rx.try_recv() {
commands.push(cmd);
}
}
// Now process all collected commands
for cmd in commands {
match cmd {
Command::CreateWant { request, reply } => {
// Convert request to event and broadcast it (so HTTP server receives it)
let event: crate::WantCreateEventV1 = request.into();
let result = self
.append_and_broadcast(&crate::data_build_event::Event::WantCreateV1(
event.clone(),
))
.map(|_| self.bel.state.get_want(&event.want_id).into());
let _ = reply.send(result); // Ignore send errors
}
Command::CancelWant { request, reply } => {
// Convert request to event and broadcast it (so HTTP server receives it)
let event: crate::WantCancelEventV1 = request.into();
let result = self
.append_and_broadcast(&crate::data_build_event::Event::WantCancelV1(
event.clone(),
))
.map(|_| self.bel.state.get_want(&event.want_id).into());
let _ = reply.send(result); // Ignore send errors
}
}
}
Ok(())
}
pub fn step(&mut self) -> Result<(), DatabuildError> {
// Process commands first (write operations)
self.process_commands()?;
// Then poll job runs and wants
self.poll_job_runs()?;
self.poll_wants()?;
Ok(())
@ -370,11 +499,13 @@ mod tests {
label: "alpha".to_string(),
patterns: vec!["data/alpha".to_string()],
entry_point: MockJobRun::bin_path(),
environment: Default::default(),
},
JobConfiguration {
label: "beta".to_string(),
patterns: vec!["data/beta".to_string()],
entry_point: MockJobRun::bin_path(),
environment: Default::default(),
},
],
};
@ -737,11 +868,13 @@ echo 'Beta succeeded'
label: "alpha".to_string(),
patterns: vec!["data/alpha".to_string()],
entry_point: alpha_script.to_string(),
environment: Default::default(),
},
JobConfiguration {
label: "beta".to_string(),
patterns: vec!["data/beta".to_string()],
entry_point: beta_script.to_string(),
environment: Default::default(),
},
],
};
@ -919,6 +1052,7 @@ echo 'Beta succeeded'
label: label.to_string(),
patterns: vec![pattern.to_string()],
entry_point: "test_entrypoint".to_string(),
environment: Default::default(),
}
}

577
docs/plans/api.md Normal file
View file

@ -0,0 +1,577 @@
# Web Server Implementation Plan
## Architecture Summary
**Concurrency Model: Event Log Separation**
- Orchestrator runs synchronously in dedicated thread, owns BEL exclusively
- Web server reads from shared BEL storage, sends write commands via channel
- No locks on hot path, orchestrator stays single-threaded
- Eventual consistency for reads (acceptable since builds take time anyway)
**Daemon Model:**
- Server binary started manually from workspace root (for now)
- Server tracks last request time, shuts down after idle timeout (default: 3 hours)
- HTTP REST on localhost random port
- Future: CLI can auto-discover/start server
---
## Thread Model
```
Main Process
├─ HTTP Server (tokio multi-threaded runtime)
│ ├─ Request handlers (async, read from BEL storage)
│ └─ Command sender (send writes to orchestrator)
└─ Orchestrator Thread (std::thread, synchronous)
├─ Receives commands via mpsc channel
├─ Owns BEL (exclusive mutable access)
└─ Runs existing step() loop
```
**Read Path (Low Latency):**
1. HTTP request → Axum handler
2. Read events from shared BEL storage (no lock contention)
3. Reconstruct BuildState from events (can cache this)
4. Return response
**Write Path (Strong Consistency):**
1. HTTP request → Axum handler
2. Send command via channel to orchestrator
3. Orchestrator processes command in its thread
4. Reply sent back via oneshot channel
5. Return response
**Why This Works:**
- Orchestrator remains completely synchronous (no refactoring needed)
- Reads scale horizontally (multiple handlers, no locks)
- Writes are serialized through orchestrator (consistent with current model)
- Event sourcing means reads can be eventually consistent
---
## Phase 1: Foundation - Make BEL Storage Thread-Safe
**Goal:** Allow BEL storage to be safely shared between orchestrator and web server
**Tasks:**
1. Add `Send + Sync` bounds to `BELStorage` trait
2. Wrap `SqliteBELStorage::connection` in `Arc<Mutex<Connection>>` or use r2d2 pool
3. Add read-only methods to BELStorage:
- `list_events(offset: usize, limit: usize) -> Vec<DataBuildEvent>`
- `get_event(event_id: u64) -> Option<DataBuildEvent>`
- `latest_event_id() -> u64`
4. Add builder method to reconstruct BuildState from events:
- `BuildState::from_events(events: &[DataBuildEvent]) -> Self`
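
A minimal sketch of the `from_events()` builder, assuming `BuildState` has a `Default` impl and some per-event apply method — `apply_event` below is an illustrative name, not an existing API:
```rust
impl BuildState {
    /// Rebuild read-side state by replaying the event log in order.
    /// Sketch only: `apply_event` is an assumed helper for applying one event.
    pub fn from_events(events: &[DataBuildEvent]) -> Self {
        let mut state = BuildState::default();
        for event in events {
            state.apply_event(event);
        }
        state
    }
}
```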
**Files Modified:**
- `databuild/build_event_log.rs` - update trait and storage impls
- `databuild/build_state.rs` - add `from_events()` builder
**Acceptance Criteria:**
- `BELStorage` trait has `Send + Sync` bounds
- Can clone `Arc<SqliteBELStorage>` and use from multiple threads
- Can reconstruct BuildState from events without mutating storage
---
## Phase 2: Web Server - HTTP API with Axum
**Goal:** HTTP server serving read/write APIs
**Tasks:**
1. Add dependencies to MODULE.bazel:
```python
crate.spec(package = "tokio", features = ["full"], version = "1.0")
crate.spec(package = "axum", version = "0.7")
crate.spec(package = "tower", version = "0.4")
crate.spec(package = "tower-http", features = ["trace", "cors"], version = "0.5")
```
2. Create `databuild/http_server.rs` module with:
- `AppState` struct holding:
- `bel_storage: Arc<dyn BELStorage>` - shared read access
- `command_tx: mpsc::Sender<Command>` - channel to orchestrator
- `last_request_time: Arc<AtomicU64>` - for idle tracking
- Axum router with all endpoints
- Handler functions delegating to existing `api_handle_*` methods
3. API Endpoints (router wiring sketched after this task list):
```
GET /health → health check
GET /api/wants → list_wants
POST /api/wants → create_want
GET /api/wants/:id → get_want
DELETE /api/wants/:id → cancel_want
GET /api/partitions → list_partitions
GET /api/job_runs → list_job_runs
GET /api/job_runs/:id/logs/stdout → stream_logs (stub)
```
4. Handler pattern (reads):
```rust
async fn list_wants(
State(state): State<AppState>,
Query(params): Query<ListWantsParams>,
) -> Json<ListWantsResponse> {
// Read events from storage
let events = state.bel_storage.list_events(0, 10000)?;
// Reconstruct state
let build_state = BuildState::from_events(&events);
// Use existing API method
Json(build_state.list_wants(&params.into()))
}
```
5. Handler pattern (writes):
```rust
async fn create_want(
State(state): State<AppState>,
Json(req): Json<CreateWantRequest>,
) -> Json<CreateWantResponse> {
// Send command to orchestrator
let (reply_tx, reply_rx) = oneshot::channel();
state.command_tx.send(Command::CreateWant(req, reply_tx)).await?;
// Wait for orchestrator reply
let response = reply_rx.await?;
Json(response)
}
```
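
For reference, a sketch of how `create_router` (referenced again in Phase 4) might wire these endpoints together — the handler names mirror the endpoint list above and are assumptions, not final code:
```rust
use axum::{routing::get, Router};

// Sketch: one route per endpoint in the table above; `stream_logs` stays a stub for now.
fn create_router(state: AppState) -> Router {
    Router::new()
        .route("/health", get(health))
        .route("/api/wants", get(list_wants).post(create_want))
        .route("/api/wants/:id", get(get_want).delete(cancel_want))
        .route("/api/partitions", get(list_partitions))
        .route("/api/job_runs", get(list_job_runs))
        .route("/api/job_runs/:id/logs/stdout", get(stream_logs))
        .with_state(state)
}
```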
**Files Created:**
- `databuild/http_server.rs` - new module
**Files Modified:**
- `databuild/lib.rs` - add `pub mod http_server;`
- `MODULE.bazel` - add dependencies
**Acceptance Criteria:**
- Server starts on localhost random port, prints "Listening on http://127.0.0.1:XXXXX"
- All read endpoints return correct JSON responses
- Write endpoints return stub responses (Phase 4 will connect to orchestrator)
---
## Phase 3: CLI - HTTP Client
**Goal:** CLI that sends HTTP requests to running server
**Tasks:**
1. Add dependencies to MODULE.bazel:
```python
crate.spec(package = "clap", features = ["derive"], version = "4.0")
crate.spec(package = "reqwest", features = ["blocking", "json"], version = "0.11")
```
2. Create `databuild/bin/databuild.rs` main binary:
```rust
#[derive(Parser)]
#[command(name = "databuild")]
enum Cli {
/// Start the databuild server
Serve(ServeArgs),
/// Create a want for partitions
Build(BuildArgs),
/// Want operations
Want(WantCommand),
/// Stream job run logs
Logs(LogsArgs),
}
#[derive(Args)]
struct ServeArgs {
#[arg(long, default_value = "8080")]
port: u16,
}
#[derive(Subcommand)]
enum WantCommand {
Create(CreateWantArgs),
List,
Get { want_id: String },
Cancel { want_id: String },
}
```
3. Server address discovery:
- For now: hardcode `http://localhost:8080` or accept `--server-url` flag
- Future: read from `.databuild/server.json` file
4. HTTP client implementation:
```rust
fn list_wants(server_url: &str) -> Result<Vec<WantDetail>> {
let client = reqwest::blocking::Client::new();
let resp = client.get(&format!("{}/api/wants", server_url))
.send()?
.json::<ListWantsResponse>()?;
Ok(resp.data)
}
```
5. Commands:
- `databuild serve --port 8080` - Start server (blocks)
- `databuild build part1 part2` - Create want for partitions
- `databuild want list` - List all wants
- `databuild want get <id>` - Get specific want
- `databuild want cancel <id>` - Cancel want
- `databuild logs <job_run_id>` - Stream logs (stub)
**Files Created:**
- `databuild/bin/databuild.rs` - new CLI binary
**Files Modified:**
- `databuild/BUILD.bazel` - add `rust_binary` target for databuild CLI
**Acceptance Criteria:**
- Can run `databuild serve` to start server
- Can run `databuild want list` in another terminal and see wants
- Commands print pretty JSON or formatted tables
---
## Phase 4: Orchestrator Integration - Command Channel
**Goal:** Connect orchestrator to web server via message passing
**Tasks:**
1. Create `databuild/commands.rs` with command enum:
```rust
pub enum Command {
CreateWant(CreateWantRequest, oneshot::Sender<CreateWantResponse>),
CancelWant(CancelWantRequest, oneshot::Sender<CancelWantResponse>),
// Only write operations need commands
}
```
2. Update `Orchestrator`:
- Add `command_rx: mpsc::Receiver<Command>` field
- In `step()` method, before polling:
```rust
// Process all pending commands
while let Ok(cmd) = self.command_rx.try_recv() {
match cmd {
Command::CreateWant(req, reply) => {
let resp = self.bel.api_handle_want_create(req);
let _ = reply.send(resp); // Ignore send errors
}
// ... other commands
}
}
```
3. Create server startup function in `http_server.rs`:
```rust
pub fn start_server(
bel_storage: Arc<dyn BELStorage>,
port: u16,
) -> (JoinHandle<()>, mpsc::Sender<Command>) {
let (cmd_tx, cmd_rx) = mpsc::channel(100);
// Spawn orchestrator in background thread
let orch_bel = bel_storage.clone();
let orch_handle = std::thread::spawn(move || {
let mut orch = Orchestrator::new_with_commands(orch_bel, cmd_rx);
orch.join().unwrap();
});
// Start HTTP server in tokio runtime
let runtime = tokio::runtime::Runtime::new().unwrap();
let http_handle = runtime.spawn(async move {
let app_state = AppState {
bel_storage,
command_tx: cmd_tx.clone(),
last_request_time: Arc::new(AtomicU64::new(0)),
};
let app = create_router(app_state);
let addr = SocketAddr::from(([127, 0, 0, 1], port));
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
axum::serve(listener, app).await.unwrap();
});
(http_handle, cmd_tx)
}
```
4. Update `databuild serve` command to use `start_server()`
**Files Created:**
- `databuild/commands.rs` - new module
**Files Modified:**
- `databuild/orchestrator.rs` - accept command channel, process in `step()`
- `databuild/http_server.rs` - send commands for writes
- `databuild/bin/databuild.rs` - use `start_server()` in `serve` command
**Acceptance Criteria:**
- Creating a want via HTTP actually creates it in BuildState
- Orchestrator processes commands without blocking its main loop
- Can observe wants being scheduled into job runs
---
## Phase 5: Daemon Lifecycle - Auto-Shutdown
**Goal:** Server shuts down gracefully after idle timeout
**Tasks:**
1. Update AppState to track last request time:
```rust
pub struct AppState {
bel_storage: Arc<dyn BELStorage>,
command_tx: mpsc::Sender<Command>,
last_request_time: Arc<AtomicU64>, // epoch millis
shutdown_tx: broadcast::Sender<()>,
}
```
2. Add Tower middleware to update timestamp:
```rust
async fn update_last_request_time(
State(state): State<AppState>,
req: Request,
next: Next,
) -> Response {
state.last_request_time.store(
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
Ordering::Relaxed,
);
next.run(req).await
}
```
3. Background idle checker task:
```rust
tokio::spawn(async move {
let idle_timeout = Duration::from_secs(3 * 60 * 60); // 3 hours
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
let last_request = state.last_request_time.load(Ordering::Relaxed);
let now = SystemTime::now()...;
if now - last_request > idle_timeout.as_millis() as u64 {
eprintln!("Server idle for {} hours, shutting down",
idle_timeout.as_secs() / 3600);
shutdown_tx.send(()).unwrap();
break;
}
}
});
```
4. Graceful shutdown handling:
```rust
let app = create_router(state);
let listener = tokio::net::TcpListener::bind(addr).await?;
axum::serve(listener, app)
.with_graceful_shutdown(async {
shutdown_rx.recv().await.ok();
})
.await?;
```
5. Cleanup on shutdown:
- Orchestrator: finish current step, don't start new one
- HTTP server: stop accepting new connections, finish in-flight requests
- Log: "Shutdown complete"
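
A rough sketch of what the shutdown-aware orchestrator loop could look like (the function name and shutdown channel are illustrative assumptions, not existing code):
```rust
use std::time::Duration;

// Sketch: finish the in-flight step, then check the shutdown signal before starting another.
pub fn run_until_shutdown<S: BELStorage + std::fmt::Debug>(
    mut orch: Orchestrator<S>,
    mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) {
    loop {
        if let Err(e) = orch.step() {
            tracing::error!("orchestrator step failed: {e}");
        }
        if shutdown_rx.try_recv().is_ok() {
            tracing::info!("shutdown signal received; orchestrator exiting");
            break;
        }
        std::thread::sleep(Duration::from_millis(100));
    }
}
```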
**Files Modified:**
- `databuild/http_server.rs` - add idle tracking, shutdown logic
- `databuild/orchestrator.rs` - accept shutdown signal, check before each step
**Acceptance Criteria:**
- Server shuts down after configured idle timeout
- In-flight requests complete successfully during shutdown
- Shutdown is logged clearly
---
## Phase 6: Testing & Polish
**Goal:** End-to-end testing and production readiness
**Tasks:**
1. Integration tests:
```rust
#[test]
fn test_server_lifecycle() {
// Start server
let (handle, port) = start_test_server();
// Make requests
let wants = reqwest::blocking::get(
&format!("http://localhost:{}/api/wants", port)
).unwrap().json::<ListWantsResponse>().unwrap();
// Stop server
handle.shutdown();
}
```
2. Error handling improvements:
- Proper HTTP status codes (400, 404, 500)
- Structured error responses:
```json
{"error": "Want not found", "want_id": "abc123"}
```
- Add `tracing` crate for structured logging
3. Add CORS middleware for web app:
```rust
let cors = CorsLayer::new()
.allow_origin("http://localhost:3000".parse::<HeaderValue>().unwrap())
.allow_methods([Method::GET, Method::POST, Method::DELETE]);
app.layer(cors)
```
4. Health check endpoint:
```rust
async fn health() -> &'static str {
"OK"
}
```
5. Optional: Metrics endpoint (prometheus format):
```rust
async fn metrics() -> String {
format!(
"# HELP databuild_wants_total Total number of wants\n\
databuild_wants_total {}\n\
# HELP databuild_job_runs_total Total number of job runs\n\
databuild_job_runs_total {}\n",
want_count, job_run_count
)
}
```
**Files Created:**
- `databuild/tests/http_integration_test.rs` - integration tests
**Files Modified:**
- `databuild/http_server.rs` - add CORS, health, metrics, better errors
- `MODULE.bazel` - add `tracing` dependency
**Acceptance Criteria:**
- All endpoints have proper error handling
- CORS works for web app development
- Health check returns 200 OK
- Integration tests pass
---
## Future Enhancements (Not in Initial Plan)
### Workspace Auto-Discovery
- Walk up directory tree looking for `.databuild/` marker
- Store server metadata in `.databuild/server.json`:
```json
{
"pid": 12345,
"port": 54321,
"started_at": "2025-01-22T10:30:00Z",
"workspace_root": "/Users/stuart/Projects/databuild"
}
```
- CLI auto-starts server if not running
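
A sketch of the walk-up discovery (the helper name and return shape are assumptions):
```rust
use std::path::{Path, PathBuf};

/// Walk up from `start` until a directory containing `.databuild/` is found.
fn find_workspace_root(start: &Path) -> Option<PathBuf> {
    let mut dir = start.to_path_buf();
    loop {
        if dir.join(".databuild").is_dir() {
            return Some(dir);
        }
        // Stop once the filesystem root has been checked.
        if !dir.pop() {
            return None;
        }
    }
}
```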
### Log Streaming (SSE)
- Implement `GET /api/job_runs/:id/logs/stdout?follow=true`
- Use Server-Sent Events for streaming
- Integrate with FileLogStore from logging.md plan
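
A possible shape for the SSE handler in axum, assuming a `futures` dependency is added and some log source yields lines (all names below are illustrative placeholders):
```rust
use std::convert::Infallible;

use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::{Stream, StreamExt};

// Sketch: a real handler would tail the job run's stdout via the FileLogStore.
async fn stream_stdout() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let lines = futures::stream::iter(vec!["line 1".to_string(), "line 2".to_string()]);
    let stream = lines.map(|line| Ok(Event::default().data(line)));
    Sse::new(stream).keep_alive(KeepAlive::default())
}
```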
### State Caching
- Cache reconstructed BuildState for faster reads
- Invalidate cache when new events arrive
- Use `tokio::sync::RwLock<Option<(u64, BuildState)>>` where u64 is latest_event_id
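
A sketch of that read-through cache, assuming `BuildState` is (or can be made) `Clone` and the Phase 1 `from_events` builder exists; fetching the full event log is shown for simplicity:
```rust
use tokio::sync::RwLock;

/// Sketch: cache the reconstructed state tagged with the latest event id it reflects.
pub struct StateCache {
    inner: RwLock<Option<(u64, BuildState)>>,
}

impl StateCache {
    pub async fn get_or_rebuild(
        &self,
        storage: &dyn BELStorage,
    ) -> Result<BuildState, DatabuildError> {
        let latest = storage.latest_event_id()?;
        if let Some((cached_id, state)) = self.inner.read().await.as_ref() {
            if *cached_id == latest {
                return Ok(state.clone());
            }
        }
        // Stale or empty: replay the log and refresh the cache.
        let events = storage.list_events(0, u64::MAX)?;
        let state = BuildState::from_events(&events);
        *self.inner.write().await = Some((latest, state.clone()));
        Ok(state)
    }
}
```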
### gRPC Support (If Needed)
- Add Tonic alongside Axum
- Share same orchestrator/command channel
- Useful for language-agnostic clients
---
## Dependencies Summary
New dependencies to add to `MODULE.bazel`:
```python
# Async runtime
crate.spec(package = "tokio", features = ["full"], version = "1.0")
# Web framework
crate.spec(package = "axum", version = "0.7")
crate.spec(package = "tower", version = "0.4")
crate.spec(package = "tower-http", features = ["trace", "cors"], version = "0.5")
# CLI
crate.spec(package = "clap", features = ["derive"], version = "4.0")
# HTTP client for CLI
crate.spec(package = "reqwest", features = ["blocking", "json"], version = "0.11")
# Logging
crate.spec(package = "tracing", version = "0.1")
crate.spec(package = "tracing-subscriber", version = "0.3")
```
---
## Estimated Timeline
- **Phase 1:** 2-3 hours (thread-safe BEL storage)
- **Phase 2:** 4-6 hours (HTTP server with Axum)
- **Phase 3:** 3-4 hours (basic CLI)
- **Phase 4:** 3-4 hours (orchestrator integration)
- **Phase 5:** 2-3 hours (idle shutdown)
- **Phase 6:** 4-6 hours (testing and polish)
**Total:** ~18-26 hours for complete implementation
---
## Design Rationale
### Why Event Log Separation?
**Alternatives Considered:**
1. **Shared State with RwLock**: Orchestrator holds write lock during `step()`, blocking all reads
2. **Actor Model**: Extra overhead from message passing for all operations
**Why Event Log Separation Wins:**
- Orchestrator stays completely synchronous (no refactoring)
- Reads don't block writes (eventual consistency acceptable for build system)
- Natural fit with event sourcing architecture
- Can cache reconstructed state for even better read performance
### Why Not gRPC?
- User requirement: "JSON is a must"
- REST is more debuggable (curl, browser dev tools)
- gRPC adds complexity without clear benefit
- Can add gRPC later if needed (both can coexist)
### Why Axum Over Actix?
- Better compile-time type safety (extractors)
- Cleaner middleware composition (Tower)
- Native async/await (Actix uses actor model internally)
- More ergonomic for this use case
### Why Per-Workspace Server?
- Isolation: builds in different projects don't interfere
- Simpler: no need to route requests by workspace
- Matches Bazel's model (users already understand it)
- Easier to reason about resource usage

View file

@ -0,0 +1,85 @@
# Multi-Hop Dependency Example
This example demonstrates DataBuild's ability to handle multi-hop dependencies between jobs.
## Overview
The example consists of two jobs:
- **job_alpha**: Produces the `data/alpha` partition
- **job_beta**: Depends on `data/alpha` and produces `data/beta`
When you request `data/beta`:
1. Beta job runs and detects missing `data/alpha` dependency
2. Orchestrator creates a want for `data/alpha`
3. Alpha job runs and produces `data/alpha`
4. Beta job runs again and succeeds, producing `data/beta`
## Running the Example
From the repository root:
```bash
# Build the CLI
bazel build //databuild:databuild
# Clean up any previous state
rm -f /tmp/databuild_multihop*.db /tmp/databuild_multihop_alpha_complete
# Start the server with the multihop configuration
./bazel-bin/databuild/databuild serve \
--port 3050 \
--database /tmp/databuild_multihop.db \
--config examples/multihop/config.json
```
In another terminal, create a want for `data/beta`:
```bash
# Create a want for data/beta (which will trigger the dependency chain)
./bazel-bin/databuild/databuild --server http://localhost:3050 \
want data/beta
# Watch the wants
./bazel-bin/databuild/databuild --server http://localhost:3050 \
wants list
# Watch the job runs
./bazel-bin/databuild/databuild --server http://localhost:3050 \
job-runs list
# Watch the partitions
./bazel-bin/databuild/databuild --server http://localhost:3050 \
partitions list
```
## Expected Behavior
1. Initial want for `data/beta` is created
2. Beta job runs, detects missing `data/alpha`, reports dependency miss
3. Orchestrator creates derivative want for `data/alpha`
4. Alpha job runs and succeeds
5. Beta job runs again and succeeds
6. Both partitions are now in `Live` state
## Configuration Format
The example uses JSON format (`config.json`), but TOML is also supported. Here's the equivalent TOML:
```toml
[[jobs]]
label = "//examples/multihop:job_alpha"
entrypoint = "./examples/multihop/job_alpha.sh"
partition_patterns = ["data/alpha"]
[jobs.environment]
JOB_NAME = "alpha"
[[jobs]]
label = "//examples/multihop:job_beta"
entrypoint = "./examples/multihop/job_beta.sh"
partition_patterns = ["data/beta"]
[jobs.environment]
JOB_NAME = "beta"
```

View file

@ -0,0 +1,20 @@
{
"jobs": [
{
"label": "//examples/multihop:job_alpha",
"entrypoint": "./examples/multihop/job_alpha.sh",
"environment": {
"JOB_NAME": "alpha"
},
"partition_patterns": ["data/alpha"]
},
{
"label": "//examples/multihop:job_beta",
"entrypoint": "./examples/multihop/job_beta.sh",
"environment": {
"JOB_NAME": "beta"
},
"partition_patterns": ["data/beta"]
}
]
}

18
examples/multihop/job_alpha.sh Executable file
View file

@ -0,0 +1,18 @@
#!/bin/bash
# Job Alpha: Produces data/alpha partition
# This is a simple upstream job that beta depends on
echo "Job Alpha starting..." >&2
echo " Building partitions: $@" >&2
echo " Environment: JOB_NAME=$JOB_NAME" >&2
# Simulate some work
sleep 0.5
# Create marker file to indicate data/alpha is available
touch /tmp/databuild_multihop_alpha_complete
# Output success - no special output needed, just exit 0
echo "Job Alpha complete!" >&2

27
examples/multihop/job_beta.sh Executable file
View file

@ -0,0 +1,27 @@
#!/bin/bash
# Job Beta: Produces data/beta partition
# Depends on data/alpha from job_alpha
echo "Job Beta starting..." >&2
echo " Building partitions: $@" >&2
echo " Environment: JOB_NAME=$JOB_NAME" >&2
# Check if data/alpha marker exists (this would be real data in a real system)
ALPHA_MARKER="/tmp/databuild_multihop_alpha_complete"
if [ ! -f "$ALPHA_MARKER" ]; then
echo " Missing dependency: data/alpha" >&2
# Report missing dependency
echo 'DATABUILD_MISSING_DEPS_JSON:{"version":"1","missing_deps":[{"impacted":[{"ref":"data/beta"}],"missing":[{"ref":"data/alpha"}]}]}'
exit 1
fi
echo " Found dependency: data/alpha" >&2
# Simulate some work
sleep 0.5
# Output success - no special output needed, just exit 0
echo "Job Beta complete!" >&2

View file

@ -1,51 +1,51 @@
# Test utilities filegroup
filegroup(
name = "test_utils",
srcs = [
"lib/test_utils.sh",
"lib/db_utils.sh",
"lib/service_utils.sh",
],
visibility = ["//visibility:public"],
)
# Delegation test that verifies second builds properly delegate to existing partitions
# Note: This test is designed to be run via the integration test runner (run_e2e_tests.sh)
# rather than through bazel test, due to examples being in .bazelignore
sh_test(
name = "delegation_test",
srcs = ["delegation_test.sh"],
data = [
":test_utils",
],
size = "medium",
timeout = "moderate",
env = {
"PATH": "/usr/bin:/bin:/usr/local/bin",
},
tags = ["e2e", "delegation", "manual"],
args = ["placeholder"],
)
# Simple shell script test that validates the test runner
sh_test(
name = "e2e_runner_test",
srcs = ["validate_runner.sh"],
data = [
"//:run_e2e_tests.sh",
":test_utils",
"lib/test_utils.sh",
"lib/db_utils.sh",
"lib/service_utils.sh",
"simple_test.sh",
"basic_graph_test.sh",
"podcast_reviews_test.sh",
"delegation_test.sh",
],
size = "small",
timeout = "short",
env = {
"PATH": "/usr/bin:/bin:/usr/local/bin",
},
tags = ["e2e"],
)
## Test utilities filegroup
#filegroup(
# name = "test_utils",
# srcs = [
# "lib/test_utils.sh",
# "lib/db_utils.sh",
# "lib/service_utils.sh",
# ],
# visibility = ["//visibility:public"],
#)
#
## Delegation test that verifies second builds properly delegate to existing partitions
## Note: This test is designed to be run via the integration test runner (run_e2e_tests.sh)
## rather than through bazel test, due to examples being in .bazelignore
#sh_test(
# name = "delegation_test",
# srcs = ["delegation_test.sh"],
# data = [
# ":test_utils",
# ],
# size = "medium",
# timeout = "moderate",
# env = {
# "PATH": "/usr/bin:/bin:/usr/local/bin",
# },
# tags = ["e2e", "delegation", "manual"],
# args = ["placeholder"],
#)
#
## Simple shell script test that validates the test runner
#sh_test(
# name = "e2e_runner_test",
# srcs = ["validate_runner.sh"],
# data = [
# "//:run_e2e_tests.sh",
# ":test_utils",
# "lib/test_utils.sh",
# "lib/db_utils.sh",
# "lib/service_utils.sh",
# "simple_test.sh",
# "basic_graph_test.sh",
# "podcast_reviews_test.sh",
# "delegation_test.sh",
# ],
# size = "small",
# timeout = "short",
# env = {
# "PATH": "/usr/bin:/bin:/usr/local/bin",
# },
# tags = ["e2e"],
#)