implemented api phase 7 - running application

This commit is contained in:
Stuart Axelbrooke 2025-11-22 23:05:46 +08:00
parent 556ccb8a4b
commit a5a1be8855
12 changed files with 394 additions and 10 deletions

View file

@ -101,6 +101,10 @@ crate.spec(
package = "reqwest",
version = "0.11",
)
crate.spec(
package = "toml",
version = "0.8",
)
crate.from_specs()
use_repo(crate, "crates")

File diff suppressed because one or more lines are too long

View file

@ -34,6 +34,7 @@ rust_library(
"@crates//:serde",
"@crates//:serde_json",
"@crates//:tokio",
"@crates//:toml",
"@crates//:tower",
"@crates//:tower-http",
"@crates//:tracing",

View file

@ -29,6 +29,10 @@ enum Commands {
/// Database URL (default: :memory: for in-memory SQLite)
#[arg(long, default_value = ":memory:")]
database: String,
/// Path to configuration file (JSON or TOML)
#[arg(long)]
config: Option<String>,
},
/// Create a new want (trigger partition builds)
@ -78,8 +82,8 @@ fn main() {
let cli = Cli::parse();
match cli.command {
Commands::Serve { port, database } => {
cmd_serve(port, &database);
Commands::Serve { port, database, config } => {
cmd_serve(port, &database, config.as_deref());
}
Commands::Want { partitions } => {
cmd_want(&cli.server, partitions);
@ -107,12 +111,29 @@ fn main() {
// ============================================================================
#[tokio::main]
async fn cmd_serve(port: u16, database: &str) {
async fn cmd_serve(port: u16, database: &str, config_path: Option<&str>) {
use databuild::build_event_log::BELStorage;
// Initialize logging
tracing_subscriber::fmt::init();
// Load configuration if provided
let jobs = if let Some(path) = config_path {
match databuild::config::DatabuildConfig::from_file(path) {
Ok(config) => {
println!("Loaded configuration from: {}", path);
println!(" Jobs: {}", config.jobs.len());
config.into_job_configurations()
}
Err(e) => {
eprintln!("Failed to load configuration from {}: {}", path, e);
std::process::exit(1);
}
}
} else {
Vec::new()
};
// Create SQLite BEL storage (shared between orchestrator and HTTP server)
let bel_storage = Arc::new(
SqliteBELStorage::create(database).expect("Failed to create BEL storage"),
@ -131,8 +152,8 @@ async fn cmd_serve(port: u16, database: &str) {
let orch_handle = std::thread::spawn(move || {
use databuild::orchestrator::{Orchestrator, OrchestratorConfig};
// Create orchestrator with command channel
let config = OrchestratorConfig::default();
// Create orchestrator with command channel and jobs from config
let config = OrchestratorConfig { jobs };
let mut orchestrator = Orchestrator::new_with_commands(orch_bel_storage, config, command_rx);
let mut shutdown_rx = orch_shutdown_rx;

92
databuild/config.rs Normal file
View file

@ -0,0 +1,92 @@
use crate::JobConfig;
use crate::job::JobConfiguration;
use crate::util::DatabuildError;
use std::fs;
use std::path::Path;
/// Configuration file format for DataBuild application
#[derive(Debug, serde::Deserialize)]
pub struct DatabuildConfig {
/// List of job configurations
pub jobs: Vec<JobConfig>,
}
impl DatabuildConfig {
/// Load configuration from a file, auto-detecting format from extension
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self, DatabuildError> {
let path = path.as_ref();
let contents = fs::read_to_string(path)
.map_err(|e| DatabuildError::from(format!("Failed to read config file: {}", e)))?;
// Determine format from file extension
let extension = path.extension().and_then(|s| s.to_str()).unwrap_or("");
match extension {
"json" => Self::from_json(&contents),
"toml" => Self::from_toml(&contents),
_ => Err(DatabuildError::from(format!(
"Unknown config file extension: {}. Use .json or .toml",
extension
))),
}
}
/// Parse configuration from JSON string
pub fn from_json(s: &str) -> Result<Self, DatabuildError> {
serde_json::from_str(s)
.map_err(|e| DatabuildError::from(format!("Failed to parse JSON config: {}", e)))
}
/// Parse configuration from TOML string
pub fn from_toml(s: &str) -> Result<Self, DatabuildError> {
toml::from_str(s)
.map_err(|e| DatabuildError::from(format!("Failed to parse TOML config: {}", e)))
}
/// Convert to a list of JobConfiguration
pub fn into_job_configurations(self) -> Vec<JobConfiguration> {
self.jobs.into_iter().map(|jc| jc.into()).collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_json_config() {
let json = r#"
{
"jobs": [
{
"label": "//test:job_alpha",
"entrypoint": "/usr/bin/python3",
"environment": {"FOO": "bar"},
"partition_patterns": ["data/alpha/.*"]
}
]
}
"#;
let config = DatabuildConfig::from_json(json).unwrap();
assert_eq!(config.jobs.len(), 1);
assert_eq!(config.jobs[0].label, "//test:job_alpha");
}
#[test]
fn test_parse_toml_config() {
let toml = r#"
[[jobs]]
label = "//test:job_alpha"
entrypoint = "/usr/bin/python3"
partition_patterns = ["data/alpha/.*"]
[jobs.environment]
FOO = "bar"
"#;
let config = DatabuildConfig::from_toml(toml).unwrap();
assert_eq!(config.jobs.len(), 1);
assert_eq!(config.jobs[0].label, "//test:job_alpha");
}
}
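
For orientation, here is a minimal usage sketch of the new module, mirroring what `cmd_serve` does above; the `load_orchestrator_config` helper name is illustrative, and error handling is collapsed to `expect` for brevity:
```rust
use databuild::config::DatabuildConfig;
use databuild::orchestrator::OrchestratorConfig;

// Load a JSON or TOML config file (format auto-detected from the extension)
// and hand the parsed jobs to the orchestrator, as cmd_serve does.
fn load_orchestrator_config(path: &str) -> OrchestratorConfig {
    let config = DatabuildConfig::from_file(path).expect("failed to load config");
    OrchestratorConfig {
        jobs: config.into_job_configurations(),
    }
}
```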

View file

@ -2,12 +2,14 @@ use crate::job_run::{JobRunHandle, SubProcessBackend};
use crate::util::DatabuildError;
use crate::{JobConfig, PartitionRef, WantDetail};
use regex::Regex;
use std::collections::HashMap;
#[derive(Debug, Clone)]
pub struct JobConfiguration {
pub label: String,
pub patterns: Vec<String>,
pub entry_point: String,
pub environment: HashMap<String, String>,
}
impl JobConfiguration {
@ -38,6 +40,7 @@ impl From<JobConfig> for JobConfiguration {
label: config.label,
patterns: config.partition_patterns,
entry_point: config.entrypoint,
environment: config.environment,
}
}
}

View file

@ -1,6 +1,7 @@
pub mod build_event_log;
mod build_state;
pub mod commands;
pub mod config;
mod data_deps;
mod event_transforms;
pub mod http_server;

View file

@ -8,6 +8,7 @@ use crate::{JobRunBufferEventV1, PartitionRef, WantDetail};
use std::collections::HashMap;
use std::fmt::Debug;
use tokio::sync::mpsc;
use uuid::Uuid;
/**
Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lightweight threads +
@ -25,6 +26,8 @@ pub struct Orchestrator<S: BELStorage + Debug> {
pub job_runs: Vec<crate::job_run::JobRunHandle<SubProcessBackend>>,
/// Optional command receiver for write operations from HTTP server
pub command_rx: Option<mpsc::Receiver<Command>>,
/// Environment variables for each job run (keyed by job_run_id)
pub job_environments: HashMap<Uuid, HashMap<String, String>>,
}
impl Default for Orchestrator<MemoryBELStorage> {
@ -34,6 +37,7 @@ impl Default for Orchestrator<MemoryBELStorage> {
config: Default::default(),
job_runs: Default::default(),
command_rx: None,
job_environments: HashMap::new(),
}
}
}
@ -45,6 +49,7 @@ impl Orchestrator<MemoryBELStorage> {
config: self.config.clone(),
job_runs: Default::default(),
command_rx: None,
job_environments: HashMap::new(),
}
}
}
@ -128,6 +133,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
config,
job_runs: Vec::new(),
command_rx: None,
job_environments: HashMap::new(),
}
}
@ -141,6 +147,7 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
config,
job_runs: Vec::new(),
command_rx: Some(command_rx),
job_environments: HashMap::new(),
}
}
@ -154,7 +161,9 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
let transitioned = match job {
JobRunHandle::NotStarted(not_started) => {
let job_run_id = not_started.job_run_id.clone();
let running = not_started.run(None)?;
// Look up environment for this job run
let env = self.job_environments.get(&job_run_id).cloned();
let running = not_started.run(env)?;
// Emit heartbeat event to notify BuildState that job is now running
let heartbeat_event = Event::JobRunHeartbeatV1(JobRunHeartbeatEventV1 {
@ -274,9 +283,16 @@ impl<S: BELStorage + Debug> Orchestrator<S> {
let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
let job_run = JobRunHandle::spawn(wg.job.entry_point.clone(), args);
// Store environment for this job run
let job_run_id = job_run.job_run_id().clone();
if !wg.job.environment.is_empty() {
self.job_environments
.insert(job_run_id, wg.job.environment.clone());
}
// Create job run buffer event
let job_buffer_event = Event::JobRunBufferV1(JobRunBufferEventV1 {
job_run_id: job_run.job_run_id().to_string(),
job_run_id: job_run_id.to_string(),
job_label: wg.job.label,
building_partitions: wg
.wants

View file

@ -0,0 +1,85 @@
# Multi-Hop Dependency Example
This example demonstrates DataBuild's ability to handle multi-hop dependencies between jobs.
## Overview
The example consists of two jobs:
- **job_alpha**: Produces the `data/alpha` partition
- **job_beta**: Depends on `data/alpha` and produces `data/beta`
When you request `data/beta`:
1. Beta job runs and detects the missing `data/alpha` dependency (reported via the stdout line shown after this list)
2. Orchestrator creates a want for `data/alpha`
3. Alpha job runs and produces `data/alpha`
4. Beta job runs again and succeeds, producing `data/beta`
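The dependency miss in step 1 is reported by the job printing a single machine-readable line to stdout before exiting non-zero, exactly as `job_beta.sh` in this example does:
```bash
# Emitted by job_beta.sh when data/alpha is not yet available
echo 'DATABUILD_MISSING_DEPS_JSON:{"version":"1","missing_deps":[{"impacted":[{"ref":"data/beta"}],"missing":[{"ref":"data/alpha"}]}]}'
exit 1
```
DataBuild picks up this line and creates the derivative want for `data/alpha` (step 2).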
## Running the Example
From the repository root:
```bash
# Build the CLI
bazel build //databuild:databuild_cli
# Clean up any previous state
rm -f /tmp/databuild_multihop*.db /tmp/databuild_multihop_alpha_complete
# Start the server with the multihop configuration
./bazel-bin/databuild/databuild_cli serve \
--port 3050 \
--database /tmp/databuild_multihop.db \
--config examples/multihop/config.json
```
In another terminal, create a want for `data/beta`:
```bash
# Create a want for data/beta (which will trigger the dependency chain)
./bazel-bin/databuild/databuild_cli --server http://localhost:3050 \
want data/beta
# Watch the wants
./bazel-bin/databuild/databuild_cli --server http://localhost:3050 \
wants list
# Watch the job runs
./bazel-bin/databuild/databuild_cli --server http://localhost:3050 \
job-runs list
# Watch the partitions
./bazel-bin/databuild/databuild_cli --server http://localhost:3050 \
partitions list
```
## Expected Behavior
1. Initial want for `data/beta` is created
2. Beta job runs, detects missing `data/alpha`, reports dependency miss
3. Orchestrator creates derivative want for `data/alpha`
4. Alpha job runs and succeeds
5. Beta job runs again and succeeds
6. Both partitions are now in `Live` state
## Configuration Format
The example uses JSON format (`config.json`), but TOML is also supported. Here's the equivalent TOML:
```toml
[[jobs]]
label = "//examples/multihop:job_alpha"
entrypoint = "./examples/multihop/job_alpha.sh"
partition_patterns = ["data/alpha"]
[jobs.environment]
JOB_NAME = "alpha"
[[jobs]]
label = "//examples/multihop:job_beta"
entrypoint = "./examples/multihop/job_beta.sh"
partition_patterns = ["data/beta"]
[jobs.environment]
JOB_NAME = "beta"
```

View file

@ -0,0 +1,20 @@
{
"jobs": [
{
"label": "//examples/multihop:job_alpha",
"entrypoint": "./examples/multihop/job_alpha.sh",
"environment": {
"JOB_NAME": "alpha"
},
"partition_patterns": ["data/alpha"]
},
{
"label": "//examples/multihop:job_beta",
"entrypoint": "./examples/multihop/job_beta.sh",
"environment": {
"JOB_NAME": "beta"
},
"partition_patterns": ["data/beta"]
}
]
}

18
examples/multihop/job_alpha.sh Executable file
View file

@ -0,0 +1,18 @@
#!/bin/bash
# Job Alpha: Produces data/alpha partition
# This is a simple upstream job that beta depends on
echo "Job Alpha starting..." >&2
echo " Building partitions: $@" >&2
echo " Environment: JOB_NAME=$JOB_NAME" >&2
# Simulate some work
sleep 0.5
# Create marker file to indicate data/alpha is available
touch /tmp/databuild_multihop_alpha_complete
# Output success - no special output needed, just exit 0
echo "Job Alpha complete!" >&2

27
examples/multihop/job_beta.sh Executable file
View file

@ -0,0 +1,27 @@
#!/bin/bash
# Job Beta: Produces data/beta partition
# Depends on data/alpha from job_alpha
echo "Job Beta starting..." >&2
echo " Building partitions: $@" >&2
echo " Environment: JOB_NAME=$JOB_NAME" >&2
# Check if data/alpha marker exists (this would be real data in a real system)
ALPHA_MARKER="/tmp/databuild_multihop_alpha_complete"
if [ ! -f "$ALPHA_MARKER" ]; then
echo " Missing dependency: data/alpha" >&2
# Report missing dependency
echo 'DATABUILD_MISSING_DEPS_JSON:{"version":"1","missing_deps":[{"impacted":[{"ref":"data/beta"}],"missing":[{"ref":"data/alpha"}]}]}'
exit 1
fi
echo " Found dependency: data/alpha" >&2
# Simulate some work
sleep 0.5
# Output success - no special output needed, just exit 0
echo "Job Beta complete!" >&2