This commit is contained in:
Stuart Axelbrooke 2025-07-06 14:41:26 -07:00
parent 1f76470ac4
commit 410217481e
11 changed files with 1953 additions and 91 deletions

View file

@ -45,6 +45,7 @@ crate.spec(
"macros",
"net",
"rt-multi-thread",
"sync",
],
package = "tokio",
version = "1.38",
@ -65,5 +66,24 @@ crate.spec(
package = "tempfile",
version = "3.0",
)
crate.spec(
package = "async-trait",
version = "0.1",
)
crate.spec(
package = "uuid",
version = "1.0",
features = ["v4"],
)
crate.spec(
package = "rusqlite",
version = "0.31",
features = ["bundled"],
)
crate.spec(
package = "clap",
version = "4.0",
features = ["derive"],
)
crate.from_specs()
use_repo(crate, "crates")

File diff suppressed because one or more lines are too long

View file

@ -34,16 +34,26 @@ genrule(
rust_library(
name = "databuild",
srcs = [
# "lib.rs",
"lib.rs",
"event_log/mod.rs",
"event_log/stdout.rs",
"event_log/sqlite.rs",
"event_log/postgres.rs",
":generate_databuild_rust",
# "structs.rs",
],
edition = "2021",
visibility = ["//visibility:public"],
deps = [
"@crates//:prost",
"@crates//:prost-types",
"@crates//:rusqlite",
"@crates//:serde",
"@crates//:serde_json",
"@crates//:tokio",
"@crates//:uuid",
],
proc_macro_deps = [
"@crates//:async-trait",
],
)

View file

@ -155,6 +155,95 @@ message GraphExecuteResponse { repeated PartitionManifest manifests = 1; }
message GraphBuildRequest { repeated PartitionRef outputs = 1; }
message GraphBuildResponse { repeated PartitionManifest manifests = 1; }
///////////////////////////////////////////////////////////////////////////////////////////////
// Build Event Log
///////////////////////////////////////////////////////////////////////////////////////////////
// Partition lifecycle states
enum PartitionStatus {
PARTITION_UNKNOWN = 0;
PARTITION_REQUESTED = 1; // Partition requested but not yet scheduled
PARTITION_SCHEDULED = 2; // Job scheduled to produce this partition
PARTITION_BUILDING = 3; // Job actively building this partition
PARTITION_AVAILABLE = 4; // Partition successfully built and available
PARTITION_FAILED = 5; // Partition build failed
PARTITION_DELEGATED = 6; // Request delegated to existing build
}
// Job execution lifecycle
enum JobStatus {
JOB_UNKNOWN = 0;
JOB_SCHEDULED = 1; // Job scheduled for execution
JOB_RUNNING = 2; // Job actively executing
JOB_COMPLETED = 3; // Job completed successfully
JOB_FAILED = 4; // Job execution failed
JOB_CANCELLED = 5; // Job execution cancelled
}
// Build request lifecycle
enum BuildRequestStatus {
BUILD_REQUEST_UNKNOWN = 0;
BUILD_REQUEST_RECEIVED = 1; // Build request received and queued
BUILD_REQUEST_PLANNING = 2; // Graph analysis in progress
BUILD_REQUEST_EXECUTING = 3; // Jobs are being executed
BUILD_REQUEST_COMPLETED = 4; // All requested partitions built
BUILD_REQUEST_FAILED = 5; // Build request failed
BUILD_REQUEST_CANCELLED = 6; // Build request cancelled
}
// Build request lifecycle event
message BuildRequestEvent {
BuildRequestStatus status = 1;
repeated PartitionRef requested_partitions = 2;
string message = 3; // Optional status message
}
// Partition state change event
message PartitionEvent {
PartitionRef partition_ref = 1;
PartitionStatus status = 2;
string message = 3; // Optional status message
string job_run_id = 4; // UUID of job run producing this partition (if applicable)
}
// Job execution event
message JobEvent {
string job_run_id = 1; // UUID for this job run
JobLabel job_label = 2; // Job being executed
repeated PartitionRef target_partitions = 3; // Partitions this job run produces
JobStatus status = 4;
string message = 5; // Optional status message
JobConfig config = 6; // Job configuration used (for SCHEDULED events)
repeated PartitionManifest manifests = 7; // Results (for COMPLETED events)
}
// Delegation event (when build request delegates to existing build)
message DelegationEvent {
PartitionRef partition_ref = 1;
string delegated_to_build_request_id = 2; // Build request handling this partition
string message = 3; // Optional message
}
// Individual build event
message BuildEvent {
// Event metadata
string event_id = 1; // UUID for this event
int64 timestamp = 2; // Unix timestamp (nanoseconds)
string build_request_id = 3; // UUID of the build request
// Event type and payload (one of)
oneof event_type {
BuildRequestEvent build_request_event = 10;
PartitionEvent partition_event = 11;
JobEvent job_event = 12;
DelegationEvent delegation_event = 13;
}
}
///////////////////////////////////////////////////////////////////////////////////////////////
// Services
///////////////////////////////////////////////////////////////////////////////////////////////
// Service for job configuration and graph analysis
service DataBuildService {
// // Get job configurations for the specified output references

View file

@ -12,11 +12,14 @@ rust_binary(
visibility = ["//visibility:public"],
deps = [
"//databuild",
"@crates//:clap",
"@crates//:crossbeam-channel",
"@crates//:log",
"@crates//:serde",
"@crates//:serde_json",
"@crates//:simple_logger",
"@crates//:tokio",
"@crates//:uuid",
],
)
@ -27,11 +30,14 @@ rust_binary(
visibility = ["//visibility:public"],
deps = [
"//databuild",
"@crates//:clap",
"@crates//:crossbeam-channel",
"@crates//:log",
"@crates//:num_cpus",
"@crates//:serde",
"@crates//:serde_json",
"@crates//:simple_logger",
"@crates//:tokio",
"@crates//:uuid",
],
)

View file

@ -5,7 +5,10 @@ use std::sync::{Arc, Mutex};
use std::thread;
use log::{info, error};
use simple_logger::SimpleLogger;
use clap::{Arg, Command as ClapCommand};
use uuid::Uuid;
use databuild::*;
use databuild::event_log::{BuildEventLog, create_build_event_log, create_build_event};
// Configure a job to produce the desired outputs
fn configure(job_label: &str, output_refs: &[String]) -> Result<Vec<Task>, String> {
@ -52,8 +55,8 @@ fn configure(job_label: &str, output_refs: &[String]) -> Result<Vec<Task>, Strin
// Create tasks
let tasks: Vec<Task> = job_configs.into_iter()
.map(|cfg| Task {
job_label: job_label.to_string(),
config: cfg,
job: Some(JobLabel { label: job_label.to_string() }),
config: Some(cfg),
})
.collect();
@ -168,15 +171,129 @@ fn configure_parallel(job_refs: HashMap<String, Vec<String>>, num_workers: usize
Ok(all_tasks)
}
// Check for non-stale partitions and delegation opportunities
async fn check_partition_staleness(
partition_refs: &[String],
event_log: &Box<dyn BuildEventLog>,
build_request_id: &str
) -> Result<(Vec<String>, Vec<String>), String> {
let mut stale_partitions = Vec::new();
let mut delegated_partitions = Vec::new();
for partition_ref in partition_refs {
// Check latest partition status
match event_log.get_latest_partition_status(partition_ref).await {
Ok(Some((PartitionStatus::PartitionAvailable, _timestamp))) => {
info!("Partition {} is already available, skipping", partition_ref);
// Could add more sophisticated staleness checking here based on timestamp
// For now, assume available partitions are fresh
delegated_partitions.push(partition_ref.clone());
}
Ok(Some((PartitionStatus::PartitionBuilding, _timestamp))) |
Ok(Some((PartitionStatus::PartitionScheduled, _timestamp))) => {
// Check if another build is actively working on this partition
match event_log.get_active_builds_for_partition(partition_ref).await {
Ok(active_builds) if !active_builds.is_empty() => {
info!("Partition {} is being built by another request: {:?}", partition_ref, active_builds);
// Log delegation event
for delegated_to_build_id in &active_builds {
let event = create_build_event(
build_request_id.to_string(),
crate::build_event::EventType::DelegationEvent(DelegationEvent {
partition_ref: Some(PartitionRef { str: partition_ref.clone() }),
delegated_to_build_request_id: delegated_to_build_id.clone(),
message: format!("Delegated to existing build"),
})
);
if let Err(e) = event_log.append_event(event).await {
error!("Failed to log delegation event: {}", e);
}
}
delegated_partitions.push(partition_ref.clone());
}
_ => {
// No active builds, consider it stale and needs rebuilding
stale_partitions.push(partition_ref.clone());
}
}
}
_ => {
// Partition not found or failed, needs to be built
stale_partitions.push(partition_ref.clone());
}
}
}
Ok((stale_partitions, delegated_partitions))
}
// Plan creates a job graph for given output references
fn plan(output_refs: &[String]) -> Result<JobGraph, String> {
async fn plan(
output_refs: &[String],
build_event_log: Option<Box<dyn BuildEventLog>>,
build_request_id: &str
) -> Result<JobGraph, String> {
info!("Starting planning for {} output refs: {:?}", output_refs.len(), output_refs);
// Log build request received event
if let Some(ref event_log) = build_event_log {
let event = create_build_event(
build_request_id.to_string(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestReceived as i32,
requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(),
message: "Analysis started".to_string(),
})
);
if let Err(e) = event_log.append_event(event).await {
error!("Failed to log build request event: {}", e);
}
}
// Check for partition staleness and delegation opportunities
let (stale_refs, delegated_refs) = if let Some(ref event_log) = build_event_log {
match check_partition_staleness(output_refs, event_log, build_request_id).await {
Ok((stale, delegated)) => {
info!("Staleness check: {} stale, {} delegated partitions", stale.len(), delegated.len());
(stale, delegated)
}
Err(e) => {
error!("Failed to check partition staleness: {}", e);
// Fall back to building all partitions
(output_refs.to_vec(), Vec::new())
}
}
} else {
// No event log, build all partitions
(output_refs.to_vec(), Vec::new())
};
// Only plan for stale partitions that need to be built
let mut unhandled_refs = HashSet::new();
for ref_str in output_refs {
for ref_str in &stale_refs {
unhandled_refs.insert(ref_str.clone());
}
// Log partition scheduling events for stale partitions
if let Some(ref event_log) = build_event_log {
for partition_ref in &stale_refs {
let event = create_build_event(
build_request_id.to_string(),
crate::build_event::EventType::PartitionEvent(PartitionEvent {
partition_ref: Some(PartitionRef { str: partition_ref.clone() }),
status: PartitionStatus::PartitionScheduled as i32,
message: "Partition scheduled for building".to_string(),
job_run_id: String::new(),
})
);
if let Err(e) = event_log.append_event(event).await {
error!("Failed to log partition scheduled event: {}", e);
}
}
}
let mut epoch = 0;
let mut nodes = Vec::new();
@ -196,6 +313,21 @@ fn plan(output_refs: &[String]) -> Result<JobGraph, String> {
}
info!("Using {} workers for parallel execution", num_workers);
// Log planning phase start
if let Some(ref event_log) = build_event_log {
let event = create_build_event(
build_request_id.to_string(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestPlanning as i32,
requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(),
message: "Graph analysis in progress".to_string(),
})
);
if let Err(e) = event_log.append_event(event).await {
error!("Failed to log planning event: {}", e);
}
}
while !unhandled_refs.is_empty() {
if epoch >= 1000 {
error!("Planning timeout: still planning after {} epochs, giving up", epoch);
@ -232,12 +364,12 @@ fn plan(output_refs: &[String]) -> Result<JobGraph, String> {
// Plan next epoch
let mut new_unhandled_count = 0;
for task in &new_nodes {
for input in &task.config.inputs {
for input in &task.config.as_ref().unwrap().inputs {
if input.dep_type == 1 { // MATERIALIZE = 1
if !unhandled_refs.contains(&input.partition_ref.str) {
if !unhandled_refs.contains(&input.partition_ref.as_ref().unwrap().str) {
new_unhandled_count += 1;
}
unhandled_refs.insert(input.partition_ref.str.clone());
unhandled_refs.insert(input.partition_ref.as_ref().unwrap().str.clone());
}
}
}
@ -249,12 +381,45 @@ fn plan(output_refs: &[String]) -> Result<JobGraph, String> {
if !nodes.is_empty() {
info!("Planning complete: created graph with {} nodes for {} output refs", nodes.len(), output_refs.len());
// Log planning completion
if let Some(ref event_log) = build_event_log {
let event = create_build_event(
build_request_id.to_string(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestCompleted as i32,
requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(),
message: format!("Analysis completed with {} jobs", nodes.len()),
})
);
if let Err(e) = event_log.append_event(event).await {
error!("Failed to log completion event: {}", e);
}
}
Ok(JobGraph {
outputs: output_refs.to_vec(),
label: Some(GraphLabel { label: "analyzed_graph".to_string() }),
outputs: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(),
nodes,
})
} else {
error!("Planning failed: no nodes created for output refs {:?}", output_refs);
// Log planning failure
if let Some(ref event_log) = build_event_log {
let event = create_build_event(
build_request_id.to_string(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestFailed as i32,
requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(),
message: "No jobs found for requested partitions".to_string(),
})
);
if let Err(e) = event_log.append_event(event).await {
error!("Failed to log failure event: {}", e);
}
}
Err("Unknown failure in graph planning".to_string())
}
}
@ -271,25 +436,25 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
// Map to track which refs are outputs (to highlight them)
let mut is_output_ref = HashSet::new();
for ref_str in &graph.outputs {
is_output_ref.insert(ref_str.clone());
is_output_ref.insert(ref_str.str.clone());
}
// Process each task in the graph
for task in &graph.nodes {
// Create a unique ID for this job+outputs combination
let outputs_strs: Vec<String> = task.config.outputs.iter().map(|o| o.str.clone()).collect();
let outputs_strs: Vec<String> = task.config.as_ref().unwrap().outputs.iter().map(|o| o.str.clone()).collect();
let outputs_key = outputs_strs.join("_");
let mut job_node_id = format!("job_{}", task.job_label.replace("//", "_"));
let mut job_node_id = format!("job_{}", task.job.as_ref().unwrap().label.replace("//", "_"));
job_node_id = job_node_id.replace(":", "_").replace("=", "_").replace("?", "_").replace(" ", "_");
job_node_id = format!("{}_{}", job_node_id, outputs_key.replace("/", "_").replace("=", "_"));
// Create a descriptive label that includes both job label and outputs
let job_label = &task.job_label;
let outputs_label = if !task.config.outputs.is_empty() {
if task.config.outputs.len() == 1 {
format!(" [{}]", task.config.outputs[0].str)
let job_label = &task.job.as_ref().unwrap().label;
let outputs_label = if !task.config.as_ref().unwrap().outputs.is_empty() {
if task.config.as_ref().unwrap().outputs.len() == 1 {
format!(" [{}]", task.config.as_ref().unwrap().outputs[0].str)
} else {
format!(" [{}, ...]", task.config.outputs[0].str)
format!(" [{}, ...]", task.config.as_ref().unwrap().outputs[0].str)
}
} else {
String::new()
@ -308,12 +473,12 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
}
// Process inputs (dependencies)
for input in &task.config.inputs {
let ref_node_id = format!("ref_{}", input.partition_ref.str.replace("/", "_").replace("=", "_"));
for input in &task.config.as_ref().unwrap().inputs {
let ref_node_id = format!("ref_{}", input.partition_ref.as_ref().unwrap().str.replace("/", "_").replace("=", "_"));
// Add the partition ref node if not already added
if !added_refs.contains(&ref_node_id) {
let node_class = if is_output_ref.contains(&input.partition_ref.str) {
let node_class = if is_output_ref.contains(&input.partition_ref.as_ref().unwrap().str) {
"outputPartition"
} else {
"partition"
@ -323,7 +488,7 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
mermaid.push_str(&format!(
" {}[(\"{}\")]:::{}\n",
ref_node_id,
input.partition_ref.str.replace("/", "_").replace("=", "_"),
input.partition_ref.as_ref().unwrap().str.replace("/", "_").replace("=", "_"),
node_class
));
added_refs.insert(ref_node_id.clone());
@ -340,7 +505,7 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
}
// Process outputs
for output in &task.config.outputs {
for output in &task.config.as_ref().unwrap().outputs {
let ref_node_id = format!("ref_{}", output.str.replace("/", "_").replace("=", "_"));
// Add the partition ref node if not already added
@ -375,19 +540,70 @@ fn generate_mermaid_diagram(graph: &JobGraph) -> String {
mermaid
}
fn main() {
#[tokio::main]
async fn main() {
// Initialize logger
SimpleLogger::new().init().unwrap();
let mode = env::var("DATABUILD_MODE").unwrap_or_else(|_| "unknown".to_string());
info!("Starting analyze.rs in mode: {}", mode);
let args: Vec<String> = env::args().skip(1).collect();
// Parse command line arguments
let matches = ClapCommand::new("analyze")
.version("1.0")
.about("DataBuild graph analysis tool")
.arg(
Arg::new("build-event-log")
.long("build-event-log")
.value_name("URI")
.help("Build event log URI (stdout, sqlite://path, postgres://connection)")
.required(false)
)
.arg(
Arg::new("build-request-id")
.long("build-request-id")
.value_name("UUID")
.help("Build request ID (auto-generated if not provided)")
.required(false)
)
.arg(
Arg::new("partitions")
.help("Partition references to analyze")
.required(true)
.num_args(1..)
)
.get_matches();
let build_event_log_uri = matches.get_one::<String>("build-event-log");
let build_request_id = matches.get_one::<String>("build-request-id")
.cloned()
.unwrap_or_else(|| Uuid::new_v4().to_string());
let args: Vec<String> = matches.get_many::<String>("partitions")
.unwrap_or_default()
.cloned()
.collect();
// Initialize build event log if provided
let build_event_log = if let Some(uri) = build_event_log_uri {
match create_build_event_log(uri).await {
Ok(log) => {
info!("Initialized build event log: {}", uri);
Some(log)
}
Err(e) => {
error!("Failed to initialize build event log {}: {}", uri, e);
exit(1);
}
}
} else {
None
};
match mode.as_str() {
"plan" => {
// Get output refs from command line arguments
match plan(&args) {
match plan(&args, build_event_log, &build_request_id).await {
Ok(graph) => {
// Output the job graph as JSON
match serde_json::to_string(&graph) {
@ -433,7 +649,7 @@ fn main() {
}
"mermaid" => {
// Get output refs from command line arguments
match plan(&args) {
match plan(&args, None, &build_request_id).await {
Ok(graph) => {
// Generate and output the mermaid diagram
let mermaid_diagram = generate_mermaid_diagram(&graph);

View file

@ -1,7 +1,8 @@
use databuild::{JobGraph, Task};
use databuild::{JobGraph, Task, JobStatus, BuildRequestStatus, PartitionStatus, BuildRequestEvent, JobEvent, PartitionEvent};
use databuild::event_log::{create_build_event_log, create_build_event};
use databuild::build_event::EventType;
use crossbeam_channel::{Receiver, Sender};
use log::{debug, error, info, warn};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
@ -9,6 +10,8 @@ use std::process::{Command, Stdio};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use clap::{Arg, Command as ClapCommand};
use uuid::Uuid;
const NUM_WORKERS: usize = 4;
const LOG_INTERVAL: Duration = Duration::from_secs(5);
@ -37,12 +40,12 @@ struct TaskExecutionResult {
// Mirrors the Go implementation's getTaskKey.
fn get_task_key(task: &Task) -> String {
let mut key_parts = Vec::new();
key_parts.push(task.job_label.clone());
key_parts.push(task.job.as_ref().unwrap().label.clone());
for input_dep in &task.config.inputs {
key_parts.push(format!("input:{}", input_dep.partition_ref.str));
for input_dep in &task.config.as_ref().unwrap().inputs {
key_parts.push(format!("input:{}", input_dep.partition_ref.as_ref().unwrap().str));
}
for output_ref in &task.config.outputs {
for output_ref in &task.config.as_ref().unwrap().outputs {
key_parts.push(format!("output:{}", output_ref.str));
}
key_parts.join("|")
@ -98,20 +101,20 @@ fn worker(
info!("[Worker {}] Starting", worker_id);
while let Ok(task) = task_rx.recv() {
let task_key = get_task_key(&task);
info!("[Worker {}] Starting job: {} (Key: {})", worker_id, task.job_label, task_key);
info!("[Worker {}] Starting job: {} (Key: {})", worker_id, task.job.as_ref().unwrap().label, task_key);
let start_time = Instant::now();
let exec_path = resolve_executable_from_runfiles(&task.job_label);
let exec_path = resolve_executable_from_runfiles(&task.job.as_ref().unwrap().label);
let config_json = match serde_json::to_string(&task.config) {
let config_json = match serde_json::to_string(&task.config.as_ref().unwrap()) {
Ok(json) => json,
Err(e) => {
let err_msg = format!("Failed to serialize task config for {}: {}", task.job_label, e);
let err_msg = format!("Failed to serialize task config for {}: {}", task.job.as_ref().unwrap().label, e);
error!("[Worker {}] {}", worker_id, err_msg);
result_tx
.send(TaskExecutionResult {
task_key,
job_label: task.job_label.clone(),
job_label: task.job.as_ref().unwrap().label.clone(),
success: false,
stdout: String::new(),
stderr: err_msg.clone(),
@ -140,14 +143,14 @@ fn worker(
Ok(mut child) => {
if let Some(mut child_stdin) = child.stdin.take() {
if let Err(e) = child_stdin.write_all(config_json.as_bytes()) {
let err_msg = format!("[Worker {}] Failed to write to stdin for {}: {}", worker_id, task.job_label, e);
let err_msg = format!("[Worker {}] Failed to write to stdin for {}: {}", worker_id, task.job.as_ref().unwrap().label, e);
error!("{}", err_msg);
// Ensure child is killed if stdin write fails before wait
let _ = child.kill();
let _ = child.wait(); // Reap the child
result_tx.send(TaskExecutionResult {
task_key,
job_label: task.job_label.clone(),
job_label: task.job.as_ref().unwrap().label.clone(),
success: false,
stdout: String::new(),
stderr: err_msg.clone(),
@ -159,11 +162,11 @@ fn worker(
}
drop(child_stdin); // Close stdin to signal EOF to the child
} else {
let err_msg = format!("[Worker {}] Failed to get stdin for {}", worker_id, task.job_label);
let err_msg = format!("[Worker {}] Failed to get stdin for {}", worker_id, task.job.as_ref().unwrap().label);
error!("{}", err_msg);
result_tx.send(TaskExecutionResult {
task_key,
job_label: task.job_label.clone(),
job_label: task.job.as_ref().unwrap().label.clone(),
success: false,
stdout: String::new(),
stderr: err_msg.clone(),
@ -184,18 +187,18 @@ fn worker(
if success {
info!(
"[Worker {}] Job succeeded: {} (Duration: {:?})",
worker_id, task.job_label, duration
worker_id, task.job.as_ref().unwrap().label, duration
);
} else {
error!(
"[Worker {}] Job failed: {} (Duration: {:?}, Status: {:?})\nStdout: {}\nStderr: {}",
worker_id, task.job_label, duration, output.status, stdout, stderr
worker_id, task.job.as_ref().unwrap().label, duration, output.status, stdout, stderr
);
}
result_tx
.send(TaskExecutionResult {
task_key,
job_label: task.job_label.clone(),
job_label: task.job.as_ref().unwrap().label.clone(),
success,
stdout,
stderr,
@ -205,12 +208,12 @@ fn worker(
.unwrap_or_else(|e| error!("[Worker {}] Failed to send result: {}", worker_id, e));
}
Err(e) => {
let err_msg = format!("[Worker {}] Failed to execute or wait for {}: {}", worker_id, task.job_label, e);
let err_msg = format!("[Worker {}] Failed to execute or wait for {}: {}", worker_id, task.job.as_ref().unwrap().label, e);
error!("{}", err_msg);
result_tx
.send(TaskExecutionResult {
task_key,
job_label: task.job_label.clone(),
job_label: task.job.as_ref().unwrap().label.clone(),
success: false,
stdout: String::new(),
stderr: err_msg.clone(),
@ -222,12 +225,12 @@ fn worker(
}
}
Err(e) => {
let err_msg = format!("[Worker {}] Failed to spawn command for {}: {} (Path: {:?})", worker_id, task.job_label, e, exec_path);
let err_msg = format!("[Worker {}] Failed to spawn command for {}: {} (Path: {:?})", worker_id, task.job.as_ref().unwrap().label, e, exec_path);
error!("{}", err_msg);
result_tx
.send(TaskExecutionResult {
task_key,
job_label: task.job_label.clone(),
job_label: task.job.as_ref().unwrap().label.clone(),
success: false,
stdout: String::new(),
stderr: err_msg.clone(),
@ -244,22 +247,68 @@ fn worker(
fn is_task_ready(task: &Task, completed_outputs: &HashSet<String>) -> bool {
let mut missing_deps = Vec::new();
for dep in &task.config.inputs {
for dep in &task.config.as_ref().unwrap().inputs {
if dep.dep_type == 1 { // MATERIALIZE = 1
if !completed_outputs.contains(&dep.partition_ref.str) {
missing_deps.push(&dep.partition_ref.str);
if !completed_outputs.contains(&dep.partition_ref.as_ref().unwrap().str) {
missing_deps.push(&dep.partition_ref.as_ref().unwrap().str);
}
}
}
if !missing_deps.is_empty() {
debug!("Task {} not ready - missing dependencies: {:?}", task.job_label, missing_deps);
debug!("Task {} not ready - missing dependencies: {:?}", task.job.as_ref().unwrap().label, missing_deps);
return false;
}
true
}
// Check if partitions are already being built by other build requests
async fn check_build_coordination(
task: &Task,
event_log: &Box<dyn databuild::event_log::BuildEventLog>,
build_request_id: &str
) -> Result<bool, String> {
for output_ref in &task.config.as_ref().unwrap().outputs {
// Check if this partition is already being built by another request
match event_log.get_active_builds_for_partition(&output_ref.str).await {
Ok(active_builds) => {
let other_builds: Vec<String> = active_builds.into_iter()
.filter(|id| id != build_request_id)
.collect();
if !other_builds.is_empty() {
info!("Partition {} is already being built by other requests: {:?}. Delegating.",
output_ref.str, other_builds);
// Log delegation event
for delegated_to_build_id in &other_builds {
let event = create_build_event(
build_request_id.to_string(),
EventType::DelegationEvent(databuild::DelegationEvent {
partition_ref: Some(output_ref.clone()),
delegated_to_build_request_id: delegated_to_build_id.clone(),
message: "Delegated to active build during execution".to_string(),
})
);
if let Err(e) = event_log.append_event(event).await {
error!("Failed to log delegation event: {}", e);
}
}
return Ok(false); // Don't build this task, it's delegated
}
}
Err(e) => {
warn!("Failed to check active builds for partition {}: {}", output_ref.str, e);
// Continue with build on error to avoid blocking
}
}
}
Ok(true) // Safe to build
}
fn log_status_summary(
task_states: &HashMap<String, TaskState>,
original_tasks_by_key: &HashMap<String, Arc<Task>>,
@ -270,7 +319,7 @@ fn log_status_summary(
let mut failed_tasks = Vec::new();
for (key, state) in task_states {
let label = original_tasks_by_key.get(key).map_or_else(|| key.as_str(), |t| t.job_label.as_str());
let label = original_tasks_by_key.get(key).map_or_else(|| key.as_str(), |t| t.job.as_ref().unwrap().label.as_str());
match state {
TaskState::Pending => pending_tasks.push(label),
TaskState::Running => running_tasks.push(label),
@ -287,15 +336,72 @@ fn log_status_summary(
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
simple_logger::SimpleLogger::new().with_level(log::LevelFilter::Info).init()?;
// Parse command line arguments
let matches = ClapCommand::new("execute")
.version("1.0")
.about("DataBuild graph execution tool")
.arg(
Arg::new("build-event-log")
.long("build-event-log")
.value_name("URI")
.help("Build event log URI (stdout, sqlite://path, postgres://connection)")
.required(false)
)
.arg(
Arg::new("build-request-id")
.long("build-request-id")
.value_name("UUID")
.help("Build request ID (auto-generated if not provided)")
.required(false)
)
.get_matches();
let build_event_log_uri = matches.get_one::<String>("build-event-log");
let build_request_id = matches.get_one::<String>("build-request-id")
.cloned()
.unwrap_or_else(|| Uuid::new_v4().to_string());
// Initialize build event log if provided
let build_event_log = if let Some(uri) = build_event_log_uri {
match create_build_event_log(uri).await {
Ok(log) => {
info!("Initialized build event log: {}", uri);
Some(log)
}
Err(e) => {
error!("Failed to initialize build event log {}: {}", uri, e);
std::process::exit(1);
}
}
} else {
None
};
let mut buffer = String::new();
std::io::stdin().read_to_string(&mut buffer)?;
let graph: JobGraph = serde_json::from_str(&buffer)?;
info!("Executing job graph with {} nodes", graph.nodes.len());
// Log build request execution start
if let Some(ref event_log) = build_event_log {
let event = create_build_event(
build_request_id.clone(),
EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestExecuting as i32,
requested_partitions: graph.outputs.clone(),
message: format!("Starting execution of {} jobs", graph.nodes.len()),
})
);
if let Err(e) = event_log.append_event(event).await {
error!("Failed to log execution start event: {}", e);
}
}
let mut task_states: HashMap<String, TaskState> = HashMap::new();
let mut original_tasks_by_key: HashMap<String, Arc<Task>> = HashMap::new();
let graph_nodes_arc: Vec<Arc<Task>> = graph.nodes.into_iter().map(Arc::new).collect();
@ -331,7 +437,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut fail_fast_triggered = false;
loop {
// 1. Process results
// 1. Process results
while let Ok(result) = result_rx.try_recv() {
active_tasks_count -= 1;
info!(
@ -346,9 +452,49 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
};
task_states.insert(result.task_key.clone(), current_state);
// Log job completion events
if let Some(ref event_log) = build_event_log {
if let Some(original_task) = original_tasks_by_key.get(&result.task_key) {
let job_run_id = Uuid::new_v4().to_string();
// Log job completion
let job_event = create_build_event(
build_request_id.clone(),
EventType::JobEvent(JobEvent {
job_run_id: job_run_id.clone(),
job_label: original_task.job.clone(),
target_partitions: original_task.config.as_ref().unwrap().outputs.clone(),
status: if result.success { JobStatus::JobCompleted as i32 } else { JobStatus::JobFailed as i32 },
message: if result.success { "Job completed successfully".to_string() } else { result.error_message.clone().unwrap_or_default() },
config: original_task.config.clone(),
manifests: vec![], // Would be populated from actual job output
})
);
if let Err(e) = event_log.append_event(job_event).await {
error!("Failed to log job completion event: {}", e);
}
// Log partition status updates
for output_ref in &original_task.config.as_ref().unwrap().outputs {
let partition_event = create_build_event(
build_request_id.clone(),
EventType::PartitionEvent(PartitionEvent {
partition_ref: Some(output_ref.clone()),
status: if result.success { PartitionStatus::PartitionAvailable as i32 } else { PartitionStatus::PartitionFailed as i32 },
message: if result.success { "Partition built successfully".to_string() } else { "Partition build failed".to_string() },
job_run_id: job_run_id.clone(),
})
);
if let Err(e) = event_log.append_event(partition_event).await {
error!("Failed to log partition status event: {}", e);
}
}
}
}
if result.success {
if let Some(original_task) = original_tasks_by_key.get(&result.task_key) {
for output_ref in &original_task.config.outputs {
for output_ref in &original_task.config.as_ref().unwrap().outputs {
completed_outputs.insert(output_ref.str.clone());
}
}
@ -374,7 +520,72 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let task_key = get_task_key(task_node);
if task_states.get(&task_key) == Some(&TaskState::Pending) {
if is_task_ready(task_node, &completed_outputs) {
info!("Dispatching task: {}", task_node.job_label);
// Check build coordination if event log is available
let should_build = if let Some(ref event_log) = build_event_log {
match check_build_coordination(task_node, event_log, &build_request_id).await {
Ok(should_build) => should_build,
Err(e) => {
error!("Error checking build coordination for {}: {}",
task_node.job.as_ref().unwrap().label, e);
true // Default to building on error
}
}
} else {
true // No event log, always build
};
if !should_build {
// Task delegated to another build, mark as succeeded
info!("Task {} delegated to another build request", task_node.job.as_ref().unwrap().label);
task_states.insert(task_key.clone(), TaskState::Succeeded);
// Mark outputs as completed
for output_ref in &task_node.config.as_ref().unwrap().outputs {
completed_outputs.insert(output_ref.str.clone());
}
continue;
}
info!("Dispatching task: {}", task_node.job.as_ref().unwrap().label);
// Log job scheduling events
if let Some(ref event_log) = build_event_log {
let job_run_id = Uuid::new_v4().to_string();
// Log job scheduled
let job_event = create_build_event(
build_request_id.clone(),
EventType::JobEvent(JobEvent {
job_run_id: job_run_id.clone(),
job_label: task_node.job.clone(),
target_partitions: task_node.config.as_ref().unwrap().outputs.clone(),
status: JobStatus::JobScheduled as i32,
message: "Job scheduled for execution".to_string(),
config: task_node.config.clone(),
manifests: vec![],
})
);
if let Err(e) = event_log.append_event(job_event).await {
error!("Failed to log job scheduled event: {}", e);
}
// Log partition building status
for output_ref in &task_node.config.as_ref().unwrap().outputs {
let partition_event = create_build_event(
build_request_id.clone(),
EventType::PartitionEvent(PartitionEvent {
partition_ref: Some(output_ref.clone()),
status: PartitionStatus::PartitionBuilding as i32,
message: "Partition build started".to_string(),
job_run_id: job_run_id.clone(),
})
);
if let Err(e) = event_log.append_event(partition_event).await {
error!("Failed to log partition building event: {}", e);
}
}
}
task_states.insert(task_key.clone(), TaskState::Running);
task_tx.send(task_node.clone())?;
active_tasks_count += 1;
@ -398,16 +609,16 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
for (key, state) in &task_states {
if *state == TaskState::Pending {
if let Some(task) = original_tasks_by_key.get(key) {
warn!("Pending task: {} ({})", task.job_label, key);
warn!("Pending task: {} ({})", task.job.as_ref().unwrap().label, key);
warn!(" Required inputs:");
for dep in &task.config.inputs {
for dep in &task.config.as_ref().unwrap().inputs {
if dep.dep_type == 1 { // MATERIALIZE = 1
let available = completed_outputs.contains(&dep.partition_ref.str);
warn!(" {} - {}", dep.partition_ref.str, if available { "AVAILABLE" } else { "MISSING" });
let available = completed_outputs.contains(&dep.partition_ref.as_ref().unwrap().str);
warn!(" {} - {}", dep.partition_ref.as_ref().unwrap().str, if available { "AVAILABLE" } else { "MISSING" });
}
}
warn!(" Produces outputs:");
for output in &task.config.outputs {
for output in &task.config.as_ref().unwrap().outputs {
warn!(" {}", output.str);
}
}
@ -457,6 +668,27 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("Execution complete: {} succeeded, {} failed", success_count, failure_count);
// Log final build request status
if let Some(ref event_log) = build_event_log {
let final_status = if failure_count > 0 || fail_fast_triggered {
BuildRequestStatus::BuildRequestFailed
} else {
BuildRequestStatus::BuildRequestCompleted
};
let event = create_build_event(
build_request_id.clone(),
EventType::BuildRequestEvent(BuildRequestEvent {
status: final_status as i32,
requested_partitions: graph.outputs.clone(),
message: format!("Execution completed: {} succeeded, {} failed", success_count, failure_count),
})
);
if let Err(e) = event_log.append_event(event).await {
error!("Failed to log final build request event: {}", e);
}
}
if failure_count > 0 || fail_fast_triggered {
error!("Execution finished with errors.");
std::process::exit(1);

View file

@ -9,5 +9,5 @@ set -e
# Assumes workspace name is 'databuild'
EXECUTABLE_BINARY="$(rlocation "databuild/databuild/graph/analyze")"
# Run the analysis
exec "${EXECUTABLE_BINARY}" "$@"
# Run the analysis with optional build event log arguments
exec "${EXECUTABLE_BINARY}" ${DATABUILD_BUILD_EVENT_LOG_ARGS} "$@"

View file

@ -7,5 +7,5 @@ set -e
EXECUTABLE_BINARY="$(rlocation "databuild+/databuild/graph/$(basename "%{EXECUTABLE_PATH}")")"
# Run the execution
exec "${EXECUTABLE_BINARY}" "$@"
# Run the execution with optional build event log arguments
exec "${EXECUTABLE_BINARY}" ${DATABUILD_BUILD_EVENT_LOG_ARGS} "$@"

View file

@ -231,8 +231,16 @@ _databuild_job_rule = rule(
executable = True,
)
def databuild_graph(name, jobs, lookup, visibility = None):
"""Creates a databuild graph target."""
def databuild_graph(name, jobs, lookup, build_event_log = None, visibility = None):
"""Creates a databuild graph target.
Args:
name: Name of the graph target
jobs: List of job targets
lookup: Job lookup binary
build_event_log: Optional build event log URI (e.g., "sqlite:///tmp/builds.db", "stdout")
visibility: Visibility specification
"""
_databuild_graph_lookup(
name = "%s.lookup" % name,
lookup = lookup,
@ -242,6 +250,7 @@ def databuild_graph(name, jobs, lookup, visibility = None):
name = "%s.analyze" % name,
lookup = "%s.lookup" % name,
jobs = jobs,
build_event_log = build_event_log,
visibility = visibility,
)
_databuild_graph_mermaid(
@ -253,6 +262,7 @@ def databuild_graph(name, jobs, lookup, visibility = None):
_databuild_graph_exec(
name = "%s.exec" % name,
jobs = jobs,
build_event_log = build_event_log,
visibility = visibility,
)
_databuild_graph_build(
@ -260,6 +270,7 @@ def databuild_graph(name, jobs, lookup, visibility = None):
analyze = "%s.analyze" % name,
exec = "%s.exec" % name,
jobs = jobs,
build_event_log = build_event_log,
visibility = visibility,
)
tar(
@ -346,13 +357,20 @@ def _databuild_graph_analyze_impl(ctx):
for target in ctx.attr.jobs
]) + "'"
# Build the command with optional build event log
build_event_log_args = ""
if ctx.attr.build_event_log:
build_event_log_args = "--build-event-log '%s'" % ctx.attr.build_event_log
env_setup = """
export DATABUILD_CANDIDATE_JOBS="{candidate_job_env_var}"
export DATABUILD_MODE=plan
export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path})
export DATABUILD_BUILD_EVENT_LOG_ARGS="{build_event_log_args}"
""".format(
candidate_job_env_var = config_paths_str,
lookup_path = ctx.attr.lookup.files_to_run.executable.short_path,
build_event_log_args = build_event_log_args,
)
script_prefix = env_setup
@ -406,6 +424,10 @@ _databuild_graph_analyze = rule(
doc = "The list of jobs that are candidates for building partitions in this databuild graph",
allow_empty = False,
),
"build_event_log": attr.string(
doc = "Optional build event log URI",
mandatory = False,
),
"_template": attr.label(
default = "@databuild//databuild/graph:rust_analyze_wrapper.sh.tpl",
allow_single_file = True,
@ -523,13 +545,24 @@ def _databuild_graph_exec_impl(ctx):
for job in ctx.attr.jobs
]
# Build the command with optional build event log
build_event_log_args = ""
if ctx.attr.build_event_log:
build_event_log_args = "--build-event-log '%s'" % ctx.attr.build_event_log
prefix_setup = """
export DATABUILD_BUILD_EVENT_LOG_ARGS="{build_event_log_args}"
""".format(
build_event_log_args = build_event_log_args,
)
ctx.actions.expand_template(
template = ctx.file._template,
output = script,
substitutions = {
"%{EXECUTABLE_PATH}": ctx.attr._execute.files_to_run.executable.path,
"%{RUNFILES_PREFIX}": RUNFILES_PREFIX,
"%{PREFIX}": "",
"%{PREFIX}": prefix_setup,
},
is_executable = True,
)
@ -560,6 +593,10 @@ _databuild_graph_exec = rule(
doc = "The list of jobs that are candidates for building partitions in this databuild graph",
allow_empty = False,
),
"build_event_log": attr.string(
doc = "Optional build event log URI",
mandatory = False,
),
"_template": attr.label(
default = "@databuild//databuild/graph:rust_execute_wrapper.sh.tpl",
allow_single_file = True,
@ -635,7 +672,11 @@ _databuild_graph_build = rule(
"jobs": attr.label_list(
doc = "The list of jobs that are candidates for building partitions in this databuild graph",
allow_empty = False,
)
),
"build_event_log": attr.string(
doc = "Optional build event log URI",
mandatory = False,
),
},
executable = True,
)

File diff suppressed because one or more lines are too long