Compare commits: 2084fadbb6 ... b978be53f5 (11 commits)
| Author | SHA1 | Date |
|---|---|---|
| | b978be53f5 | |
| | 895e499cc5 | |
| | f71be8482f | |
| | 7134b5e480 | |
| | 32f35ecbd5 | |
| | a5a1be8855 | |
| | 556ccb8a4b | |
| | f14d93da7a | |
| | be2b15de5e | |
| | da23af3227 | |
| | 6f7c6b3318 | |
19 changed files with 5073 additions and 284 deletions
BUILD.bazel (30 changes)
@@ -1,24 +1,18 @@
# Python Deps
load("@rules_python//python:pip.bzl", "compile_pip_requirements")

filegroup(
    name = "jq",
    srcs = ["//databuild/runtime:jq"],
    visibility = ["//visibility:public"],
)

# Export the E2E test runner script
exports_files(["run_e2e_tests.sh"])

# End-to-End Test Runner
sh_binary(
    name = "run_e2e_tests",
    srcs = ["run_e2e_tests.sh"],
    data = [
        "//tests/end_to_end:test_utils",
    ],
    visibility = ["//visibility:public"],
)
## Export the E2E test runner script
#exports_files(["run_e2e_tests.sh"])
#
## End-to-End Test Runner
#sh_binary(
#    name = "run_e2e_tests",
#    srcs = ["run_e2e_tests.sh"],
#    data = [
#        "//tests/end_to_end:test_utils",
#    ],
#    visibility = ["//visibility:public"],
#)

# `bazel run //:requirements.update` will regenerate the requirements_txt file
compile_pip_requirements(
MODULE.bazel (40 changes)
@@ -65,6 +65,46 @@ crate.spec(
    package = "regex",
    version = "1.10",
)
crate.spec(
    features = ["full"],
    package = "tokio",
    version = "1.0",
)
crate.spec(
    package = "axum",
    version = "0.7",
)
crate.spec(
    package = "tower",
    version = "0.4",
)
crate.spec(
    features = ["trace", "cors"],
    package = "tower-http",
    version = "0.5",
)
crate.spec(
    package = "tracing",
    version = "0.1",
)
crate.spec(
    package = "tracing-subscriber",
    version = "0.3",
)
crate.spec(
    features = ["derive"],
    package = "clap",
    version = "4.0",
)
crate.spec(
    features = ["blocking", "json"],
    package = "reqwest",
    version = "0.11",
)
crate.spec(
    package = "toml",
    version = "0.8",
)
crate.from_specs()
use_repo(crate, "crates")
MODULE.bazel.lock (2676 changes)
File diff suppressed because one or more lines are too long
databuild/BUILD.bazel
@@ -18,13 +18,14 @@ rust_binary(

# DataBuild library using generated prost code
rust_library(
-    name = "databuild",
+    name = "lib",
    srcs = glob(["**/*.rs"]) + [
        ":generate_databuild_rust",
    ],
    edition = "2021",
    visibility = ["//visibility:public"],
    deps = [
        "@crates//:axum",
        "@crates//:prost",
        "@crates//:prost-types",
        "@crates//:regex",
@@ -32,17 +33,40 @@ rust_library(
        "@crates//:schemars",
        "@crates//:serde",
        "@crates//:serde_json",
        "@crates//:tokio",
        "@crates//:toml",
        "@crates//:tower",
        "@crates//:tower-http",
        "@crates//:tracing",
        "@crates//:uuid",
    ],
)

rust_test(
    name = "databuild_test",
-    crate = ":databuild",
+    crate = ":lib",
    data = ["//databuild/test:test_job_helper"],
    env = {"RUST_BACKTRACE": "1"},
)

# DataBuild CLI binary
rust_binary(
    name = "databuild",
    srcs = ["cli_main.rs"],
    edition = "2021",
    visibility = ["//visibility:public"],
    deps = [
        ":lib",
        "@crates//:axum",
        "@crates//:clap",
        "@crates//:reqwest",
        "@crates//:serde_json",
        "@crates//:tokio",
        "@crates//:tracing",
        "@crates//:tracing-subscriber",
    ],
)

# Legacy filegroup for backwards compatibility
filegroup(
    name = "proto",
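The net effect of the two hunks above: the crate's code now builds as a `lib` rust_library, the `databuild` name is freed up for a new CLI rust_binary whose only source is `cli_main.rs` (added later in this diff), and the existing test target is repointed at `crate = ":lib"`. Downstream consumers that depended on the old `databuild` library target would need to switch to `:lib`.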
databuild/build_event_log.rs
@@ -11,15 +11,18 @@ use crate::{
use prost::Message;
use rusqlite::Connection;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};

-pub trait BELStorage {
+pub trait BELStorage: Send + Sync {
    fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError>;
    fn list_events(
        &self,
        since_idx: u64,
        limit: u64,
    ) -> Result<Vec<DataBuildEvent>, DatabuildError>;
    fn get_event(&self, event_id: u64) -> Result<Option<DataBuildEvent>, DatabuildError>;
    fn latest_event_id(&self) -> Result<u64, DatabuildError>;
}

#[derive(Debug, Clone)]
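The widened `pub trait BELStorage: Send + Sync` bound is what allows one storage handle to be shared between the orchestrator thread and async HTTP handlers; the new `http_server::AppState` later in this diff holds exactly such an `Arc<dyn BELStorage>`. A minimal sketch of the sharing this enables (the helper and the `lib::` import path are illustrative assumptions, mirroring how the new CLI imports the crate):

```rust
use std::sync::Arc;
use std::thread;

use lib::build_event_log::BELStorage;

// Hypothetical helper: hand a read-only view of the event log to another
// thread. This only compiles because `BELStorage: Send + Sync`.
fn spawn_reader(storage: Arc<dyn BELStorage>) {
    let reader = Arc::clone(&storage);
    thread::spawn(move || {
        // Shared handles expose the &self methods (reads); appends take
        // &mut self and stay on the orchestrator's exclusively-owned copy.
        let _latest = reader.latest_event_id();
    });
}
```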
@@ -64,15 +67,23 @@ impl BELStorage for MemoryBELStorage {
            .take(limit as usize)
            .collect())
    }

    fn get_event(&self, event_id: u64) -> Result<Option<DataBuildEvent>, DatabuildError> {
        Ok(self.events.iter().find(|e| e.event_id == event_id).cloned())
    }

    fn latest_event_id(&self) -> Result<u64, DatabuildError> {
        Ok(self.events.len().saturating_sub(1) as u64)
    }
}

-#[derive(Debug)]
-struct SqliteBELStorage {
-    connection: Connection,
+#[derive(Debug, Clone)]
+pub struct SqliteBELStorage {
+    connection: Arc<Mutex<Connection>>,
}

impl SqliteBELStorage {
-    fn create(database_url: &str) -> Result<SqliteBELStorage, DatabuildError> {
+    pub fn create(database_url: &str) -> Result<SqliteBELStorage, DatabuildError> {
        let connection = Connection::open(database_url)?;

        // Create the events table
@@ -85,7 +96,9 @@ impl SqliteBELStorage {
            (),
        )?;

-        Ok(SqliteBELStorage { connection })
+        Ok(SqliteBELStorage {
+            connection: Arc::new(Mutex::new(connection)),
+        })
    }
}
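`rusqlite::Connection` is not `Sync` and cannot be cloned, so storing it as `Arc<Mutex<Connection>>` is what lets `SqliteBELStorage` derive `Clone` and satisfy the trait's new `Send + Sync` bound. Every method below now follows the same lock-then-query shape; in miniature (the struct and query here are illustrative stand-ins, not the real type):

```rust
use std::sync::{Arc, Mutex};

use rusqlite::Connection;

// Illustrative stand-in for the pattern SqliteBELStorage now uses: clones
// share one connection, and each call holds the mutex for the query.
#[derive(Clone)]
struct SharedConn {
    inner: Arc<Mutex<Connection>>,
}

impl SharedConn {
    fn count_events(&self) -> rusqlite::Result<u64> {
        let conn = self.inner.lock().expect("connection mutex poisoned");
        conn.query_row("SELECT COUNT(*) FROM events", [], |row| row.get(0))
    }
}
```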
@@ -105,12 +118,17 @@ impl BELStorage for SqliteBELStorage {
        let mut buf = Vec::new();
        prost::Message::encode(&dbe, &mut buf)?;

-        self.connection.execute(
+        let connection = self
+            .connection
+            .lock()
+            .map_err(|e| format!("Failed to acquire lock: {}", e))?;
+
+        connection.execute(
            "INSERT INTO events (timestamp, event_data) VALUES (?1, ?2)",
            (&timestamp, &buf),
        )?;

-        let event_id = self.connection.last_insert_rowid() as u64;
+        let event_id = connection.last_insert_rowid() as u64;
        Ok(event_id)
    }
@@ -119,7 +137,12 @@ impl BELStorage for SqliteBELStorage {
        since_idx: u64,
        limit: u64,
    ) -> Result<Vec<DataBuildEvent>, DatabuildError> {
-        let mut stmt = self.connection.prepare(
+        let connection = self
+            .connection
+            .lock()
+            .map_err(|e| format!("Failed to acquire lock: {}", e))?;
+
+        let mut stmt = connection.prepare(
            "SELECT event_id, timestamp, event_data FROM events
             WHERE timestamp > ?1
             ORDER BY event_id
@@ -156,22 +179,93 @@ impl BELStorage for SqliteBELStorage {

        Ok(events)
    }

    fn get_event(&self, event_id: u64) -> Result<Option<DataBuildEvent>, DatabuildError> {
        let connection = self
            .connection
            .lock()
            .map_err(|e| format!("Failed to acquire lock: {}", e))?;

        let mut stmt = connection
            .prepare("SELECT event_id, timestamp, event_data FROM events WHERE event_id = ?1")?;

        let result = stmt.query_row([event_id], |row| {
            let event_id: u64 = row.get(0)?;
            let timestamp: u64 = row.get(1)?;
            let event_data: Vec<u8> = row.get(2)?;

            // Deserialize the event using prost
            let mut dbe = DataBuildEvent::decode(event_data.as_slice()).map_err(|_e| {
                rusqlite::Error::InvalidColumnType(
                    0,
                    "event_data".to_string(),
                    rusqlite::types::Type::Blob,
                )
            })?;

            // Update the event_id from the database
            dbe.event_id = event_id;
            dbe.timestamp = timestamp;

            Ok(dbe)
        });

        match result {
            Ok(event) => Ok(Some(event)),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(e.into()),
        }
    }

    fn latest_event_id(&self) -> Result<u64, DatabuildError> {
        let connection = self
            .connection
            .lock()
            .map_err(|e| format!("Failed to acquire lock: {}", e))?;

        let result: Result<u64, rusqlite::Error> =
            connection.query_row("SELECT MAX(event_id) FROM events", [], |row| row.get(0));

        match result {
            Ok(id) => Ok(id),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0),
            Err(e) => Err(e.into()),
        }
    }
}

#[derive(Debug, Default)]
pub struct BuildEventLog<S: BELStorage + Debug> {
    pub storage: S,
    pub state: BuildState,
    /// Optional event broadcaster for HTTP server mirroring
    #[cfg_attr(not(feature = "server"), allow(dead_code))]
    pub event_broadcaster: Option<tokio::sync::broadcast::Sender<Event>>,
}

impl<S: BELStorage + Debug> BuildEventLog<S> {
    pub fn new(storage: S, state: BuildState) -> BuildEventLog<S> {
-        BuildEventLog { storage, state }
+        BuildEventLog {
+            storage,
+            state,
+            event_broadcaster: None,
+        }
    }

    pub fn with_broadcaster(mut self, broadcaster: tokio::sync::broadcast::Sender<Event>) -> Self {
        self.event_broadcaster = Some(broadcaster);
        self
    }

    pub fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError> {
        let events = self.state.handle_event(&event);
        let idx = self.storage.append_event(event)?;

        // Broadcast event to HTTP server (if configured)
        if let Some(ref tx) = self.event_broadcaster {
            let _ = tx.send(event.clone());
        }

        // Recursion here might be dangerous, but in theory the event propagation always terminates
        for event in events {
            self.append_event(&event)?;
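With `with_broadcaster` in place, every event appended to the log is also fanned out over a tokio broadcast channel, which is how the HTTP server can mirror build state without locking the log. A sketch of the wiring (it assumes `MemoryBELStorage` and `BuildState` have usable defaults, as the tests suggest):

```rust
use tokio::sync::broadcast;

// Sketch: attach a broadcaster, then consume the mirrored event stream
// from a separate task. A lagging receiver only drops old events; it never
// blocks the writer, since append_event ignores send errors.
async fn run_with_mirror(storage: MemoryBELStorage) {
    let (tx, mut rx) = broadcast::channel(1000);
    let mut log = BuildEventLog::new(storage, BuildState::default()).with_broadcaster(tx);

    tokio::spawn(async move {
        while let Ok(event) = rx.recv().await {
            // e.g. mirrored_state.write().unwrap().handle_event(&event)
            let _ = event;
        }
    });

    // ...subsequent log.append_event(&event) calls now reach the task above.
    let _ = &mut log;
}
```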
@@ -179,6 +273,13 @@ impl<S: BELStorage + Debug> BuildEventLog<S> {
        Ok(idx)
    }

    pub fn append_event_no_recurse(&mut self, event: &Event) -> Result<u64, DatabuildError> {
        self.state.handle_event(&event);
        let idx = self.storage.append_event(event)?;
        // Recursion here might be dangerous, but in theory the event propagation always terminates
        Ok(idx)
    }

    // API methods
    pub fn api_handle_list_wants(&self, req: ListWantsRequest) -> ListWantsResponse {
        self.state.list_wants(&req)
@@ -244,6 +345,7 @@ impl Clone for BuildEventLog<MemoryBELStorage> {
        Self {
            storage: self.storage.clone(),
            state: self.state.clone(),
            event_broadcaster: self.event_broadcaster.clone(),
        }
    }
}
@@ -261,7 +363,11 @@ mod tests {
        let storage =
            SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
        let state = BuildState::default();
-        let mut log = BuildEventLog { storage, state };
+        let mut log = BuildEventLog {
+            storage,
+            state,
+            event_broadcaster: None,
+        };

        let want_id = "sqlite_test_1234".to_string();
databuild/build_state.rs
@@ -1,5 +1,6 @@
use crate::data_build_event::Event;
use crate::data_deps::{WantTimestamps, missing_deps_to_want_events};
use crate::event_source::Source as EventSourceVariant;
use crate::job_run_state::{JobRun, JobRunWithState, QueuedState as JobQueuedState};
use crate::partition_state::{
    BuildingPartitionRef, FailedPartitionRef, LivePartitionRef, MissingPartitionRef, MissingState,
@@ -19,6 +20,7 @@ use crate::{
};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use tracing;

/**
Design Notes
@@ -45,7 +47,7 @@ critical that these state machines, their states, and their transitions are type

/// Tracks all application state, defines valid state transitions, and manages cross-state machine
/// state transitions (e.g. job run success resulting in partition going from Building to Live)
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
pub struct BuildState {
    wants: BTreeMap<String, Want>, // Type-safe want storage
    taints: BTreeMap<String, TaintDetail>,
@@ -53,18 +55,21 @@ pub struct BuildState {
    job_runs: BTreeMap<String, JobRun>, // Type-safe job run storage
}

-impl Default for BuildState {
-    fn default() -> Self {
-        Self {
-            wants: Default::default(),
-            taints: Default::default(),
-            partitions: Default::default(),
-            job_runs: Default::default(),
-        }
-    }
-}

impl BuildState {
    /// Reconstruct BuildState from a sequence of events (for read path in web server)
    /// This allows the web server to rebuild state from BEL storage without holding a lock
    pub fn from_events(events: &[crate::DataBuildEvent]) -> Self {
        let mut state = BuildState::default();
        for event in events {
            if let Some(ref inner_event) = event.event {
                // handle_event returns Vec<Event> for cascading events, but we ignore them
                // since we're replaying from a complete event log
                state.handle_event(inner_event);
            }
        }
        state
    }

    pub fn count_job_runs(&self) -> usize {
        self.job_runs.len()
    }
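`from_events` gives readers a lock-free way to materialize a point-in-time `BuildState` straight from storage. A sketch of the read path it enables, combining it with the storage trait's `list_events` (the helper, the `u64::MAX` "no limit" value, and the `lib::` paths are assumptions for illustration):

```rust
use lib::build_event_log::BELStorage;
use lib::build_state::BuildState;
use lib::util::DatabuildError;

// Hypothetical read-path helper: fetch the full log and fold it through
// the same handlers the orchestrator uses, yielding a fresh snapshot.
fn snapshot_state(storage: &dyn BELStorage) -> Result<BuildState, DatabuildError> {
    let events = storage.list_events(0, u64::MAX)?;
    Ok(BuildState::from_events(&events))
}
```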
@@ -86,6 +91,123 @@ impl BuildState {
        }
    }

    /// Handle creation of a derivative want (created due to job dep miss)
    ///
    /// When a job reports missing dependencies, it returns WantCreateV1 events for those missing partitions.
    /// Those events get appended to the BEL and eventually processed by handle_want_create().
    ///
    /// This function is called when we detect a derivative want (has source.job_triggered) and transitions
    /// the impacted wants to UpstreamBuilding state, tracking the derivative want ID as an upstream dependency.
    ///
    /// KEY INSIGHT: We must use the actual want_id from the WantCreateV1 event, not synthetic UUIDs generated
    /// during event processing. This ensures replay works correctly - the same want IDs are used both during
    /// original execution and during replay from the BEL.
    fn handle_derivative_want_creation(
        &mut self,
        derivative_want_id: &str,
        derivative_want_partitions: &[PartitionRef],
        source_job_run_id: &str,
    ) {
        // Look up the job run that triggered this derivative want
        // This job run must be in DepMiss state because it reported missing dependencies
        let job_run = self.job_runs.get(source_job_run_id).expect(&format!(
            "BUG: Job run {} must exist when derivative want created",
            source_job_run_id
        ));

        // Extract the missing deps from the DepMiss job run
        let missing_deps = match job_run {
            JobRun::DepMiss(dep_miss) => dep_miss.get_missing_deps(),
            _ => {
                panic!(
                    "BUG: Job run {} must be in DepMiss state when derivative want created, found {:?}",
                    source_job_run_id, job_run
                );
            }
        };

        // Find which MissingDeps entry corresponds to this derivative want
        // The derivative want was created for a specific set of missing partitions,
        // and we need to find which downstream partitions are impacted by those missing partitions
        for md in missing_deps {
            // Check if this derivative want's partitions match the missing partitions in this entry
            // We need exact match because one dep miss event can create multiple derivative wants
            let partitions_match = md.missing.iter().all(|missing_ref| {
                derivative_want_partitions
                    .iter()
                    .any(|p| p.r#ref == missing_ref.r#ref)
            }) && derivative_want_partitions.len() == md.missing.len();

            if partitions_match {
                // Now we know which partitions are impacted by this missing dependency
                let impacted_partition_refs: Vec<String> =
                    md.impacted.iter().map(|p| p.r#ref.clone()).collect();

                tracing::debug!(
                    derivative_want_id = %derivative_want_id,
                    source_job_run_id = %source_job_run_id,
                    missing_partitions = ?derivative_want_partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
                    impacted_partitions = ?impacted_partition_refs,
                    "Processing derivative want creation"
                );

                // Find all wants that include these impacted partitions
                // These are the wants that need to wait for the derivative want to complete
                let mut impacted_want_ids: std::collections::HashSet<String> =
                    std::collections::HashSet::new();
                for partition_ref in &impacted_partition_refs {
                    if let Some(partition) = self.partitions.get(partition_ref) {
                        for want_id in partition.want_ids() {
                            impacted_want_ids.insert(want_id.clone());
                        }
                    }
                }

                // Transition each impacted want to UpstreamBuilding, tracking this derivative want as an upstream
                for want_id in impacted_want_ids {
                    let want = self.wants.remove(&want_id).expect(&format!(
                        "BUG: Want {} must exist when processing derivative want",
                        want_id
                    ));

                    let transitioned = match want {
                        Want::Building(building) => {
                            // First dep miss for this want: Building → UpstreamBuilding
                            tracing::info!(
                                want_id = %want_id,
                                derivative_want_id = %derivative_want_id,
                                "Want: Building → UpstreamBuilding (first missing dep detected)"
                            );
                            Want::UpstreamBuilding(
                                building.detect_missing_deps(vec![derivative_want_id.to_string()]),
                            )
                        }
                        Want::UpstreamBuilding(upstream) => {
                            // Additional dep miss: UpstreamBuilding → UpstreamBuilding (add another upstream)
                            // This can happen if multiple jobs report dep misses for different upstreams
                            tracing::info!(
                                want_id = %want_id,
                                derivative_want_id = %derivative_want_id,
                                "Want: UpstreamBuilding → UpstreamBuilding (additional upstream added)"
                            );
                            Want::UpstreamBuilding(
                                upstream.add_upstreams(vec![derivative_want_id.to_string()]),
                            )
                        }
                        _ => {
                            panic!(
                                "BUG: Want {} in invalid state {:?} when processing derivative want. Should be Building or UpstreamBuilding.",
                                want_id, want
                            );
                        }
                    };

                    self.wants.insert(want_id, transitioned);
                }
            }
        }
    }

    /// Transition partitions from Missing to Building state
    /// Used when a job run starts building partitions
    fn transition_partitions_to_building(
@@ -99,6 +221,11 @@ impl BuildState {
        let transitioned = match partition {
            // Valid: Missing -> Building
            Partition::Missing(missing) => {
                tracing::info!(
                    partition = %building_ref.0.r#ref,
                    job_run_id = %job_run_id,
                    "Partition: Missing → Building"
                );
                Partition::Building(missing.start_building(job_run_id.to_string()))
            }
            // Invalid state: partition should not already be Building, Live, Failed, or Tainted
@@ -116,6 +243,11 @@ impl BuildState {
            let missing = Partition::new_missing(building_ref.0.clone());
            if let Partition::Missing(m) = missing {
                let building = m.start_building(job_run_id.to_string());
                tracing::info!(
                    partition = %building_ref.0.r#ref,
                    job_run_id = %job_run_id,
                    "Partition: Missing → Building (created)"
                );
                self.partitions
                    .insert(building_ref.0.r#ref.clone(), Partition::Building(building));
            }
@@ -140,6 +272,11 @@ impl BuildState {
        // ONLY valid transition: Building -> Live
        let transitioned = match partition {
            Partition::Building(building) => {
                tracing::info!(
                    partition = %pref.0.r#ref,
                    job_run_id = %job_run_id,
                    "Partition: Building → Live"
                );
                Partition::Live(building.complete(job_run_id.to_string(), timestamp))
            }
            // All other states are invalid
@@ -171,6 +308,11 @@ impl BuildState {
        // ONLY valid transition: Building -> Failed
        let transitioned = match partition {
            Partition::Building(building) => {
                tracing::info!(
                    partition = %pref.0.r#ref,
                    job_run_id = %job_run_id,
                    "Partition: Building → Failed"
                );
                Partition::Failed(building.fail(job_run_id.to_string(), timestamp))
            }
            // All other states are invalid
@@ -199,7 +341,13 @@ impl BuildState {

        // Only valid transition: Building -> Missing
        let transitioned = match partition {
-            Partition::Building(building) => Partition::Missing(building.reset_to_missing()),
+            Partition::Building(building) => {
+                tracing::info!(
+                    partition = %building_ref.0.r#ref,
+                    "Partition: Building → Missing (dep miss)"
+                );
+                Partition::Missing(building.reset_to_missing())
+            }
            // All other states are invalid
            _ => {
                panic!(
@@ -249,6 +397,11 @@ impl BuildState {
                        if all_partitions_live {
                            let successful_want =
                                building.complete(job_run_id.to_string(), timestamp);
                            tracing::info!(
                                want_id = %want_id,
                                job_run_id = %job_run_id,
                                "Want: Building → Successful"
                            );
                            newly_successful_wants.push(successful_want.get_id());
                            Want::Successful(successful_want)
                        } else {
@@ -326,6 +479,13 @@ impl BuildState {
        job_run_id: &str,
        timestamp: u64,
    ) {
        tracing::debug!(
            newly_successful_wants = ?newly_successful_wants
                .iter()
                .map(|w| &w.0)
                .collect::<Vec<_>>(),
            "Checking downstream wants for unblocking"
        );
        // Find downstream wants that are waiting for any of the newly successful wants
        // TODO: Consider adding upstream_want_id -> downstream_want_ids index to avoid iterating all wants
        let downstream_wants_to_check: Vec<String> = self
@@ -345,6 +505,10 @@ impl BuildState {
                }
            })
            .collect();
        tracing::debug!(
            downstream_wants_to_check = ?downstream_wants_to_check,
            "Found downstream wants affected by upstream completion"
        );

        for want_id in downstream_wants_to_check {
            let want = self
@@ -354,6 +518,11 @@ impl BuildState {

        let transitioned = match want {
            Want::UpstreamBuilding(downstream_want) => {
                tracing::debug!(
                    want_id = %want_id,
                    upstreams = ?downstream_want.state.upstream_want_ids,
                    "Checking if all upstreams are satisfied"
                );
                // Check if ALL of this downstream want's upstream dependencies are now Successful
                let all_upstreams_successful = downstream_want
                    .state
@@ -365,6 +534,11 @@ impl BuildState {
                    .map(|w| matches!(w, Want::Successful(_)))
                    .unwrap_or(false)
                });
                tracing::debug!(
                    want_id = %want_id,
                    all_upstreams_successful = %all_upstreams_successful,
                    "Upstream satisfaction check complete"
                );

                if all_upstreams_successful {
                    // Check if any of this want's partitions are still being built
@@ -377,19 +551,37 @@ impl BuildState {
                        .map(|partition| matches!(partition, Partition::Building(_)))
                        .unwrap_or(false)
                    });
                    tracing::debug!(
                        want_id = %want_id,
                        any_partition_building = %any_partition_building,
                        "Partition building status check"
                    );

                    if any_partition_building {
                        // Some partitions still being built, continue in Building state
                        tracing::info!(
                            want_id = %want_id,
                            job_run_id = %job_run_id,
                            "Want: UpstreamBuilding → Building (upstreams satisfied, partitions building)"
                        );
                        Want::Building(
                            downstream_want
                                .continue_building(job_run_id.to_string(), timestamp),
                        )
                    } else {
                        // No partitions being built, become schedulable again
                        tracing::info!(
                            want_id = %want_id,
                            "Want: UpstreamBuilding → Idle (upstreams satisfied, ready to schedule)"
                        );
                        Want::Idle(downstream_want.upstreams_satisfied())
                    }
                } else {
                    // Upstreams not all satisfied yet, stay in UpstreamBuilding
                    tracing::debug!(
                        want_id = %want_id,
                        "Want remains in UpstreamBuilding state (upstreams not yet satisfied)"
                    );
                    Want::UpstreamBuilding(downstream_want)
                }
            }
@@ -454,109 +646,6 @@ impl BuildState {
        }
    }

-    /// Build a mapping from partition references to the want IDs that will build them
-    /// Used to track which upstream wants a downstream want depends on after a dep miss
-    fn build_partition_to_want_mapping(
-        &self,
-        want_events: &[Event],
-    ) -> std::collections::HashMap<String, String> {
-        let mut partition_to_want_map: std::collections::HashMap<String, String> =
-            std::collections::HashMap::new();
-
-        for event_item in want_events {
-            if let Event::WantCreateV1(want_create) = event_item {
-                for pref in &want_create.partitions {
-                    partition_to_want_map.insert(pref.r#ref.clone(), want_create.want_id.clone());
-                }
-            }
-        }
-
-        partition_to_want_map
-    }
-
-    /// Collect upstream want IDs that a servicing want now depends on based on dep misses
-    /// Returns a deduplicated, sorted list of upstream want IDs
-    fn collect_upstream_want_ids(
-        &self,
-        servicing_want: &crate::WantAttributedPartitions,
-        missing_deps: &[crate::MissingDeps],
-        partition_to_want_map: &std::collections::HashMap<String, String>,
-    ) -> Vec<String> {
-        let mut new_upstream_want_ids = Vec::new();
-
-        for missing_dep in missing_deps {
-            // Only process if this want contains an impacted partition
-            let is_impacted = missing_dep.impacted.iter().any(|imp| {
-                servicing_want
-                    .partitions
-                    .iter()
-                    .any(|p| p.r#ref == imp.r#ref)
-            });
-
-            if is_impacted {
-                // For each missing partition, find the want ID that will build it
-                for missing_partition in &missing_dep.missing {
-                    if let Some(want_id) = partition_to_want_map.get(&missing_partition.r#ref) {
-                        new_upstream_want_ids.push(want_id.clone());
-                    }
-                }
-            }
-        }
-
-        // Dedupe upstream want IDs (one job might report same dep multiple times)
-        new_upstream_want_ids.sort();
-        new_upstream_want_ids.dedup();
-
-        new_upstream_want_ids
-    }
-
-    /// Transition wants to UpstreamBuilding when they have missing dependencies
-    /// Handles Building → UpstreamBuilding and UpstreamBuilding → UpstreamBuilding (add upstreams)
-    fn transition_wants_to_upstream_building(
-        &mut self,
-        servicing_wants: &[crate::WantAttributedPartitions],
-        missing_deps: &[crate::MissingDeps],
-        partition_to_want_map: &std::collections::HashMap<String, String>,
-    ) {
-        // For each want serviced by this job run, check if it was impacted by missing deps
-        for servicing_want in servicing_wants {
-            let want = self.wants.remove(&servicing_want.want_id).expect(&format!(
-                "BUG: Want {} must exist when serviced by job run",
-                servicing_want.want_id
-            ));
-
-            // Collect the upstream want IDs that this want now depends on
-            let new_upstream_want_ids =
-                self.collect_upstream_want_ids(servicing_want, missing_deps, partition_to_want_map);
-
-            let transitioned = if !new_upstream_want_ids.is_empty() {
-                match want {
-                    Want::Building(building) => {
-                        // First dep miss for this want: Building → UpstreamBuilding
-                        Want::UpstreamBuilding(building.detect_missing_deps(new_upstream_want_ids))
-                    }
-                    Want::UpstreamBuilding(upstream) => {
-                        // Already in UpstreamBuilding, add more upstreams (self-transition)
-                        // This can happen if multiple jobs report dep misses, or one job reports multiple dep misses
-                        Want::UpstreamBuilding(upstream.add_upstreams(new_upstream_want_ids))
-                    }
-                    _ => {
-                        panic!(
-                            "BUG: Want {} in invalid state {:?} when job run had dep miss. Should be Building or UpstreamBuilding.",
-                            servicing_want.want_id, want
-                        );
-                    }
-                }
-            } else {
-                // No new upstreams for this want (it wasn't impacted), keep current state
-                want
-            };
-
-            self.wants
-                .insert(servicing_want.want_id.clone(), transitioned);
-        }
-    }

    /// Handles reacting to events, updating state, and erroring if it's an invalid state transition
    /// Event handlers can return vecs of events that will then be appended to the BEL
    pub fn handle_event(&mut self, event: &Event) -> Vec<Event> {
@@ -582,6 +671,25 @@ impl BuildState {
    fn handle_want_create(&mut self, event: &WantCreateEventV1) -> Vec<Event> {
        // Use From impl to create want in Idle state
        let want_idle: WantWithState<WantIdleState> = event.clone().into();

        // Log creation with derivative vs user-created distinction
        if let Some(source) = &event.source {
            if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source {
                tracing::info!(
                    want_id = %event.want_id,
                    partitions = ?event.partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
                    source_job_run_id = %job_triggered.job_run_id,
                    "Want created (derivative - auto-created due to missing dependency)"
                );
            }
        } else {
            tracing::info!(
                want_id = %event.want_id,
                partitions = ?event.partitions.iter().map(|p| &p.r#ref).collect::<Vec<_>>(),
                "Want created (user-requested)"
            );
        }

        self.wants
            .insert(event.want_id.clone(), Want::Idle(want_idle));
@@ -590,6 +698,17 @@ impl BuildState {
            self.add_want_to_partition(pref, &event.want_id);
        }

        // If this is a derivative want (triggered by a job's dep miss), transition impacted wants to UpstreamBuilding
        if let Some(source) = &event.source {
            if let Some(EventSourceVariant::JobTriggered(job_triggered)) = &source.source {
                self.handle_derivative_want_creation(
                    &event.want_id,
                    &event.partitions,
                    &job_triggered.job_run_id,
                );
            }
        }

        vec![]
    }
@@ -657,6 +776,11 @@ impl BuildState {
        let transitioned = match want {
            Want::Idle(idle) => {
                // First job starting for this want
                tracing::info!(
                    want_id = %wap.want_id,
                    job_run_id = %event.job_run_id,
                    "Want: Idle → Building (job scheduled)"
                );
                Want::Building(idle.start_building(current_timestamp()))
            }
            Want::Building(building) => {
@@ -698,7 +822,13 @@ impl BuildState {

        let running = match job_run {
            // First heartbeat: Queued -> Running
-            JobRun::Queued(queued) => queued.start_running(current_timestamp()),
+            JobRun::Queued(queued) => {
+                tracing::info!(
+                    job_run_id = %event.job_run_id,
+                    "JobRun: Queued → Running"
+                );
+                queued.start_running(current_timestamp())
+            }
            // Subsequent heartbeat: update timestamp
            JobRun::Running(running) => running.heartbeat(current_timestamp()),
            _ => {
@@ -721,7 +851,13 @@ impl BuildState {
        ));

        let succeeded = match job_run {
-            JobRun::Running(running) => running.succeed(current_timestamp()),
+            JobRun::Running(running) => {
+                tracing::info!(
+                    job_run_id = %event.job_run_id,
+                    "JobRun: Running → Succeeded"
+                );
+                running.succeed(current_timestamp())
+            }
            _ => {
                panic!(
                    "BUG: Success event received for job run {} in invalid state {:?}. Job must be Running to succeed.",
@@ -766,7 +902,14 @@ impl BuildState {
        ));

        let failed = match job_run {
-            JobRun::Running(running) => running.fail(current_timestamp(), event.reason.clone()),
+            JobRun::Running(running) => {
+                tracing::info!(
+                    job_run_id = %event.job_run_id,
+                    reason = %event.reason,
+                    "JobRun: Running → Failed"
+                );
+                running.fail(current_timestamp(), event.reason.clone())
+            }
            _ => {
                panic!(
                    "BUG: Failure event received for job run {} in invalid state {:?}. Job must be Running to fail.",
@@ -838,11 +981,20 @@ impl BuildState {
        ));

        let dep_miss = match job_run {
-            JobRun::Running(running) => running.dep_miss(
-                current_timestamp(),
-                event.missing_deps.clone(),
-                event.read_deps.clone(),
-            ),
+            JobRun::Running(running) => {
+                tracing::info!(
+                    job_run_id = %event.job_run_id,
+                    missing_deps = ?event.missing_deps.iter()
+                        .flat_map(|md| md.missing.iter().map(|p| &p.r#ref))
+                        .collect::<Vec<_>>(),
+                    "JobRun: Running → DepMiss (missing dependencies detected)"
+                );
+                running.dep_miss(
+                    current_timestamp(),
+                    event.missing_deps.clone(),
+                    event.read_deps.clone(),
+                )
+            }
            _ => {
                panic!(
                    "BUG: DepMiss event received for job run {} in invalid state {:?}. Job must be Running to hit dep miss.",
@@ -864,32 +1016,26 @@ impl BuildState {
        let building_refs_to_reset = dep_miss.get_building_partitions_to_reset();
        self.reset_partitions_to_missing(&building_refs_to_reset);

        // Create wants from dep misses
        // Generate WantCreateV1 events for the missing dependencies
        // These events will be returned and appended to the BEL by BuildEventLog.append_event()
        let want_events = missing_deps_to_want_events(
            dep_miss.get_missing_deps().to_vec(),
            &event.job_run_id,
            want_timestamps,
        );

        // Building → UpstreamBuilding OR UpstreamBuilding → UpstreamBuilding (add upstreams)
        // Store the job run in DepMiss state so we can access the missing_deps later
        // When the derivative WantCreateV1 events get processed by handle_want_create(),
        // they will look up this job run and use handle_derivative_want_creation() to
        // transition impacted wants to UpstreamBuilding with the correct want IDs.
        //
        // When a job reports missing dependencies, we need to:
        // 1. Create new wants for the missing partitions (done above via want_events)
        // 2. Transition the current want to UpstreamBuilding, tracking which upstream wants it's waiting for

        // Build a map: partition_ref -> want_id that will build it
        let partition_to_want_map = self.build_partition_to_want_mapping(&want_events);

        // Transition servicing wants to UpstreamBuilding when they have missing dependencies
        self.transition_wants_to_upstream_building(
            &dep_miss.info.servicing_wants,
            dep_miss.get_missing_deps(),
            &partition_to_want_map,
        );

        // KEY: We do NOT transition wants here because the want_events have randomly generated UUIDs
        // that won't match during replay. Instead, we transition wants when processing the actual
        // WantCreateV1 events that get written to and read from the BEL.
        self.job_runs
            .insert(event.job_run_id.clone(), JobRun::DepMiss(dep_miss));

        // Return derivative want events to be appended to the BEL
        want_events
    }
@@ -1169,7 +1315,8 @@ mod tests {
    mod sqlite_build_state {
        mod want {
            use crate::build_state::BuildState;
-            use crate::{WantCancelEventV1, WantCreateEventV1, WantDetail};
+            use crate::data_build_event::Event;
+            use crate::{MissingDeps, WantCancelEventV1, WantCreateEventV1, WantDetail};

            #[test]
            fn test_should_create_want() {
@@ -1206,6 +1353,143 @@ mod tests {
                    Some(crate::WantStatusCode::WantCanceled.into())
                );
            }

            #[test]
            fn test_multihop_dependency_replay() {
                use crate::data_build_event::Event;
                use crate::{
                    JobRunBufferEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1,
                    JobRunSuccessEventV1, MissingDeps, PartitionRef, WantAttributedPartitions,
                    WantCreateEventV1,
                };

                let mut state = BuildState::default();
                let mut events = vec![];

                // 1. Create want for data/beta
                let beta_want_id = "beta-want".to_string();
                let mut create_beta = WantCreateEventV1::default();
                create_beta.want_id = beta_want_id.clone();
                create_beta.partitions = vec![PartitionRef {
                    r#ref: "data/beta".to_string(),
                }];
                events.push(Event::WantCreateV1(create_beta));

                // 2. Queue beta job (first attempt)
                let beta_job_1_id = "beta-job-1".to_string();
                let mut buffer_beta_1 = JobRunBufferEventV1::default();
                buffer_beta_1.job_run_id = beta_job_1_id.clone();
                buffer_beta_1.job_label = "//job_beta".to_string();
                buffer_beta_1.want_attributed_partitions = vec![WantAttributedPartitions {
                    want_id: beta_want_id.clone(),
                    partitions: vec![PartitionRef {
                        r#ref: "data/beta".to_string(),
                    }],
                }];
                buffer_beta_1.building_partitions = vec![PartitionRef {
                    r#ref: "data/beta".to_string(),
                }];
                events.push(Event::JobRunBufferV1(buffer_beta_1));

                // 3. Beta job starts running
                let mut heartbeat_beta_1 = JobRunHeartbeatEventV1::default();
                heartbeat_beta_1.job_run_id = beta_job_1_id.clone();
                events.push(Event::JobRunHeartbeatV1(heartbeat_beta_1));

                // 4. Beta job reports missing dependency on data/alpha
                let mut dep_miss_beta_1 = JobRunMissingDepsEventV1::default();
                dep_miss_beta_1.job_run_id = beta_job_1_id.clone();
                dep_miss_beta_1.missing_deps = vec![MissingDeps {
                    impacted: vec![PartitionRef {
                        r#ref: "data/beta".to_string(),
                    }],
                    missing: vec![PartitionRef {
                        r#ref: "data/alpha".to_string(),
                    }],
                }];
                events.push(Event::JobRunMissingDepsV1(dep_miss_beta_1));

                // 5. Create derivative want for data/alpha
                let alpha_want_id = "alpha-want".to_string();
                let mut create_alpha = WantCreateEventV1::default();
                create_alpha.want_id = alpha_want_id.clone();
                create_alpha.partitions = vec![PartitionRef {
                    r#ref: "data/alpha".to_string(),
                }];
                events.push(Event::WantCreateV1(create_alpha));

                // 6. Queue alpha job
                let alpha_job_id = "alpha-job".to_string();
                let mut buffer_alpha = JobRunBufferEventV1::default();
                buffer_alpha.job_run_id = alpha_job_id.clone();
                buffer_alpha.job_label = "//job_alpha".to_string();
                buffer_alpha.want_attributed_partitions = vec![WantAttributedPartitions {
                    want_id: alpha_want_id.clone(),
                    partitions: vec![PartitionRef {
                        r#ref: "data/alpha".to_string(),
                    }],
                }];
                buffer_alpha.building_partitions = vec![PartitionRef {
                    r#ref: "data/alpha".to_string(),
                }];
                events.push(Event::JobRunBufferV1(buffer_alpha));

                // 7. Alpha job starts running
                let mut heartbeat_alpha = JobRunHeartbeatEventV1::default();
                heartbeat_alpha.job_run_id = alpha_job_id.clone();
                events.push(Event::JobRunHeartbeatV1(heartbeat_alpha));

                // 8. Alpha job succeeds
                let mut success_alpha = JobRunSuccessEventV1::default();
                success_alpha.job_run_id = alpha_job_id.clone();
                events.push(Event::JobRunSuccessV1(success_alpha));

                // 9. Queue beta job again (second attempt) - THIS IS THE CRITICAL MOMENT
                let beta_job_2_id = "beta-job-2".to_string();
                let mut buffer_beta_2 = JobRunBufferEventV1::default();
                buffer_beta_2.job_run_id = beta_job_2_id.clone();
                buffer_beta_2.job_label = "//job_beta".to_string();
                buffer_beta_2.want_attributed_partitions = vec![WantAttributedPartitions {
                    want_id: beta_want_id.clone(),
                    partitions: vec![PartitionRef {
                        r#ref: "data/beta".to_string(),
                    }],
                }];
                buffer_beta_2.building_partitions = vec![PartitionRef {
                    r#ref: "data/beta".to_string(),
                }];
                events.push(Event::JobRunBufferV1(buffer_beta_2));

                // 10. Beta job starts running
                let mut heartbeat_beta_2 = JobRunHeartbeatEventV1::default();
                heartbeat_beta_2.job_run_id = beta_job_2_id.clone();
                events.push(Event::JobRunHeartbeatV1(heartbeat_beta_2));

                // 11. Beta job succeeds
                let mut success_beta_2 = JobRunSuccessEventV1::default();
                success_beta_2.job_run_id = beta_job_2_id.clone();
                events.push(Event::JobRunSuccessV1(success_beta_2));

                // Process all events - this simulates replay
                for event in &events {
                    state.handle_event(event);
                }

                // Verify final state
                let beta_want = state.get_want(&beta_want_id).unwrap();
                assert_eq!(
                    beta_want.status,
                    Some(crate::WantStatusCode::WantSuccessful.into()),
                    "Beta want should be successful after multi-hop dependency resolution"
                );

                let alpha_want = state.get_want(&alpha_want_id).unwrap();
                assert_eq!(
                    alpha_want.status,
                    Some(crate::WantStatusCode::WantSuccessful.into()),
                    "Alpha want should be successful"
                );
            }
        }
    }
}
databuild/cli_main.rs (new file, 402 lines)
@@ -0,0 +1,402 @@
use clap::{Parser, Subcommand};
use reqwest::blocking::Client;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::sync::{broadcast, mpsc};
use lib::build_event_log::SqliteBELStorage;
use lib::build_state::BuildState;
use lib::config::DatabuildConfig;
use lib::http_server::{create_router, AppState};
use lib::orchestrator::{Orchestrator, OrchestratorConfig};

#[derive(Parser)]
#[command(name = "databuild")]
#[command(about = "DataBuild CLI - Build system for data pipelines", long_about = None)]
struct Cli {
    /// Server URL (default: http://localhost:3000)
    #[arg(long, default_value = "http://localhost:3000", global = true)]
    server: String,

    #[command(subcommand)]
    command: Commands,
}

#[derive(Subcommand)]
enum Commands {
    /// Start the DataBuild HTTP server
    Serve {
        /// Port to listen on
        #[arg(long, default_value = "3000")]
        port: u16,

        /// Database URL (default: :memory: for in-memory SQLite)
        #[arg(long, default_value = ":memory:")]
        database: String,

        /// Path to configuration file (JSON or TOML)
        #[arg(long)]
        config: Option<String>,
    },

    /// Create a new want (trigger partition builds)
    Want {
        /// Partition references to build (e.g., "data/users", "metrics/daily")
        partitions: Vec<String>,
    },

    /// List and manage wants
    Wants {
        #[command(subcommand)]
        command: WantsCommands,
    },

    /// List and inspect partitions
    Partitions {
        #[command(subcommand)]
        command: Option<PartitionsCommands>,
    },

    /// List and inspect job runs
    JobRuns {
        #[command(subcommand)]
        command: Option<JobRunsCommands>,
    },
}

#[derive(Subcommand)]
enum WantsCommands {
    /// List all wants
    List,
}

#[derive(Subcommand)]
enum PartitionsCommands {
    /// List all partitions
    List,
}

#[derive(Subcommand)]
enum JobRunsCommands {
    /// List all job runs
    List,
}

fn main() {
    let cli = Cli::parse();

    match cli.command {
        Commands::Serve { port, database, config } => {
            cmd_serve(port, &database, config.as_deref());
        }
        Commands::Want { partitions } => {
            cmd_want(&cli.server, partitions);
        }
        Commands::Wants { command } => match command {
            WantsCommands::List => {
                cmd_wants_list(&cli.server);
            }
        },
        Commands::Partitions { command } => match command {
            Some(PartitionsCommands::List) | None => {
                cmd_partitions_list(&cli.server);
            }
        },
        Commands::JobRuns { command } => match command {
            Some(JobRunsCommands::List) | None => {
                cmd_job_runs_list(&cli.server);
            }
        },
    }
}

// ============================================================================
// Command Implementations
// ============================================================================

#[tokio::main]
async fn cmd_serve(port: u16, database: &str, config_path: Option<&str>) {
    // Initialize logging
    tracing_subscriber::fmt::init();

    // Load configuration if provided
    let jobs = if let Some(path) = config_path {
        match DatabuildConfig::from_file(path) {
            Ok(config) => {
                println!("Loaded configuration from: {}", path);
                println!(" Jobs: {}", config.jobs.len());
                config.into_job_configurations()
            }
            Err(e) => {
                eprintln!("Failed to load configuration from {}: {}", path, e);
                std::process::exit(1);
            }
        }
    } else {
        Vec::new()
    };

    // Create SQLite BEL storage (shared between orchestrator and HTTP server)
    let bel_storage = Arc::new(
        SqliteBELStorage::create(database).expect("Failed to create BEL storage"),
    );

    // Create command channel for orchestrator communication
    let (command_tx, command_rx) = mpsc::channel(100);

    // Create event broadcast channel (orchestrator -> HTTP server)
    let (event_tx, _event_rx) = broadcast::channel(1000);

    // Create shutdown broadcast channel
    let (shutdown_tx, _shutdown_rx) = broadcast::channel(1);

    // Create shared mirrored build state for HTTP server
    let mirrored_state = Arc::new(RwLock::new(BuildState::default()));

    // Spawn state-mirror task to keep HTTP server's build state in sync
    let mirror_clone = mirrored_state.clone();
    let mut mirror_rx = event_tx.subscribe();
    tokio::spawn(async move {
        while let Ok(event) = mirror_rx.recv().await {
            match mirror_clone.write() {
                Ok(mut state) => {
                    state.handle_event(&event);
                }
                Err(e) => {
                    eprintln!("State mirror task: RwLock poisoned, cannot update state: {}", e);
                    break;
                }
            }
        }
    });

    // Spawn orchestrator in background thread
    // Note: Orchestrator needs its own BEL storage instance for writes
    let orch_bel_storage = SqliteBELStorage::create(database).expect("Failed to create BEL storage");
    let orch_shutdown_rx = shutdown_tx.subscribe();
    let orch_handle = std::thread::spawn(move || {
        // Create orchestrator with both channels and jobs from config
        let config = OrchestratorConfig { jobs };
        let mut orchestrator = Orchestrator::new_with_channels(
            orch_bel_storage,
            config,
            command_rx,
            event_tx,
        );
        let mut shutdown_rx = orch_shutdown_rx;

        // Run orchestrator loop
        loop {
            // Check for shutdown signal
            if shutdown_rx.try_recv().is_ok() {
                println!("Orchestrator received shutdown signal");
                break;
            }

            if let Err(e) = orchestrator.step() {
                eprintln!("Orchestrator error: {}", e);
            }
            // Small sleep to avoid busy-waiting
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
    });

    // Create app state with mirrored state, shared storage, command sender, and shutdown channel
    let state = AppState::new(mirrored_state, bel_storage, command_tx, shutdown_tx.clone());

    // Spawn idle timeout checker task
    let idle_state = state.clone();
    let idle_shutdown_tx = shutdown_tx.clone();
    tokio::spawn(async move {
        let idle_timeout = Duration::from_secs(3 * 60 * 60); // 3 hours

        loop {
            tokio::time::sleep(Duration::from_secs(60)).await;

            let last_request = idle_state.last_request_time.load(std::sync::atomic::Ordering::Relaxed);
            let now = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_millis() as u64;

            if now - last_request > idle_timeout.as_millis() as u64 {
                eprintln!(
                    "Server idle for {} hours, shutting down",
                    idle_timeout.as_secs() / 3600
                );
                let _ = idle_shutdown_tx.send(());
                break;
            }
        }
    });

    // Create router
    let app = create_router(state);

    // Bind to specified port
    let addr = format!("127.0.0.1:{}", port);
    let listener = tokio::net::TcpListener::bind(&addr)
        .await
        .unwrap_or_else(|_| panic!("Failed to bind to {}", addr));

    println!("DataBuild server listening on http://{}", addr);
    println!(" GET /health");
    println!(" GET /api/wants");
    println!(" POST /api/wants");
    println!(" GET /api/wants/:id");
    println!(" GET /api/partitions");
    println!(" GET /api/job_runs");

    // Subscribe to shutdown signal for graceful shutdown
    let mut server_shutdown_rx = shutdown_tx.subscribe();

    // Run the server with graceful shutdown
    axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            let _ = server_shutdown_rx.recv().await;
            println!("HTTP server received shutdown signal");
        })
        .await
        .expect("Server error");

    // Wait for orchestrator to finish
    let _ = orch_handle.join();
    println!("Shutdown complete");
}

fn cmd_want(server_url: &str, partitions: Vec<String>) {
    let client = Client::new();

    // Convert partition strings to PartitionRef objects
    let partition_refs: Vec<serde_json::Value> = partitions
        .iter()
        .map(|p| serde_json::json!({"ref": p}))
        .collect();

    // Get current timestamp (milliseconds since epoch)
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64;

    let request = serde_json::json!({
        "partitions": partition_refs,
        "data_timestamp": now,
        "ttl_seconds": 3600, // 1 hour default
        "sla_seconds": 300 // 5 minutes default
    });

    let url = format!("{}/api/wants", server_url);

    match client.post(&url)
        .json(&request)
        .send()
    {
        Ok(response) => {
            if response.status().is_success() {
                println!("Want created successfully");
                if let Ok(body) = response.text() {
                    println!("{}", body);
                }
            } else {
                eprintln!("Failed to create want: {}", response.status());
                if let Ok(body) = response.text() {
                    eprintln!("{}", body);
                }
                std::process::exit(1);
            }
        }
        Err(e) => {
            eprintln!("Failed to connect to server: {}", e);
            eprintln!("Is the server running? Try: databuild serve");
            std::process::exit(1);
        }
    }
}

fn cmd_wants_list(server_url: &str) {
    let client = Client::new();
    let url = format!("{}/api/wants", server_url);

    match client.get(&url).send() {
        Ok(response) => {
            if response.status().is_success() {
                match response.json::<serde_json::Value>() {
                    Ok(json) => {
                        println!("{}", serde_json::to_string_pretty(&json).unwrap());
                    }
                    Err(e) => {
                        eprintln!("Failed to parse response: {}", e);
                        std::process::exit(1);
                    }
                }
            } else {
                eprintln!("Request failed: {}", response.status());
                std::process::exit(1);
            }
        }
        Err(e) => {
            eprintln!("Failed to connect to server: {}", e);
            eprintln!("Is the server running? Try: databuild serve");
            std::process::exit(1);
        }
    }
}

fn cmd_partitions_list(server_url: &str) {
    let client = Client::new();
    let url = format!("{}/api/partitions", server_url);

    match client.get(&url).send() {
        Ok(response) => {
            if response.status().is_success() {
                match response.json::<serde_json::Value>() {
                    Ok(json) => {
                        println!("{}", serde_json::to_string_pretty(&json).unwrap());
                    }
                    Err(e) => {
                        eprintln!("Failed to parse response: {}", e);
                        std::process::exit(1);
                    }
                }
            } else {
                eprintln!("Request failed: {}", response.status());
                std::process::exit(1);
            }
        }
        Err(e) => {
            eprintln!("Failed to connect to server: {}", e);
            eprintln!("Is the server running? Try: databuild serve");
            std::process::exit(1);
        }
    }
}

fn cmd_job_runs_list(server_url: &str) {
    let client = Client::new();
    let url = format!("{}/api/job_runs", server_url);

    match client.get(&url).send() {
        Ok(response) => {
            if response.status().is_success() {
                match response.json::<serde_json::Value>() {
                    Ok(json) => {
                        println!("{}", serde_json::to_string_pretty(&json).unwrap());
                    }
                    Err(e) => {
                        eprintln!("Failed to parse response: {}", e);
                        std::process::exit(1);
                    }
                }
            } else {
                eprintln!("Request failed: {}", response.status());
                std::process::exit(1);
            }
        }
        Err(e) => {
            eprintln!("Failed to connect to server: {}", e);
            eprintln!("Is the server running? Try: databuild serve");
            std::process::exit(1);
        }
    }
}
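Assuming clap's default kebab-case naming for the subcommands above, the resulting workflow looks like `databuild serve --port 3000 --database build.db --config jobs.toml` in one terminal, then `databuild want data/users`, `databuild wants list`, `databuild partitions list`, and `databuild job-runs list` issued against it. Every non-serve subcommand is a thin blocking `reqwest` client over the HTTP API, so `--server` can point any of them at a remote instance.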
databuild/commands.rs (new file, 19 lines)
@@ -0,0 +1,19 @@
use crate::util::DatabuildError;
use crate::{CancelWantRequest, CancelWantResponse, CreateWantRequest, CreateWantResponse};
use tokio::sync::oneshot;

/// Commands that can be sent to the orchestrator via the command channel.
/// Only write operations need commands; reads go directly to BEL storage.
pub enum Command {
    /// Create a new want
    CreateWant {
        request: CreateWantRequest,
        reply: oneshot::Sender<Result<CreateWantResponse, DatabuildError>>,
    },

    /// Cancel an existing want
    CancelWant {
        request: CancelWantRequest,
        reply: oneshot::Sender<Result<CancelWantResponse, DatabuildError>>,
    },
}
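The oneshot `reply` slot in each variant implies a simple request/response round trip over the mpsc command channel. A sketch of the sender side as an HTTP handler might use it (the helper is hypothetical; `DatabuildError`'s `From<String>` conversion is used the same way the storage code uses it):

```rust
use tokio::sync::{mpsc, oneshot};

use lib::commands::Command;
use lib::util::DatabuildError;
use lib::{CreateWantRequest, CreateWantResponse};

// Hypothetical write-path helper: package the request with a reply slot,
// send it to the orchestrator, and await the answer.
async fn create_want_via_channel(
    command_tx: &mpsc::Sender<Command>,
    request: CreateWantRequest,
) -> Result<CreateWantResponse, DatabuildError> {
    let (reply, rx) = oneshot::channel();
    command_tx
        .send(Command::CreateWant { request, reply })
        .await
        .map_err(|e| DatabuildError::from(format!("orchestrator unavailable: {}", e)))?;
    rx.await
        .map_err(|e| DatabuildError::from(format!("orchestrator dropped reply: {}", e)))?
}
```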
databuild/config.rs (new file, 92 lines)
@@ -0,0 +1,92 @@
use crate::JobConfig;
use crate::job::JobConfiguration;
use crate::util::DatabuildError;
use std::fs;
use std::path::Path;

/// Configuration file format for DataBuild application
#[derive(Debug, serde::Deserialize)]
pub struct DatabuildConfig {
    /// List of job configurations
    pub jobs: Vec<JobConfig>,
}

impl DatabuildConfig {
    /// Load configuration from a file, auto-detecting format from extension
    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self, DatabuildError> {
        let path = path.as_ref();
        let contents = fs::read_to_string(path)
            .map_err(|e| DatabuildError::from(format!("Failed to read config file: {}", e)))?;

        // Determine format from file extension
        let extension = path.extension().and_then(|s| s.to_str()).unwrap_or("");

        match extension {
            "json" => Self::from_json(&contents),
            "toml" => Self::from_toml(&contents),
            _ => Err(DatabuildError::from(format!(
                "Unknown config file extension: {}. Use .json or .toml",
                extension
            ))),
        }
    }

    /// Parse configuration from JSON string
    pub fn from_json(s: &str) -> Result<Self, DatabuildError> {
        serde_json::from_str(s)
            .map_err(|e| DatabuildError::from(format!("Failed to parse JSON config: {}", e)))
    }

    /// Parse configuration from TOML string
    pub fn from_toml(s: &str) -> Result<Self, DatabuildError> {
        toml::from_str(s)
            .map_err(|e| DatabuildError::from(format!("Failed to parse TOML config: {}", e)))
    }

    /// Convert to a list of JobConfiguration
    pub fn into_job_configurations(self) -> Vec<JobConfiguration> {
        self.jobs.into_iter().map(|jc| jc.into()).collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_json_config() {
        let json = r#"
        {
            "jobs": [
                {
                    "label": "//test:job_alpha",
                    "entrypoint": "/usr/bin/python3",
                    "environment": {"FOO": "bar"},
                    "partition_patterns": ["data/alpha/.*"]
                }
            ]
        }
        "#;

        let config = DatabuildConfig::from_json(json).unwrap();
        assert_eq!(config.jobs.len(), 1);
        assert_eq!(config.jobs[0].label, "//test:job_alpha");
    }

    #[test]
    fn test_parse_toml_config() {
        let toml = r#"
        [[jobs]]
        label = "//test:job_alpha"
        entrypoint = "/usr/bin/python3"
        partition_patterns = ["data/alpha/.*"]

        [jobs.environment]
        FOO = "bar"
        "#;

        let config = DatabuildConfig::from_toml(toml).unwrap();
        assert_eq!(config.jobs.len(), 1);
        assert_eq!(config.jobs[0].label, "//test:job_alpha");
    }
}
databuild/http_server.rs (new file, 369 lines)
@@ -0,0 +1,369 @@
use crate::build_event_log::BELStorage;
use crate::build_state::BuildState;
use crate::commands::Command;
use crate::{
    CancelWantRequest, CreateWantRequest, CreateWantResponse, GetWantRequest, GetWantResponse,
    ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse,
    ListWantsRequest, ListWantsResponse,
};
use axum::{
    Json, Router,
    extract::{Path, Query, Request, State},
    http::{HeaderValue, Method, StatusCode},
    middleware::{self, Next},
    response::{IntoResponse, Response},
    routing::{delete, get, post},
};
use std::sync::{
    Arc, RwLock,
    atomic::{AtomicU64, Ordering},
};
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::{broadcast, mpsc, oneshot};
use tower_http::cors::CorsLayer;

/// Shared application state for HTTP handlers
#[derive(Clone)]
pub struct AppState {
    /// Mirrored build state (updated via event broadcast from orchestrator)
    pub build_state: Arc<RwLock<BuildState>>,
    /// Shared read-only access to BEL storage (for event log queries if needed)
    pub bel_storage: Arc<dyn BELStorage>,
    /// Command sender for write operations (sends to orchestrator)
    pub command_tx: mpsc::Sender<Command>,
    /// For idle timeout tracking (epoch millis)
    pub last_request_time: Arc<AtomicU64>,
    /// Broadcast channel for shutdown signal
    pub shutdown_tx: broadcast::Sender<()>,
}

impl AppState {
    pub fn new(
        build_state: Arc<RwLock<BuildState>>,
        bel_storage: Arc<dyn BELStorage>,
        command_tx: mpsc::Sender<Command>,
        shutdown_tx: broadcast::Sender<()>,
    ) -> Self {
        // Initialize last_request_time to current time
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        Self {
            build_state,
            bel_storage,
            command_tx,
            last_request_time: Arc::new(AtomicU64::new(now)),
            shutdown_tx,
        }
    }
}

/// Middleware to update last request time
async fn update_last_request_time(
    State(state): State<AppState>,
    req: Request,
    next: Next,
) -> Response {
    state.last_request_time.store(
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64,
        Ordering::Relaxed,
    );
    next.run(req).await
}

/// Create the Axum router with all endpoints
pub fn create_router(state: AppState) -> Router {
    // Configure CORS for web app development
    let cors = CorsLayer::new()
        .allow_origin("http://localhost:3000".parse::<HeaderValue>().unwrap())
        .allow_methods([Method::GET, Method::POST, Method::DELETE, Method::OPTIONS])
        .allow_headers([
            axum::http::header::CONTENT_TYPE,
            axum::http::header::AUTHORIZATION,
        ]);

    Router::new()
        // Health check
        .route("/health", get(health))
        // Want endpoints
        .route("/api/wants", get(list_wants))
        .route("/api/wants", post(create_want))
        .route("/api/wants/:id", get(get_want))
        .route("/api/wants/:id", delete(cancel_want))
        // Partition endpoints
        .route("/api/partitions", get(list_partitions))
        // Job run endpoints
        .route("/api/job_runs", get(list_job_runs))
        // Add CORS middleware
        .layer(cors)
        // Add middleware to track request time
        .layer(middleware::from_fn_with_state(
            state.clone(),
            update_last_request_time,
        ))
        .with_state(state)
}

// ============================================================================
// Error Handling
// ============================================================================

/// Standard error response structure
#[derive(serde::Serialize)]
struct ErrorResponse {
    error: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    details: Option<serde_json::Value>,
}

impl ErrorResponse {
    fn new(error: impl Into<String>) -> Self {
        Self {
            error: error.into(),
            details: None,
        }
    }

    fn with_details(error: impl Into<String>, details: serde_json::Value) -> Self {
        Self {
            error: error.into(),
            details: Some(details),
        }
    }
}

// ============================================================================
// Handlers
// ============================================================================

/// Health check endpoint
async fn health() -> impl IntoResponse {
    (StatusCode::OK, "OK")
}

/// List all wants
async fn list_wants(
    State(state): State<AppState>,
    Query(params): Query<ListWantsRequest>,
) -> impl IntoResponse {
    // Read from shared build state
    let build_state = match state.build_state.read() {
        Ok(state) => state,
        Err(e) => {
            tracing::error!("Failed to acquire read lock on build state: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ErrorResponse::new(
                    "Internal server error: state lock poisoned",
                )),
            )
                .into_response();
        }
    };
    let response = build_state.list_wants(&params);

    (StatusCode::OK, Json(response)).into_response()
}

/// Get a specific want by ID
async fn get_want(State(state): State<AppState>, Path(want_id): Path<String>) -> impl IntoResponse {
    // Read from shared build state
    let build_state = match state.build_state.read() {
        Ok(state) => state,
        Err(e) => {
            tracing::error!("Failed to acquire read lock on build state: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ErrorResponse::new(
                    "Internal server error: state lock poisoned",
                )),
            )
                .into_response();
        }
    };
    let response = build_state.get_want(&want_id);

    match response {
        Some(want) => (StatusCode::OK, Json(GetWantResponse { data: Some(want) })).into_response(),
        None => {
            tracing::debug!("Want not found: {}", want_id);
            (
                StatusCode::NOT_FOUND,
                Json(ErrorResponse::with_details(
                    "Want not found",
                    serde_json::json!({"want_id": want_id}),
                )),
            )
                .into_response()
        }
    }
}

/// Create a new want
async fn create_want(
    State(state): State<AppState>,
    Json(req): Json<CreateWantRequest>,
) -> impl IntoResponse {
    // Create oneshot channel for reply
    let (reply_tx, reply_rx) = oneshot::channel();

    // Send command to orchestrator
    let command = Command::CreateWant {
        request: req,
        reply: reply_tx,
    };

    if let Err(_) = state.command_tx.send(command).await {
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to send command to orchestrator"
            })),
        )
            .into_response();
    }

    // Wait for orchestrator reply
    match reply_rx.await {
        Ok(Ok(response)) => {
            tracing::info!(
                "Created want: {}",
                response
                    .data
                    .as_ref()
                    .map(|w| &w.want_id)
                    .unwrap_or(&"unknown".to_string())
            );
            (StatusCode::OK, Json(response)).into_response()
        }
        Ok(Err(e)) => {
            tracing::error!("Failed to create want: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ErrorResponse::new(format!("Failed to create want: {}", e))),
            )
                .into_response()
        }
        Err(_) => {
            tracing::error!("Orchestrator did not respond to create want command");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ErrorResponse::new("Orchestrator did not respond")),
            )
                .into_response()
        }
    }
}

/// Cancel a want
async fn cancel_want(
    State(state): State<AppState>,
    Path(want_id): Path<String>,
) -> impl IntoResponse {
    // Create oneshot channel for reply
    let (reply_tx, reply_rx) = oneshot::channel();

    // Send command to orchestrator
    let command = Command::CancelWant {
        request: CancelWantRequest {
            want_id,
            source: None, // HTTP requests don't have a source
            comment: None,
        },
        reply: reply_tx,
    };

    if let Err(_) = state.command_tx.send(command).await {
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(serde_json::json!({
                "error": "Failed to send command to orchestrator"
            })),
        )
            .into_response();
    }

    // Wait for orchestrator reply
    match reply_rx.await {
        Ok(Ok(response)) => {
            tracing::info!(
                "Cancelled want: {}",
                response
                    .data
                    .as_ref()
                    .map(|w| &w.want_id)
                    .unwrap_or(&"unknown".to_string())
            );
            (StatusCode::OK, Json(response)).into_response()
        }
        Ok(Err(e)) => {
            tracing::error!("Failed to cancel want: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ErrorResponse::new(format!("Failed to cancel want: {}", e))),
            )
                .into_response()
        }
        Err(_) => {
            tracing::error!("Orchestrator did not respond to cancel want command");
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ErrorResponse::new("Orchestrator did not respond")),
            )
                .into_response()
        }
    }
}

/// List all partitions
async fn list_partitions(
    State(state): State<AppState>,
    Query(params): Query<ListPartitionsRequest>,
) -> impl IntoResponse {
    // Read from shared build state
    let build_state = match state.build_state.read() {
        Ok(state) => state,
        Err(e) => {
            tracing::error!("Failed to acquire read lock on build state: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ErrorResponse::new(
                    "Internal server error: state lock poisoned",
                )),
            )
                .into_response();
        }
    };
    let response = build_state.list_partitions(&params);

    (StatusCode::OK, Json(response)).into_response()
}

/// List all job runs
async fn list_job_runs(
    State(state): State<AppState>,
    Query(params): Query<ListJobRunsRequest>,
) -> impl IntoResponse {
    // Read from shared build state
    let build_state = match state.build_state.read() {
        Ok(state) => state,
        Err(e) => {
            tracing::error!("Failed to acquire read lock on build state: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ErrorResponse::new(
                    "Internal server error: state lock poisoned",
                )),
            )
                .into_response();
        }
    };
    let response = build_state.list_job_runs(&params);

    (StatusCode::OK, Json(response)).into_response()
}
databuild/job.rs
@@ -2,12 +2,14 @@ use crate::job_run::{JobRunHandle, SubProcessBackend};
use crate::util::DatabuildError;
use crate::{JobConfig, PartitionRef, WantDetail};
use regex::Regex;
use std::collections::HashMap;

#[derive(Debug, Clone)]
pub struct JobConfiguration {
    pub label: String,
    pub patterns: Vec<String>,
    pub entry_point: String,
    pub environment: HashMap<String, String>,
}

impl JobConfiguration {

@@ -38,6 +40,7 @@ impl From<JobConfig> for JobConfiguration {
            label: config.label,
            patterns: config.partition_patterns,
            entry_point: config.entrypoint,
            environment: config.environment,
        }
    }
}
databuild/lib.rs
@@ -1,12 +1,15 @@
mod build_event_log;
mod build_state;
pub mod build_event_log;
pub mod build_state;
pub mod commands;
pub mod config;
mod data_deps;
mod event_transforms;
pub mod http_server;
mod job;
mod job_run;
mod job_run_state;
mod mock_job_run;
mod orchestrator;
pub mod orchestrator;
mod partition_state;
mod util;
mod want_state;
databuild/orchestrator.rs
@@ -1,4 +1,5 @@
use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage};
use crate::commands::Command;
use crate::data_build_event::Event;
use crate::job::JobConfiguration;
use crate::job_run::SubProcessBackend;

@@ -6,6 +7,8 @@ use crate::util::DatabuildError;
use crate::{JobRunBufferEventV1, PartitionRef, WantDetail};
use std::collections::HashMap;
use std::fmt::Debug;
use tokio::sync::{broadcast, mpsc};
use uuid::Uuid;

/**
Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lightweight threads +

@@ -17,10 +20,16 @@ JTBDs:
- Orchestrator polls queued and active job runs, keeping track of their state, and scheduling queued
  jobs when possible
*/
struct Orchestrator<S: BELStorage + Debug> {
pub struct Orchestrator<S: BELStorage + Debug> {
    pub bel: BuildEventLog<S>,
    pub config: OrchestratorConfig,
    pub job_runs: Vec<crate::job_run::JobRunHandle<SubProcessBackend>>,
    /// Optional command receiver for write operations from HTTP server
    pub command_rx: Option<mpsc::Receiver<Command>>,
    /// Optional event broadcaster for state updates to HTTP server
    pub event_tx: Option<broadcast::Sender<Event>>,
    /// Environment variables for each job run (keyed by job_run_id)
    pub job_environments: HashMap<Uuid, HashMap<String, String>>,
}

impl Default for Orchestrator<MemoryBELStorage> {

@@ -29,6 +38,9 @@ impl Default for Orchestrator<MemoryBELStorage> {
            bel: Default::default(),
            config: Default::default(),
            job_runs: Default::default(),
            command_rx: None,
            event_tx: None,
            job_environments: HashMap::new(),
        }
    }
}

@@ -39,6 +51,9 @@ impl Orchestrator<MemoryBELStorage> {
            bel: self.bel.clone(),
            config: self.config.clone(),
            job_runs: Default::default(),
            command_rx: None,
            event_tx: None,
            job_environments: HashMap::new(),
        }
    }
}

@@ -61,8 +76,8 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
}

#[derive(Debug, Clone)]
struct OrchestratorConfig {
    jobs: Vec<JobConfiguration>,
pub struct OrchestratorConfig {
    pub jobs: Vec<JobConfiguration>,
}

impl Default for OrchestratorConfig {

@@ -121,26 +136,68 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
            bel: BuildEventLog::new(storage, Default::default()),
            config,
            job_runs: Vec::new(),
            command_rx: None,
            event_tx: None,
            job_environments: HashMap::new(),
        }
    }

    pub fn new_with_commands(
        storage: S,
        config: OrchestratorConfig,
        command_rx: mpsc::Receiver<Command>,
    ) -> Self {
        Self {
            bel: BuildEventLog::new(storage, Default::default()),
            config,
            job_runs: Vec::new(),
            command_rx: Some(command_rx),
            event_tx: None,
            job_environments: HashMap::new(),
        }
    }

    pub fn new_with_channels(
        storage: S,
        config: OrchestratorConfig,
        command_rx: mpsc::Receiver<Command>,
        event_tx: broadcast::Sender<Event>,
    ) -> Self {
        Self {
            bel: BuildEventLog::new(storage, Default::default()).with_broadcaster(event_tx.clone()),
            config,
            job_runs: Vec::new(),
            command_rx: Some(command_rx),
            event_tx: Some(event_tx),
            job_environments: HashMap::new(),
        }
    }

    /// Append event to BEL (which handles state mutation, storage, broadcasting, and cascading events)
    fn append_and_broadcast(&mut self, event: &Event) -> Result<u64, DatabuildError> {
        self.bel.append_event(event)
    }

    fn schedule_queued_jobs(&mut self) -> Result<(), DatabuildError> {
        use crate::JobRunHeartbeatEventV1;
        use crate::data_build_event::Event;
        use crate::job_run::JobRunHandle;

        let mut new_jobs = Vec::new();
        let mut events_to_append = Vec::new();
        for job in self.job_runs.drain(..) {
            let transitioned = match job {
                JobRunHandle::NotStarted(not_started) => {
                    let job_run_id = not_started.job_run_id.clone();
                    let running = not_started.run(None)?;
                    // Look up environment for this job run
                    let env = self.job_environments.get(&job_run_id).cloned();
                    let running = not_started.run(env)?;

                    // Emit heartbeat event to notify BuildState that job is now running
                    // Collect heartbeat event to emit after drain completes
                    let heartbeat_event = Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 {
                        job_run_id: job_run_id.to_string(),
                    });
                    self.bel.append_event(&heartbeat_event)?;
                    events_to_append.push(heartbeat_event);

                    JobRunHandle::Running(running)
                }

@@ -149,12 +206,19 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
            new_jobs.push(transitioned);
        }
        self.job_runs = new_jobs;

        // Now append all collected events
        for event in events_to_append {
            self.append_and_broadcast(&event)?;
        }

        Ok(())
    }

    /// Visits individual job runs, appending resulting events, and moving runs between run status
    /// containers. Either jobs are still running, or they are moved to terminal states.
    fn poll_job_runs(&mut self) -> Result<(), DatabuildError> {
        use crate::data_build_event::Event;
        use crate::job_run::{JobRunHandle, VisitResult};

        self.schedule_queued_jobs()?;

@@ -165,30 +229,41 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
        // Need to decide on heartbeat frequency (e.g., every N polls or based on time elapsed)

        // Visit all running jobs using type-safe transitions
        // Collect events first to avoid borrow checker issues
        let mut new_jobs = Vec::new();
        let mut events_to_append = Vec::new();
        for job in self.job_runs.drain(..) {
            let transitioned = match job {
                JobRunHandle::Running(running) => match running.visit()? {
                    VisitResult::StillRunning(still_running) => {
                        println!("Still running job: {:?}", still_running.job_run_id);
                        tracing::debug!(
                            job_run_id = still_running.job_run_id.to_string(),
                            "Job still running"
                        );
                        JobRunHandle::Running(still_running)
                    }
                    VisitResult::Completed(completed) => {
                        println!("Completed job: {:?}", completed.job_run_id);
                        tracing::debug!(
                            job_run_id = completed.job_run_id.to_string(),
                            "Completed job"
                        );
                        let event = completed.state.to_event(&completed.job_run_id);
                        self.bel.append_event(&event)?;
                        events_to_append.push(event);
                        JobRunHandle::Completed(completed)
                    }
                    VisitResult::Failed(failed) => {
                        println!("Failed job: {:?}", failed.job_run_id);
                        tracing::debug!(job_run_id = failed.job_run_id.to_string(), "Failed job");
                        let event = failed.state.to_event(&failed.job_run_id);
                        self.bel.append_event(&event)?;
                        events_to_append.push(event);
                        JobRunHandle::Failed(failed)
                    }
                    VisitResult::DepMiss(dep_miss) => {
                        println!("Dep miss job: {:?}", dep_miss.job_run_id);
                        tracing::debug!(
                            job_run_id = dep_miss.job_run_id.to_string(),
                            "Job dep miss"
                        );
                        let event = dep_miss.state.to_event(&dep_miss.job_run_id);
                        self.bel.append_event(&event)?;
                        events_to_append.push(event);
                        JobRunHandle::DepMiss(dep_miss)
                    }
                },

@@ -198,6 +273,11 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
        }
        self.job_runs = new_jobs;

        // Now append all collected events
        for event in events_to_append {
            self.append_and_broadcast(&event)?;
        }

        Ok(())
    }

@@ -254,9 +334,16 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
        let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
        let job_run = JobRunHandle::spawn(wg.job.entry_point.clone(), args);

        // Store environment for this job run
        let job_run_id = job_run.job_run_id().clone();
        if !wg.job.environment.is_empty() {
            self.job_environments
                .insert(job_run_id, wg.job.environment.clone());
        }

        // Create job run buffer event
        let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 {
            job_run_id: job_run.job_run_id().to_string(),
            job_run_id: job_run_id.to_string(),
            job_label: wg.job.label,
            building_partitions: wg
                .wants

@@ -266,13 +353,55 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
                .collect(),
            want_attributed_partitions: wg.wants.iter().map(|w| w.into()).collect(),
        });
        self.bel.append_event(&job_buffer_event)?;
        self.append_and_broadcast(&job_buffer_event)?;
        self.job_runs.push(job_run);

        Ok(())
    }

    fn step(&mut self) -> Result<(), DatabuildError> {
    /// Process all pending commands from the HTTP server
    fn process_commands(&mut self) -> Result<(), DatabuildError> {
        // TODO Handle command failure gracefully - what's the story there?
        // Collect all pending commands first (to avoid borrow checker issues)
        let mut commands = Vec::new();
        if let Some(ref mut rx) = self.command_rx {
            while let Ok(cmd) = rx.try_recv() {
                commands.push(cmd);
            }
        }

        // Now process all collected commands
        for cmd in commands {
            match cmd {
                Command::CreateWant { request, reply } => {
                    // Convert request to event and broadcast it (so HTTP server receives it)
                    let event: crate::WantCreateEventV1 = request.into();
                    let result = self
                        .append_and_broadcast(&crate::data_build_event::Event::WantCreateV1(
                            event.clone(),
                        ))
                        .map(|_| self.bel.state.get_want(&event.want_id).into());
                    let _ = reply.send(result); // Ignore send errors
                }
                Command::CancelWant { request, reply } => {
                    // Convert request to event and broadcast it (so HTTP server receives it)
                    let event: crate::WantCancelEventV1 = request.into();
                    let result = self
                        .append_and_broadcast(&crate::data_build_event::Event::WantCancelV1(
                            event.clone(),
                        ))
                        .map(|_| self.bel.state.get_want(&event.want_id).into());
                    let _ = reply.send(result); // Ignore send errors
                }
            }
        }
        Ok(())
    }

    pub fn step(&mut self) -> Result<(), DatabuildError> {
        // Process commands first (write operations)
        self.process_commands()?;
        // Then poll job runs and wants
        self.poll_job_runs()?;
        self.poll_wants()?;
        Ok(())

@@ -370,11 +499,13 @@ mod tests {
                    label: "alpha".to_string(),
                    patterns: vec!["data/alpha".to_string()],
                    entry_point: MockJobRun::bin_path(),
                    environment: Default::default(),
                },
                JobConfiguration {
                    label: "beta".to_string(),
                    patterns: vec!["data/beta".to_string()],
                    entry_point: MockJobRun::bin_path(),
                    environment: Default::default(),
                },
            ],
        };

@@ -737,11 +868,13 @@ echo 'Beta succeeded'
                    label: "alpha".to_string(),
                    patterns: vec!["data/alpha".to_string()],
                    entry_point: alpha_script.to_string(),
                    environment: Default::default(),
                },
                JobConfiguration {
                    label: "beta".to_string(),
                    patterns: vec!["data/beta".to_string()],
                    entry_point: beta_script.to_string(),
                    environment: Default::default(),
                },
            ],
        };

@@ -919,6 +1052,7 @@ echo 'Beta succeeded'
            label: label.to_string(),
            patterns: vec![pattern.to_string()],
            entry_point: "test_entrypoint".to_string(),
            environment: Default::default(),
        }
    }
docs/plans/api.md (new file, 577 lines)
@@ -0,0 +1,577 @@
# Web Server Implementation Plan

## Architecture Summary

**Concurrency Model: Event Log Separation**
- Orchestrator runs synchronously in dedicated thread, owns BEL exclusively
- Web server reads from shared BEL storage, sends write commands via channel
- No locks on hot path, orchestrator stays single-threaded
- Eventual consistency for reads (acceptable since builds take time anyway)

**Daemon Model:**
- Server binary started manually from workspace root (for now)
- Server tracks last request time, shuts down after idle timeout (default: 3 hours)
- HTTP REST on localhost random port
- Future: CLI can auto-discover/start server

---

## Thread Model

```
Main Process
├─ HTTP Server (tokio multi-threaded runtime)
│  ├─ Request handlers (async, read from BEL storage)
│  └─ Command sender (send writes to orchestrator)
└─ Orchestrator Thread (std::thread, synchronous)
   ├─ Receives commands via mpsc channel
   ├─ Owns BEL (exclusive mutable access)
   └─ Runs existing step() loop
```

**Read Path (Low Latency):**
1. HTTP request → Axum handler
2. Read events from shared BEL storage (no lock contention)
3. Reconstruct BuildState from events (can cache this)
4. Return response

**Write Path (Strong Consistency):**
1. HTTP request → Axum handler
2. Send command via channel to orchestrator
3. Orchestrator processes command in its thread
4. Reply sent back via oneshot channel
5. Return response

**Why This Works:**
- Orchestrator remains completely synchronous (no refactoring needed)
- Reads scale horizontally (multiple handlers, no locks)
- Writes are serialized through orchestrator (consistent with current model)
- Event sourcing means reads can be eventually consistent

---

## Phase 1: Foundation - Make BEL Storage Thread-Safe

**Goal:** Allow BEL storage to be safely shared between orchestrator and web server

**Tasks:**
1. Add `Send + Sync` bounds to `BELStorage` trait
2. Wrap `SqliteBELStorage::connection` in `Arc<Mutex<Connection>>` or use r2d2 pool
3. Add read-only methods to BELStorage:
   - `list_events(offset: usize, limit: usize) -> Vec<DataBuildEvent>`
   - `get_event(event_id: u64) -> Option<DataBuildEvent>`
   - `latest_event_id() -> u64`
4. Add builder method to reconstruct BuildState from events:
   - `BuildState::from_events(events: &[DataBuildEvent]) -> Self`

**Files Modified:**
- `databuild/build_event_log.rs` - update trait and storage impls
- `databuild/build_state.rs` - add `from_events()` builder

**Acceptance Criteria:**
- `BELStorage` trait has `Send + Sync` bounds
- Can clone `Arc<SqliteBELStorage>` and use from multiple threads
- Can reconstruct BuildState from events without mutating storage
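
A minimal sketch of where Phase 1 lands, under the assumptions above. `DataBuildEvent` here stands in for the real event type, and the `apply` reducer is assumed from the event-sourcing model; neither signature is confirmed by the existing code:

```rust
/// Stand-in for the real event type in `databuild/build_event_log.rs`.
pub struct DataBuildEvent { /* payload elided */ }

/// The `Send + Sync` bounds are the whole point: they let an
/// `Arc<dyn BELStorage>` be shared between the orchestrator thread
/// and the tokio handler tasks.
pub trait BELStorage: Send + Sync {
    fn list_events(&self, offset: usize, limit: usize) -> Vec<DataBuildEvent>;
    fn get_event(&self, event_id: u64) -> Option<DataBuildEvent>;
    fn latest_event_id(&self) -> u64;
}

#[derive(Default)]
pub struct BuildState { /* fields elided */ }

impl BuildState {
    /// Rebuild state by folding events in order; storage is never mutated.
    pub fn from_events(events: &[DataBuildEvent]) -> Self {
        let mut state = BuildState::default();
        for event in events {
            state.apply(event);
        }
        state
    }

    /// Assumed single-event reducer, per the event-sourcing model.
    fn apply(&mut self, _event: &DataBuildEvent) { /* ... */ }
}
```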

---

## Phase 2: Web Server - HTTP API with Axum

**Goal:** HTTP server serving read/write APIs

**Tasks:**
1. Add dependencies to MODULE.bazel:
   ```python
   crate.spec(package = "tokio", features = ["full"], version = "1.0")
   crate.spec(package = "axum", version = "0.7")
   crate.spec(package = "tower", version = "0.4")
   crate.spec(package = "tower-http", features = ["trace", "cors"], version = "0.5")
   ```

2. Create `databuild/http_server.rs` module with:
   - `AppState` struct holding:
     - `bel_storage: Arc<dyn BELStorage>` - shared read access
     - `command_tx: mpsc::Sender<Command>` - channel to orchestrator
     - `last_request_time: Arc<AtomicU64>` - for idle tracking
   - Axum router with all endpoints
   - Handler functions delegating to existing `api_handle_*` methods

3. API Endpoints:
   ```
   GET    /health                        → health check
   GET    /api/wants                     → list_wants
   POST   /api/wants                     → create_want
   GET    /api/wants/:id                 → get_want
   DELETE /api/wants/:id                 → cancel_want
   GET    /api/partitions                → list_partitions
   GET    /api/job_runs                  → list_job_runs
   GET    /api/job_runs/:id/logs/stdout  → stream_logs (stub)
   ```

4. Handler pattern (reads):
   ```rust
   async fn list_wants(
       State(state): State<AppState>,
       Query(params): Query<ListWantsParams>,
   ) -> Json<ListWantsResponse> {
       // Read events from storage
       let events = state.bel_storage.list_events(0, 10000)?;

       // Reconstruct state
       let build_state = BuildState::from_events(&events);

       // Use existing API method
       Json(build_state.list_wants(&params.into()))
   }
   ```

5. Handler pattern (writes):
   ```rust
   async fn create_want(
       State(state): State<AppState>,
       Json(req): Json<CreateWantRequest>,
   ) -> Json<CreateWantResponse> {
       // Send command to orchestrator
       let (reply_tx, reply_rx) = oneshot::channel();
       state.command_tx.send(Command::CreateWant(req, reply_tx)).await?;

       // Wait for orchestrator reply
       let response = reply_rx.await?;
       Json(response)
   }
   ```

**Files Created:**
- `databuild/http_server.rs` - new module

**Files Modified:**
- `databuild/lib.rs` - add `pub mod http_server;`
- `MODULE.bazel` - add dependencies

**Acceptance Criteria:**
- Server starts on localhost random port, prints "Listening on http://127.0.0.1:XXXXX"
- All read endpoints return correct JSON responses
- Write endpoints return stub responses (Phase 4 will connect to orchestrator)

---

## Phase 3: CLI - HTTP Client

**Goal:** CLI that sends HTTP requests to running server

**Tasks:**
1. Add dependencies to MODULE.bazel:
   ```python
   crate.spec(package = "clap", features = ["derive"], version = "4.0")
   crate.spec(package = "reqwest", features = ["blocking", "json"], version = "0.11")
   ```

2. Create `databuild/bin/databuild.rs` main binary:
   ```rust
   #[derive(Parser)]
   #[command(name = "databuild")]
   enum Cli {
       /// Start the databuild server
       Serve(ServeArgs),

       /// Create a want for partitions
       Build(BuildArgs),

       /// Want operations
       Want(WantCommand),

       /// Stream job run logs
       Logs(LogsArgs),
   }

   #[derive(Args)]
   struct ServeArgs {
       #[arg(long, default_value = "8080")]
       port: u16,
   }

   #[derive(Subcommand)]
   enum WantCommand {
       Create(CreateWantArgs),
       List,
       Get { want_id: String },
       Cancel { want_id: String },
   }
   ```

3. Server address discovery:
   - For now: hardcode `http://localhost:8080` or accept `--server-url` flag
   - Future: read from `.databuild/server.json` file

4. HTTP client implementation:
   ```rust
   fn list_wants(server_url: &str) -> Result<Vec<WantDetail>> {
       let client = reqwest::blocking::Client::new();
       let resp = client.get(&format!("{}/api/wants", server_url))
           .send()?
           .json::<ListWantsResponse>()?;
       Ok(resp.data)
   }
   ```

5. Commands:
   - `databuild serve --port 8080` - Start server (blocks)
   - `databuild build part1 part2` - Create want for partitions
   - `databuild want list` - List all wants
   - `databuild want get <id>` - Get specific want
   - `databuild want cancel <id>` - Cancel want
   - `databuild logs <job_run_id>` - Stream logs (stub)

**Files Created:**
- `databuild/bin/databuild.rs` - new CLI binary

**Files Modified:**
- `databuild/BUILD.bazel` - add `rust_binary` target for databuild CLI

**Acceptance Criteria:**
- Can run `databuild serve` to start server
- Can run `databuild want list` in another terminal and see wants
- Commands print pretty JSON or formatted tables

---

## Phase 4: Orchestrator Integration - Command Channel

**Goal:** Connect orchestrator to web server via message passing

**Tasks:**
1. Create `databuild/commands.rs` with command enum:
   ```rust
   pub enum Command {
       CreateWant(CreateWantRequest, oneshot::Sender<CreateWantResponse>),
       CancelWant(CancelWantRequest, oneshot::Sender<CancelWantResponse>),
       // Only write operations need commands
   }
   ```

2. Update `Orchestrator`:
   - Add `command_rx: mpsc::Receiver<Command>` field
   - In `step()` method, before polling:
     ```rust
     // Process all pending commands
     while let Ok(cmd) = self.command_rx.try_recv() {
         match cmd {
             Command::CreateWant(req, reply) => {
                 let resp = self.bel.api_handle_want_create(req);
                 let _ = reply.send(resp); // Ignore send errors
             }
             // ... other commands
         }
     }
     ```

3. Create server startup function in `http_server.rs`:
   ```rust
   pub fn start_server(
       bel_storage: Arc<dyn BELStorage>,
       port: u16,
   ) -> (JoinHandle<()>, mpsc::Sender<Command>) {
       let (cmd_tx, cmd_rx) = mpsc::channel(100);

       // Spawn orchestrator in background thread
       let orch_bel = bel_storage.clone();
       let orch_handle = std::thread::spawn(move || {
           let mut orch = Orchestrator::new_with_commands(orch_bel, cmd_rx);
           orch.join().unwrap();
       });

       // Start HTTP server in tokio runtime
       let runtime = tokio::runtime::Runtime::new().unwrap();
       let http_handle = runtime.spawn(async move {
           let app_state = AppState {
               bel_storage,
               command_tx: cmd_tx.clone(),
               last_request_time: Arc::new(AtomicU64::new(0)),
           };

           let app = create_router(app_state);
           let addr = SocketAddr::from(([127, 0, 0, 1], port));
           axum::Server::bind(&addr)
               .serve(app.into_make_service())
               .await
               .unwrap();
       });

       (http_handle, cmd_tx)
   }
   ```

4. Update `databuild serve` command to use `start_server()`

**Files Created:**
- `databuild/commands.rs` - new module

**Files Modified:**
- `databuild/orchestrator.rs` - accept command channel, process in `step()`
- `databuild/http_server.rs` - send commands for writes
- `databuild/bin/databuild.rs` - use `start_server()` in `serve` command

**Acceptance Criteria:**
- Creating a want via HTTP actually creates it in BuildState
- Orchestrator processes commands without blocking its main loop
- Can observe wants being scheduled into job runs

---

## Phase 5: Daemon Lifecycle - Auto-Shutdown

**Goal:** Server shuts down gracefully after idle timeout

**Tasks:**
1. Update AppState to track last request time:
   ```rust
   pub struct AppState {
       bel_storage: Arc<dyn BELStorage>,
       command_tx: mpsc::Sender<Command>,
       last_request_time: Arc<AtomicU64>, // epoch millis
       shutdown_tx: broadcast::Sender<()>,
   }
   ```

2. Add Tower middleware to update timestamp:
   ```rust
   async fn update_last_request_time<B>(
       State(state): State<AppState>,
       req: Request<B>,
       next: Next<B>,
   ) -> Response {
       state.last_request_time.store(
           SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
           Ordering::Relaxed,
       );
       next.run(req).await
   }
   ```

3. Background idle checker task:
   ```rust
   tokio::spawn(async move {
       let idle_timeout = Duration::from_secs(3 * 60 * 60); // 3 hours

       loop {
           tokio::time::sleep(Duration::from_secs(60)).await;

           let last_request = state.last_request_time.load(Ordering::Relaxed);
           let now = SystemTime::now()...;

           if now - last_request > idle_timeout.as_millis() as u64 {
               eprintln!("Server idle for {} hours, shutting down",
                   idle_timeout.as_secs() / 3600);
               shutdown_tx.send(()).unwrap();
               break;
           }
       }
   });
   ```

4. Graceful shutdown handling:
   ```rust
   let app = create_router(state);
   axum::Server::bind(&addr)
       .serve(app.into_make_service())
       .with_graceful_shutdown(async {
           shutdown_rx.recv().await.ok();
       })
       .await?;
   ```

5. Cleanup on shutdown (see the orchestrator-side sketch after this list):
   - Orchestrator: finish current step, don't start new one
   - HTTP server: stop accepting new connections, finish in-flight requests
   - Log: "Shutdown complete"

**Files Modified:**
- `databuild/http_server.rs` - add idle tracking, shutdown logic
- `databuild/orchestrator.rs` - accept shutdown signal, check before each step

**Acceptance Criteria:**
- Server shuts down after configured idle timeout
- In-flight requests complete successfully during shutdown
- Shutdown is logged clearly

---

## Phase 6: Testing & Polish

**Goal:** End-to-end testing and production readiness

**Tasks:**
1. Integration tests:
   ```rust
   #[test]
   fn test_server_lifecycle() {
       // Start server
       let (handle, port) = start_test_server();

       // Make requests
       let wants = reqwest::blocking::get(
           &format!("http://localhost:{}/api/wants", port)
       ).unwrap().json::<ListWantsResponse>().unwrap();

       // Stop server
       handle.shutdown();
   }
   ```

2. Error handling improvements:
   - Proper HTTP status codes (400, 404, 500)
   - Structured error responses:
     ```json
     {"error": "Want not found", "want_id": "abc123"}
     ```
   - Add `tracing` crate for structured logging

3. Add CORS middleware for web app:
   ```rust
   let cors = CorsLayer::new()
       .allow_origin("http://localhost:3000".parse::<HeaderValue>().unwrap())
       .allow_methods([Method::GET, Method::POST, Method::DELETE]);

   app.layer(cors)
   ```

4. Health check endpoint:
   ```rust
   async fn health() -> &'static str {
       "OK"
   }
   ```

5. Optional: Metrics endpoint (prometheus format):
   ```rust
   async fn metrics() -> String {
       format!(
           "# HELP databuild_wants_total Total number of wants\n\
            databuild_wants_total {}\n\
            # HELP databuild_job_runs_total Total number of job runs\n\
            databuild_job_runs_total {}\n",
           want_count, job_run_count
       )
   }
   ```

**Files Created:**
- `databuild/tests/http_integration_test.rs` - integration tests

**Files Modified:**
- `databuild/http_server.rs` - add CORS, health, metrics, better errors
- `MODULE.bazel` - add `tracing` dependency

**Acceptance Criteria:**
- All endpoints have proper error handling
- CORS works for web app development
- Health check returns 200 OK
- Integration tests pass

---

## Future Enhancements (Not in Initial Plan)

### Workspace Auto-Discovery
- Walk up directory tree looking for `.databuild/` marker
- Store server metadata in `.databuild/server.json`:
  ```json
  {
    "pid": 12345,
    "port": 54321,
    "started_at": "2025-01-22T10:30:00Z",
    "workspace_root": "/Users/stuart/Projects/databuild"
  }
  ```
- CLI auto-starts server if not running
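
A sketch of the discovery walk, assuming the `server.json` layout above; `ServerInfo` is a hypothetical mirror of that file, and serde_json is already a dependency:

```rust
use std::path::{Path, PathBuf};

/// Hypothetical mirror of `.databuild/server.json`.
#[derive(serde::Deserialize)]
struct ServerInfo {
    pid: u32,
    port: u16,
    started_at: String,
    workspace_root: String,
}

/// Walk up from `start` until a directory containing `.databuild/` is found.
fn find_workspace_root(start: &Path) -> Option<PathBuf> {
    start
        .ancestors()
        .find(|dir| dir.join(".databuild").is_dir())
        .map(Path::to_path_buf)
}

/// Read recorded server metadata, if any, for this workspace.
fn read_server_info(workspace_root: &Path) -> Option<ServerInfo> {
    let raw = std::fs::read_to_string(workspace_root.join(".databuild/server.json")).ok()?;
    serde_json::from_str(&raw).ok()
}
```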

### Log Streaming (SSE)
- Implement `GET /api/job_runs/:id/logs/stdout?follow=true`
- Use Server-Sent Events for streaming
- Integrate with FileLogStore from logging.md plan
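
A minimal shape for the SSE handler under axum 0.7 — a sketch, not the planned implementation. It assumes a `futures` dependency (not in the list below) and fakes the log source with a fixed vector where the FileLogStore integration would go:

```rust
use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::{self, Stream};
use std::convert::Infallible;

/// GET /api/job_runs/:id/logs/stdout — one SSE event per log line.
async fn stream_stdout() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // Stand-in for lines read from the job run's log store.
    let lines = vec!["job starting".to_string(), "job done".to_string()];
    let stream = stream::iter(lines.into_iter().map(|line| Ok(Event::default().data(line))));
    // Keep-alive comments stop proxies from closing an idle stream.
    Sse::new(stream).keep_alive(KeepAlive::default())
}
```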

### State Caching
- Cache reconstructed BuildState for faster reads
- Invalidate cache when new events arrive
- Use `tokio::sync::RwLock<Option<(u64, BuildState)>>` where u64 is latest_event_id
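
A sketch of that cache, reusing the Phase 1 trait sketch and additionally assuming `BuildState: Clone`; the cached pair is valid exactly while the stored id matches `latest_event_id()`:

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

// `BELStorage` and `BuildState` as in the Phase 1 sketch, plus `#[derive(Clone)]`.
type StateCache = Arc<RwLock<Option<(u64, BuildState)>>>;

async fn cached_state(storage: &dyn BELStorage, cache: &StateCache) -> BuildState {
    let latest = storage.latest_event_id();
    if let Some((cached_id, state)) = cache.read().await.as_ref() {
        if *cached_id == latest {
            return state.clone(); // Fresh: skip the event replay entirely
        }
    }
    // Stale or empty: replay events and refresh the cache.
    let state = BuildState::from_events(&storage.list_events(0, usize::MAX));
    *cache.write().await = Some((latest, state.clone()));
    state
}
```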

### gRPC Support (If Needed)
- Add Tonic alongside Axum
- Share same orchestrator/command channel
- Useful for language-agnostic clients

---

## Dependencies Summary

New dependencies to add to `MODULE.bazel`:

```python
# Async runtime
crate.spec(package = "tokio", features = ["full"], version = "1.0")

# Web framework
crate.spec(package = "axum", version = "0.7")
crate.spec(package = "tower", version = "0.4")
crate.spec(package = "tower-http", features = ["trace", "cors"], version = "0.5")

# CLI
crate.spec(package = "clap", features = ["derive"], version = "4.0")

# HTTP client for CLI
crate.spec(package = "reqwest", features = ["blocking", "json"], version = "0.11")

# Logging
crate.spec(package = "tracing", version = "0.1")
crate.spec(package = "tracing-subscriber", version = "0.3")
```

---

## Estimated Timeline

- **Phase 1:** 2-3 hours (thread-safe BEL storage)
- **Phase 2:** 4-6 hours (HTTP server with Axum)
- **Phase 3:** 3-4 hours (basic CLI)
- **Phase 4:** 3-4 hours (orchestrator integration)
- **Phase 5:** 2-3 hours (idle shutdown)
- **Phase 6:** 4-6 hours (testing and polish)

**Total:** ~18-26 hours for complete implementation

---

## Design Rationale

### Why Event Log Separation?

**Alternatives Considered:**
1. **Shared State with RwLock**: Orchestrator holds write lock during `step()`, blocking all reads
2. **Actor Model**: Extra overhead from message passing for all operations

**Why Event Log Separation Wins:**
- Orchestrator stays completely synchronous (no refactoring)
- Reads don't block writes (eventual consistency acceptable for build system)
- Natural fit with event sourcing architecture
- Can cache reconstructed state for even better read performance

### Why Not gRPC?

- User requirement: "JSON is a must"
- REST is more debuggable (curl, browser dev tools)
- gRPC adds complexity without clear benefit
- Can add gRPC later if needed (both can coexist)

### Why Axum Over Actix?

- Better compile-time type safety (extractors)
- Cleaner middleware composition (Tower)
- Native async/await (Actix uses actor model internally)
- More ergonomic for this use case

### Why Per-Workspace Server?

- Isolation: builds in different projects don't interfere
- Simpler: no need to route requests by workspace
- Matches Bazel's model (users already understand it)
- Easier to reason about resource usage
examples/multihop/README.md (new file, 85 lines)
@@ -0,0 +1,85 @@
# Multi-Hop Dependency Example

This example demonstrates DataBuild's ability to handle multi-hop dependencies between jobs.

## Overview

The example consists of two jobs:

- **job_alpha**: Produces the `data/alpha` partition
- **job_beta**: Depends on `data/alpha` and produces `data/beta`

When you request `data/beta`:
1. Beta job runs and detects missing `data/alpha` dependency
2. Orchestrator creates a want for `data/alpha`
3. Alpha job runs and produces `data/alpha`
4. Beta job runs again and succeeds, producing `data/beta`
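
The dependency miss in step 1 is reported over stdout: `job_beta.sh` (shown later in this diff) prints the payload below on a single line, prefixed with `DATABUILD_MISSING_DEPS_JSON:`. Pretty-printed for readability:

```json
{
  "version": "1",
  "missing_deps": [
    {
      "impacted": [{ "ref": "data/beta" }],
      "missing": [{ "ref": "data/alpha" }]
    }
  ]
}
```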

## Running the Example

From the repository root:

```bash
# Build the CLI
bazel build //databuild:databuild_cli

# Clean up any previous state
rm -f /tmp/databuild_multihop*.db /tmp/databuild_multihop_alpha_complete

# Start the server with the multihop configuration
./bazel-bin/databuild/databuild_cli serve \
    --port 3050 \
    --database /tmp/databuild_multihop.db \
    --config examples/multihop/config.json
```

In another terminal, create a want for `data/beta`:

```bash
# Create a want for data/beta (which will trigger the dependency chain)
./bazel-bin/databuild/databuild_cli --server http://localhost:3050 \
    want data/beta

# Watch the wants
./bazel-bin/databuild/databuild_cli --server http://localhost:3050 \
    wants list

# Watch the job runs
./bazel-bin/databuild/databuild_cli --server http://localhost:3050 \
    job-runs list

# Watch the partitions
./bazel-bin/databuild/databuild_cli --server http://localhost:3050 \
    partitions list
```

## Expected Behavior

1. Initial want for `data/beta` is created
2. Beta job runs, detects missing `data/alpha`, reports dependency miss
3. Orchestrator creates derivative want for `data/alpha`
4. Alpha job runs and succeeds
5. Beta job runs again and succeeds
6. Both partitions are now in `Live` state

## Configuration Format

The example uses JSON format (`config.json`), but TOML is also supported. Here's the equivalent TOML:

```toml
[[jobs]]
label = "//examples/multihop:job_alpha"
entrypoint = "./examples/multihop/job_alpha.sh"
partition_patterns = ["data/alpha"]

[jobs.environment]
JOB_NAME = "alpha"

[[jobs]]
label = "//examples/multihop:job_beta"
entrypoint = "./examples/multihop/job_beta.sh"
partition_patterns = ["data/beta"]

[jobs.environment]
JOB_NAME = "beta"
```
examples/multihop/config.json (new file, 20 lines)
@@ -0,0 +1,20 @@
{
  "jobs": [
    {
      "label": "//examples/multihop:job_alpha",
      "entrypoint": "./examples/multihop/job_alpha.sh",
      "environment": {
        "JOB_NAME": "alpha"
      },
      "partition_patterns": ["data/alpha"]
    },
    {
      "label": "//examples/multihop:job_beta",
      "entrypoint": "./examples/multihop/job_beta.sh",
      "environment": {
        "JOB_NAME": "beta"
      },
      "partition_patterns": ["data/beta"]
    }
  ]
}
examples/multihop/job_alpha.sh (new executable file, 18 lines)
@@ -0,0 +1,18 @@
#!/bin/bash

# Job Alpha: Produces data/alpha partition
# This is a simple upstream job that beta depends on

echo "Job Alpha starting..." >&2
echo "  Building partitions: $@" >&2
echo "  Environment: JOB_NAME=$JOB_NAME" >&2

# Simulate some work
sleep 0.5

# Create marker file to indicate data/alpha is available
touch /tmp/databuild_multihop_alpha_complete

# Output success - no special output needed, just exit 0

echo "Job Alpha complete!" >&2
examples/multihop/job_beta.sh (new executable file, 27 lines)
@@ -0,0 +1,27 @@
#!/bin/bash

# Job Beta: Produces data/beta partition
# Depends on data/alpha from job_alpha

echo "Job Beta starting..." >&2
echo "  Building partitions: $@" >&2
echo "  Environment: JOB_NAME=$JOB_NAME" >&2

# Check if data/alpha marker exists (this would be real data in a real system)
ALPHA_MARKER="/tmp/databuild_multihop_alpha_complete"

if [ ! -f "$ALPHA_MARKER" ]; then
    echo "  Missing dependency: data/alpha" >&2
    # Report missing dependency
    echo 'DATABUILD_MISSING_DEPS_JSON:{"version":"1","missing_deps":[{"impacted":[{"ref":"data/beta"}],"missing":[{"ref":"data/alpha"}]}]}'
    exit 1
fi

echo "  Found dependency: data/alpha" >&2

# Simulate some work
sleep 0.5

# Output success - no special output needed, just exit 0

echo "Job Beta complete!" >&2
tests/end_to_end/BUILD.bazel
@@ -1,51 +1,51 @@
# Test utilities filegroup
filegroup(
    name = "test_utils",
    srcs = [
        "lib/test_utils.sh",
        "lib/db_utils.sh",
        "lib/service_utils.sh",
    ],
    visibility = ["//visibility:public"],
)

# Delegation test that verifies second builds properly delegate to existing partitions
# Note: This test is designed to be run via the integration test runner (run_e2e_tests.sh)
# rather than through bazel test, due to examples being in .bazelignore
sh_test(
    name = "delegation_test",
    srcs = ["delegation_test.sh"],
    data = [
        ":test_utils",
    ],
    size = "medium",
    timeout = "moderate",
    env = {
        "PATH": "/usr/bin:/bin:/usr/local/bin",
    },
    tags = ["e2e", "delegation", "manual"],
    args = ["placeholder"],
)

# Simple shell script test that validates the test runner
sh_test(
    name = "e2e_runner_test",
    srcs = ["validate_runner.sh"],
    data = [
        "//:run_e2e_tests.sh",
        ":test_utils",
        "lib/test_utils.sh",
        "lib/db_utils.sh",
        "lib/service_utils.sh",
        "simple_test.sh",
        "basic_graph_test.sh",
        "podcast_reviews_test.sh",
        "delegation_test.sh",
    ],
    size = "small",
    timeout = "short",
    env = {
        "PATH": "/usr/bin:/bin:/usr/local/bin",
    },
    tags = ["e2e"],
)
## Test utilities filegroup
#filegroup(
#    name = "test_utils",
#    srcs = [
#        "lib/test_utils.sh",
#        "lib/db_utils.sh",
#        "lib/service_utils.sh",
#    ],
#    visibility = ["//visibility:public"],
#)
#
## Delegation test that verifies second builds properly delegate to existing partitions
## Note: This test is designed to be run via the integration test runner (run_e2e_tests.sh)
## rather than through bazel test, due to examples being in .bazelignore
#sh_test(
#    name = "delegation_test",
#    srcs = ["delegation_test.sh"],
#    data = [
#        ":test_utils",
#    ],
#    size = "medium",
#    timeout = "moderate",
#    env = {
#        "PATH": "/usr/bin:/bin:/usr/local/bin",
#    },
#    tags = ["e2e", "delegation", "manual"],
#    args = ["placeholder"],
#)
#
## Simple shell script test that validates the test runner
#sh_test(
#    name = "e2e_runner_test",
#    srcs = ["validate_runner.sh"],
#    data = [
#        "//:run_e2e_tests.sh",
#        ":test_utils",
#        "lib/test_utils.sh",
#        "lib/db_utils.sh",
#        "lib/service_utils.sh",
#        "simple_test.sh",
#        "basic_graph_test.sh",
#        "podcast_reviews_test.sh",
#        "delegation_test.sh",
#    ],
#    size = "small",
#    timeout = "short",
#    env = {
#        "PATH": "/usr/bin:/bin:/usr/local/bin",
#    },
#    tags = ["e2e"],
#)