// databuild/databuild/graph/analyze.rs
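//! Graph analysis entry point for DataBuild.
//!
//! The binary is driven by the `DATABUILD_MODE` environment variable:
//! - `plan`: resolve and configure jobs for the requested partition refs, then print the JobGraph as JSON
//! - `lookup`: print the job-to-partition mapping returned by the job lookup executable
//! - `mermaid`: plan a graph and print it as a Mermaid flowchart
//! - `import_test`: smoke test that the binary starts and its dependencies load
//!
//! Other environment variables read below: DATABUILD_CANDIDATE_JOBS, DATABUILD_JOB_LOOKUP_PATH,
//! DATABUILD_PARALLEL_WORKERS, DATABUILD_BUILD_EVENT_LOG, and DATABUILD_BUILD_REQUEST_ID.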
use std::collections::{HashMap, HashSet};
use std::env;
use std::process::{Command, exit};
use std::sync::{Arc, Mutex};
use std::thread;
use log::{error, info, warn};
use simple_logger::SimpleLogger;
use clap::{Arg, Command as ClapCommand};
use uuid::Uuid;
use databuild::*;
use databuild::event_log::{BuildEventLog, create_build_event_log, create_build_event};
// Configure a job to produce the desired outputs
fn configure(job_label: &str, output_refs: &[String]) -> Result<Vec<Task>, String> {
let candidate_jobs_str = env::var("DATABUILD_CANDIDATE_JOBS")
.map_err(|e| format!("Failed to get DATABUILD_CANDIDATE_JOBS: {}", e))?;
let job_path_map: HashMap<String, String> = serde_json::from_str(&candidate_jobs_str)
.map_err(|e| format!("Failed to parse DATABUILD_CANDIDATE_JOBS: {}", e))?;
// Look up the executable path for this job
let exec_path = job_path_map.get(job_label)
.ok_or_else(|| format!("Job {} is not a candidate job", job_label))?;
// Check if executable exists
if !std::path::Path::new(exec_path).exists() {
return Err(format!("Executable not found at path: {}", exec_path));
}
info!("Executing job configuration: {} {:?}", exec_path, output_refs);
// Execute the job configuration command
let output = Command::new(exec_path)
.args(output_refs)
.output()
.map_err(|e| format!("Failed to execute job config: {}", e))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
error!("Job configuration failed: {}", stderr);
return Err(format!("Failed to run job config: {}", stderr));
}
info!("Job configuration succeeded for {}", job_label);
// Parse the job configurations
let stdout = String::from_utf8_lossy(&output.stdout);
let job_configure_response: JobConfigureResponse = serde_json::from_str(&stdout)
.map_err(|e| {
error!("Error parsing job configs for {}: {}. `{}`", job_label, e, stdout);
format!("Failed to parse job configs: {}", e)
})?;
let job_configs = job_configure_response.configs;
// Create tasks
let tasks: Vec<Task> = job_configs.into_iter()
.map(|cfg| Task {
job: Some(JobLabel { label: job_label.to_string() }),
config: Some(cfg),
})
.collect();
info!("Created {} tasks for job {}", tasks.len(), job_label);
Ok(tasks)
}
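// Contract assumed by `configure` (inferred from the parsing above): the candidate job
// executable receives the requested output refs as argv and prints a JSON-encoded
// JobConfigureResponse on stdout, whose `configs` field holds one JobConfig per task to run.
// The exact JobConfig fields come from the generated databuild types and are not restated here.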
// Resolve maps each job label to the partition refs it produces for the requested outputs
fn resolve(output_refs: &[String]) -> Result<HashMap<String, Vec<String>>, String> {
let lookup_path = env::var("DATABUILD_JOB_LOOKUP_PATH")
.map_err(|e| format!("Failed to get DATABUILD_JOB_LOOKUP_PATH: {}", e))?;
// Run the job lookup
info!("Executing job lookup: {} {:?}", lookup_path, output_refs);
let output = Command::new(&lookup_path)
.args(output_refs)
.output()
.map_err(|e| format!("Failed to execute job lookup: {}", e))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
error!("Job lookup failed: {}", stderr);
return Err(format!("Failed to run job lookup: {}", stderr));
}
info!("Job lookup succeeded for {} output refs", output_refs.len());
// Parse the result
let stdout = String::from_utf8_lossy(&output.stdout);
let result: HashMap<String, Vec<String>> = serde_json::from_str(&stdout)
.map_err(|e| {
error!("Error parsing job lookup result: {}", e);
format!("Failed to parse job lookup result: {}", e)
})?;
info!("Job lookup found {} job mappings", result.len());
for (job, refs) in &result {
info!(" Job {} produces {} refs", job, refs.len());
}
Ok(result)
}
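// The lookup executable's stdout is parsed as a map from job label to the partition refs that
// job produces, e.g. (illustrative values): {"//jobs:daily_rollup": ["reports/date=2025-01-01"]}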
// Configure multiple jobs in parallel
fn configure_parallel(job_refs: HashMap<String, Vec<String>>, num_workers: usize) -> Result<Vec<Task>, String> {
// Create a channel for jobs
let (job_sender, job_receiver) = crossbeam_channel::unbounded();
// Fill the jobs channel
for (job_label, produced_refs) in job_refs {
job_sender.send((job_label, produced_refs)).unwrap();
}
drop(job_sender); // Close the channel
// Create a channel for results
let (task_sender, task_receiver) = crossbeam_channel::unbounded();
let error = Arc::new(Mutex::new(None));
// Spawn worker threads
let mut handles = vec![];
for _ in 0..num_workers {
let job_receiver = job_receiver.clone();
let task_sender = task_sender.clone();
let error = Arc::clone(&error);
let handle = thread::spawn(move || {
for (job_label, produced_refs) in job_receiver {
// Check if an error has already occurred
if error.lock().unwrap().is_some() {
return;
}
match configure(&job_label, &produced_refs) {
Ok(tasks) => {
task_sender.send(tasks).unwrap();
}
Err(e) => {
let mut error_guard = error.lock().unwrap();
if error_guard.is_none() {
*error_guard = Some(e);
}
return;
}
}
}
});
handles.push(handle);
}
// Close the task sender
drop(task_sender);
// Wait for all workers to finish
for handle in handles {
handle.join().unwrap();
}
// Check for errors
let error_guard = error.lock().unwrap();
if let Some(e) = &*error_guard {
return Err(e.clone());
}
// Collect results
let mut all_tasks = Vec::new();
while let Ok(tasks) = task_receiver.try_recv() {
all_tasks.extend(tasks);
}
Ok(all_tasks)
}
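// Design note: configure_parallel fans job labels out over an unbounded crossbeam channel,
// workers bail out as soon as any error has been recorded, and results are drained only after
// every worker has been joined, so the try_recv loop above sees all buffered tasks. The
// resulting task order depends on worker scheduling rather than on input order.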
// Simple staleness check: treat every requested partition as stale and plan jobs for all of them.
// Delegation optimization happens in the execution phase.
async fn check_partition_staleness(
partition_refs: &[String],
_event_log: &Box<dyn BuildEventLog>,
_build_request_id: &str
) -> Result<(Vec<String>, Vec<String>), String> {
// Analysis phase creates jobs for all requested partitions
// Execution phase will handle delegation optimization
let stale_partitions = partition_refs.to_vec();
let delegated_partitions = Vec::new();
Ok((stale_partitions, delegated_partitions))
}
// Plan creates a job graph for given output references
async fn plan(
output_refs: &[String],
build_event_log: Option<Box<dyn BuildEventLog>>,
build_request_id: &str
) -> Result<JobGraph, String> {
info!("Starting planning for {} output refs: {:?}", output_refs.len(), output_refs);
// Log build request received event
if let Some(ref event_log) = build_event_log {
let event = create_build_event(
build_request_id.to_string(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestReceived as i32,
requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(),
message: "Analysis started".to_string(),
})
);
if let Err(e) = event_log.append_event(event).await {
error!("Failed to log build request event: {}", e);
}
}
// Check for partition staleness and delegation opportunities
let (stale_refs, _delegated_refs) = if let Some(ref event_log) = build_event_log {
match check_partition_staleness(output_refs, event_log, build_request_id).await {
Ok((stale, delegated)) => {
info!("Staleness check: {} stale, {} delegated partitions", stale.len(), delegated.len());
(stale, delegated)
}
Err(e) => {
error!("Failed to check partition staleness: {}", e);
// Fall back to building all partitions
(output_refs.to_vec(), Vec::new())
}
}
} else {
// No event log, build all partitions
(output_refs.to_vec(), Vec::new())
};
// Only plan for stale partitions that need to be built
let mut unhandled_refs = HashSet::new();
for ref_str in &stale_refs {
unhandled_refs.insert(ref_str.clone());
}
// Note: Partition analysis events will be logged after successful job graph creation
let mut epoch = 0;
let mut nodes = Vec::new();
// Determine the number of workers based on available CPU cores or environment variable
let mut num_workers = num_cpus::get();
if let Ok(worker_env) = env::var("DATABUILD_PARALLEL_WORKERS") {
if let Ok(parsed_workers) = worker_env.parse::<usize>() {
if parsed_workers < 1 {
num_workers = 1;
info!("Warning: DATABUILD_PARALLEL_WORKERS must be at least 1, using: {}", num_workers);
} else {
num_workers = parsed_workers;
}
} else {
info!("Warning: Invalid DATABUILD_PARALLEL_WORKERS value '{}', using default: {}", worker_env, num_workers);
}
}
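// e.g. DATABUILD_PARALLEL_WORKERS=4 caps job configuration at four concurrent worker threads;
// when unset, the worker count defaults to the number of available CPU cores.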
info!("Using {} workers for parallel execution", num_workers);
// Log planning phase start
if let Some(ref event_log) = build_event_log {
let event = create_build_event(
build_request_id.to_string(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestPlanning as i32,
requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(),
message: "Graph analysis in progress".to_string(),
})
);
if let Err(e) = event_log.append_event(event).await {
error!("Failed to log planning event: {}", e);
}
}
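// Planning loop: each epoch resolves the currently unhandled refs to candidate jobs, configures
// those jobs in parallel, and then queues the MATERIALIZE inputs of the newly created tasks as
// the next epoch's unhandled refs. Planning reaches a fixed point once a round of tasks adds no
// new MATERIALIZE inputs, and gives up after 1000 epochs as a safety limit.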
while !unhandled_refs.is_empty() {
if epoch >= 1000 {
error!("Planning timeout: still planning after {} epochs, giving up", epoch);
return Err(format!("Still planning after {} epochs, giving up", epoch));
}
info!("Planning epoch {} with {} unhandled refs", epoch, unhandled_refs.len());
// Resolve jobs for all unhandled refs
let unhandled_refs_list: Vec<String> = unhandled_refs.iter().cloned().collect();
let job_refs = resolve(&unhandled_refs_list)?;
// Configure jobs in parallel
let new_nodes = configure_parallel(job_refs.clone(), num_workers)?;
// Remove handled refs
for (_, produced_refs) in job_refs {
for ref_str in produced_refs {
unhandled_refs.remove(&ref_str);
}
}
if !unhandled_refs.is_empty() {
error!("Error: Still have unhandled refs after configuration phase: {:?}", unhandled_refs);
return Err(format!("Should have no unhandled refs after configuration phase, but had: {:?}", unhandled_refs));
}
// Add new nodes to the graph
nodes.extend(new_nodes.clone());
info!("Planning epoch {} completed: added {} new nodes, total nodes: {}", epoch, new_nodes.len(), nodes.len());
epoch += 1;
// Plan next epoch
let mut new_unhandled_count = 0;
for task in &new_nodes {
for input in &task.config.as_ref().unwrap().inputs {
if input.dep_type == 1 { // MATERIALIZE = 1
if !unhandled_refs.contains(&input.partition_ref.as_ref().unwrap().str) {
new_unhandled_count += 1;
}
unhandled_refs.insert(input.partition_ref.as_ref().unwrap().str.clone());
}
}
}
if new_unhandled_count > 0 {
info!("Added {} new unhandled refs for next planning epoch", new_unhandled_count);
}
}
if !nodes.is_empty() {
info!("Planning complete: created graph with {} nodes for {} output refs", nodes.len(), output_refs.len());
// Log analysis completion event
if let Some(ref event_log) = build_event_log {
let event = create_build_event(
build_request_id.to_string(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestAnalysisCompleted as i32,
requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(),
message: format!("Analysis completed successfully, {} tasks planned", nodes.len()),
})
);
if let Err(e) = event_log.append_event(event).await {
error!("Failed to log analysis completion event: {}", e);
}
}
Ok(JobGraph {
label: Some(GraphLabel { label: "analyzed_graph".to_string() }),
outputs: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(),
nodes,
})
} else {
error!("Planning failed: no nodes created for output refs {:?}", output_refs);
// Log planning failure
if let Some(ref event_log) = build_event_log {
let event = create_build_event(
build_request_id.to_string(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestFailed as i32,
requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(),
message: "No jobs found for requested partitions".to_string(),
})
);
if let Err(e) = event_log.append_event(event).await {
error!("Failed to log failure event: {}", e);
}
}
Err("Unknown failure in graph planning".to_string())
}
}
// Generate a Mermaid flowchart diagram from a job graph
fn generate_mermaid_diagram(graph: &JobGraph) -> String {
// Start the mermaid flowchart
let mut mermaid = String::from("flowchart TD\n");
// Track nodes we've already added to avoid duplicates
let mut added_nodes = HashSet::new();
let mut added_refs = HashSet::new();
// Map to track which refs are outputs (to highlight them)
let mut is_output_ref = HashSet::new();
for ref_str in &graph.outputs {
is_output_ref.insert(ref_str.str.clone());
}
// Process each task in the graph
for task in &graph.nodes {
// Create a unique ID for this job+outputs combination
let outputs_strs: Vec<String> = task.config.as_ref().unwrap().outputs.iter().map(|o| o.str.clone()).collect();
let outputs_key = outputs_strs.join("_");
let mut job_node_id = format!("job_{}", task.job.as_ref().unwrap().label.replace("//", "_"));
job_node_id = job_node_id.replace(":", "_").replace("=", "_").replace("?", "_").replace(" ", "_");
job_node_id = format!("{}_{}", job_node_id, outputs_key.replace("/", "_").replace("=", "_"));
// Create a descriptive label that includes both job label and outputs
let job_label = &task.job.as_ref().unwrap().label;
let outputs_label = if !task.config.as_ref().unwrap().outputs.is_empty() {
if task.config.as_ref().unwrap().outputs.len() == 1 {
format!(" [{}]", task.config.as_ref().unwrap().outputs[0].str)
} else {
format!(" [{}, ...]", task.config.as_ref().unwrap().outputs[0].str)
}
} else {
String::new()
};
// Add the job node if not already added
if !added_nodes.contains(&job_node_id) {
// Represent job as a process shape with escaped label
mermaid.push_str(&format!(
" {}[\"`**{}** {}`\"]:::job\n",
job_node_id,
job_label,
outputs_label
));
added_nodes.insert(job_node_id.clone());
}
// Process inputs (dependencies)
for input in &task.config.as_ref().unwrap().inputs {
let ref_node_id = format!("ref_{}", input.partition_ref.as_ref().unwrap().str.replace("/", "_").replace("=", "_"));
// Add the partition ref node if not already added
if !added_refs.contains(&ref_node_id) {
let node_class = if is_output_ref.contains(&input.partition_ref.as_ref().unwrap().str) {
"outputPartition"
} else {
"partition"
};
// Represent partition as a cylinder, labeled with the raw ref so it matches output nodes
mermaid.push_str(&format!(
" {}[(\"Partition: {}\")]:::{}\n",
ref_node_id,
input.partition_ref.as_ref().unwrap().str,
node_class
));
added_refs.insert(ref_node_id.clone());
}
// Add the edge from input to job
if input.dep_type == 1 { // MATERIALIZE = 1
// Solid line for materialize dependencies
mermaid.push_str(&format!(" {} --> {}\n", ref_node_id, job_node_id));
} else {
// Dashed line for query dependencies
mermaid.push_str(&format!(" {} -.-> {}\n", ref_node_id, job_node_id));
}
}
// Process outputs
for output in &task.config.as_ref().unwrap().outputs {
let ref_node_id = format!("ref_{}", output.str.replace("/", "_").replace("=", "_"));
// Add the partition ref node if not already added
if !added_refs.contains(&ref_node_id) {
let node_class = if is_output_ref.contains(&output.str) {
"outputPartition"
} else {
"partition"
};
// Represent partition as a cylinder
mermaid.push_str(&format!(
" {}[(\"Partition: {}\")]:::{}\n",
ref_node_id,
output.str,
node_class
));
added_refs.insert(ref_node_id.clone());
}
// Add the edge from job to output
mermaid.push_str(&format!(" {} --> {}\n", job_node_id, ref_node_id));
}
}
// Add styling
mermaid.push_str("\n %% Styling\n");
mermaid.push_str(" classDef job fill:#f9f,stroke:#333,stroke-width:1px;\n");
mermaid.push_str(" classDef partition fill:#bbf,stroke:#333,stroke-width:1px;\n");
mermaid.push_str(" classDef outputPartition fill:#bfb,stroke:#333,stroke-width:2px;\n");
mermaid
}
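// Rendering conventions used above: jobs are drawn as rectangles labeled with the job label and
// first output, partitions as cylinders labeled "Partition: <ref>", MATERIALIZE dependencies as
// solid arrows, other dependencies as dashed arrows, and classDef styling for the job, partition,
// and outputPartition classes is appended at the end.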
#[tokio::main]
async fn main() {
// Initialize logger
SimpleLogger::new().init().unwrap();
let mode = env::var("DATABUILD_MODE").unwrap_or_else(|_| "unknown".to_string());
info!("Starting analyze.rs in mode: {}", mode);
// Parse command line arguments (only for partition references)
let matches = ClapCommand::new("analyze")
.version("1.0")
.about("DataBuild graph analysis tool")
.arg(
Arg::new("partitions")
.help("Partition references to analyze")
.required(false)
.num_args(0..)
.value_name("PARTITIONS")
)
.get_matches();
let args: Vec<String> = matches.get_many::<String>("partitions")
.unwrap_or_default()
.cloned()
.collect();
// Validate arguments based on mode
match mode.as_str() {
"plan" | "mermaid" => {
if args.is_empty() {
error!("Error: Partition references are required for {} mode", mode);
eprintln!("Error: Partition references are required for {} mode", mode);
exit(1);
}
}
"import_test" => {
// No partition arguments needed for test mode
}
_ => {
// "lookup" takes the same partition arguments; truly unknown modes are rejected in the dispatch below
}
}
// Get build event log configuration from environment variables
let build_event_log_uri = env::var("DATABUILD_BUILD_EVENT_LOG").ok();
let build_request_id = env::var("DATABUILD_BUILD_REQUEST_ID")
.unwrap_or_else(|_| Uuid::new_v4().to_string());
// Initialize build event log if provided
let build_event_log = if let Some(uri) = build_event_log_uri {
match create_build_event_log(&uri).await {
Ok(log) => {
info!("Initialized build event log: {}", uri);
Some(log)
}
Err(e) => {
error!("Failed to initialize build event log {}: {}", uri, e);
exit(1);
}
}
} else {
None
};
match mode.as_str() {
"plan" => {
// Get output refs from command line arguments
match plan(&args, build_event_log, &build_request_id).await {
Ok(graph) => {
// Output the job graph as JSON
match serde_json::to_string(&graph) {
Ok(json_data) => {
info!("Successfully generated job graph with {} nodes", graph.nodes.len());
println!("{}", json_data);
}
Err(e) => {
error!("Error marshaling job graph: {}", e);
eprintln!("Error marshaling job graph: {}", e);
exit(1);
}
}
}
Err(e) => {
eprintln!("Error: {}", e);
exit(1);
}
}
}
"lookup" => {
// Get output refs from command line arguments
match resolve(&args) {
Ok(result) => {
// Output the result as JSON
match serde_json::to_string(&result) {
Ok(json_data) => {
info!("Successfully completed lookup for {} output refs with {} job mappings", args.len(), result.len());
println!("{}", json_data);
}
Err(e) => {
error!("Error marshaling lookup result: {}", e);
eprintln!("Error marshaling lookup result: {}", e);
exit(1);
}
}
}
Err(e) => {
eprintln!("Error: {}", e);
exit(1);
}
}
}
"mermaid" => {
// Get output refs from command line arguments
match plan(&args, None, &build_request_id).await {
Ok(graph) => {
// Generate and output the mermaid diagram
let mermaid_diagram = generate_mermaid_diagram(&graph);
println!("{}", mermaid_diagram);
info!("Successfully generated mermaid diagram for {} nodes", graph.nodes.len());
}
Err(e) => {
eprintln!("Error: {}", e);
exit(1);
}
}
}
"import_test" => {
info!("Running in import_test mode");
println!("ok :)");
info!("Import test completed successfully");
}
_ => {
error!("Error: Unknown mode '{}'", mode);
eprintln!("Unknown MODE `{}`", mode);
exit(1);
}
}
}