use std::collections::{HashMap, HashSet};
use std::env;
use std::process::{Command, exit};
use std::sync::{Arc, Mutex};
use std::thread;

use log::{error, info, warn};
use simple_logger::SimpleLogger;

// Task/graph types are defined in the sibling `structs` module; the module
// declaration is required for `use structs::*` to resolve in a binary crate.
// A sketch of the assumed shapes follows below.
mod structs;
use structs::*;
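// A minimal sketch of the shapes this file relies on from `structs`, inferred
// from field usage below. `Task`, `JobGraph`, and `DataDepType::Materialize`
// appear by name in this file; `JobConfig`, `DataDep`, and `Query` are assumed
// names for the remaining pieces:
//
//   pub struct Task { pub job_label: String, pub config: JobConfig }
//   pub struct JobConfig { pub inputs: Vec<DataDep>, pub outputs: Vec<String> }
//   pub struct DataDep { pub reference: String, pub dep_type: DataDepType }
//   pub enum DataDepType { Materialize, Query }
//   pub struct JobGraph { pub outputs: Vec<String>, pub nodes: Vec<Task> }
//
// Everything needs serde's `Serialize`/`Deserialize` derives, plus `Clone` on
// `Task` and its fields and `PartialEq` on `DataDepType`, for the code below
// to compile.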
// Configure a job to produce the desired outputs.
fn configure(job_label: &str, output_refs: &[String]) -> Result<Vec<Task>, String> {
    let candidate_jobs_str = env::var("DATABUILD_CANDIDATE_JOBS")
        .map_err(|e| format!("Failed to get DATABUILD_CANDIDATE_JOBS: {}", e))?;

    let job_path_map: HashMap<String, String> = serde_json::from_str(&candidate_jobs_str)
        .map_err(|e| format!("Failed to parse DATABUILD_CANDIDATE_JOBS: {}", e))?;

    // Look up the executable path for this job
    let exec_path = job_path_map
        .get(job_label)
        .ok_or_else(|| format!("Job {} is not a candidate job", job_label))?;

    // Check that the executable exists
    if !std::path::Path::new(exec_path).exists() {
        return Err(format!("Executable not found at path: {}", exec_path));
    }

    info!("Executing job configuration: {} {:?}", exec_path, output_refs);

    // Execute the job configuration command
    let output = Command::new(exec_path)
        .args(output_refs)
        .output()
        .map_err(|e| format!("Failed to execute job config: {}", e))?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        error!("Job configuration failed: {}", stderr);
        return Err(format!("Failed to run job config: {}", stderr));
    }

    info!("Job configuration succeeded for {}", job_label);

    // Parse the job configurations from stdout
    let stdout = String::from_utf8_lossy(&output.stdout);
    let job_configs: Vec<JobConfig> = serde_json::from_str(&stdout).map_err(|e| {
        error!("Error parsing job configs for {}: {}. `{}`", job_label, e, stdout);
        format!("Failed to parse job configs: {}", e)
    })?;

    // Create one task per configuration
    let tasks: Vec<Task> = job_configs
        .into_iter()
        .map(|cfg| Task {
            job_label: job_label.to_string(),
            config: cfg,
        })
        .collect();

    info!("Created {} tasks for job {}", tasks.len(), job_label);
    Ok(tasks)
}

// Resolve maps each required job to the partition refs it produces.
fn resolve(output_refs: &[String]) -> Result<HashMap<String, Vec<String>>, String> {
    let lookup_path = env::var("DATABUILD_JOB_LOOKUP_PATH")
        .map_err(|e| format!("Failed to get DATABUILD_JOB_LOOKUP_PATH: {}", e))?;

    // Run the job lookup
    info!("Executing job lookup: {} {:?}", lookup_path, output_refs);
    let output = Command::new(&lookup_path)
        .args(output_refs)
        .output()
        .map_err(|e| format!("Failed to execute job lookup: {}", e))?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        error!("Job lookup failed: {}", stderr);
        return Err(format!("Failed to run job lookup: {}", stderr));
    }

    info!("Job lookup succeeded for {} output refs", output_refs.len());

    // Parse the result
    let stdout = String::from_utf8_lossy(&output.stdout);
    let result: HashMap<String, Vec<String>> = serde_json::from_str(&stdout).map_err(|e| {
        error!("Error parsing job lookup result: {}", e);
        format!("Failed to parse job lookup result: {}", e)
    })?;

    info!("Job lookup found {} job mappings", result.len());
    for (job, refs) in &result {
        info!("  Job {} produces {} refs", job, refs.len());
    }

    Ok(result)
}

// Configure multiple jobs in parallel across a pool of worker threads.
fn configure_parallel(
    job_refs: HashMap<String, Vec<String>>,
    num_workers: usize,
) -> Result<Vec<Task>, String> {
    // Create a channel for jobs and fill it up front
    let (job_sender, job_receiver) = crossbeam_channel::unbounded();
    for (job_label, produced_refs) in job_refs {
        job_sender.send((job_label, produced_refs)).unwrap();
    }
    drop(job_sender); // Close the channel so workers stop once it drains

    // Create a channel for results and a shared slot for the first error
    let (task_sender, task_receiver) = crossbeam_channel::unbounded();
    let error: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));

    // Spawn worker threads
    let mut handles = vec![];
    for _ in 0..num_workers {
        let job_receiver = job_receiver.clone();
        let task_sender = task_sender.clone();
        let error = Arc::clone(&error);
        let handle = thread::spawn(move || {
            for (job_label, produced_refs) in job_receiver {
                // Stop early if another worker has already failed
                if error.lock().unwrap().is_some() {
                    return;
                }
                match configure(&job_label, &produced_refs) {
                    Ok(tasks) => {
                        task_sender.send(tasks).unwrap();
                    }
                    Err(e) => {
                        let mut error_guard = error.lock().unwrap();
                        if error_guard.is_none() {
                            *error_guard = Some(e);
                        }
                        return;
                    }
                }
            }
        });
        handles.push(handle);
    }

    // Close this thread's task sender so the drain loop below can terminate
    drop(task_sender);

    // Wait for all workers to finish
    for handle in handles {
        handle.join().unwrap();
    }

    // Propagate the first error, if any
    let error_guard = error.lock().unwrap();
    if let Some(e) = &*error_guard {
        return Err(e.clone());
    }

    // Collect results
    let mut all_tasks = Vec::new();
    while let Ok(tasks) = task_receiver.try_recv() {
        all_tasks.extend(tasks);
    }

    Ok(all_tasks)
}

// Plan creates a job graph for the given output references.
fn plan(output_refs: &[String]) -> Result<JobGraph, String> {
    info!(
        "Starting planning for {} output refs: {:?}",
        output_refs.len(),
        output_refs
    );

    let mut unhandled_refs: HashSet<String> = output_refs.iter().cloned().collect();
    let mut epoch = 0;
    let mut nodes = Vec::new();

    // Determine worker count from available CPU cores, overridable via env var
    let mut num_workers = num_cpus::get();
    if let Ok(worker_env) = env::var("DATABUILD_PARALLEL_WORKERS") {
        if let Ok(parsed_workers) = worker_env.parse::<usize>() {
            if parsed_workers < 1 {
                num_workers = 1;
                warn!(
                    "DATABUILD_PARALLEL_WORKERS must be at least 1, using: {}",
                    num_workers
                );
            } else {
                num_workers = parsed_workers;
            }
        } else {
            warn!(
                "Invalid DATABUILD_PARALLEL_WORKERS value '{}', using default: {}",
                worker_env, num_workers
            );
        }
    }
    info!("Using {} workers for parallel execution", num_workers);

    while !unhandled_refs.is_empty() {
        if epoch >= 1000 {
            error!("Planning timeout: still planning after {} epochs, giving up", epoch);
            return Err(format!("Still planning after {} epochs, giving up", epoch));
        }

        info!("Planning epoch {} with {} unhandled refs", epoch, unhandled_refs.len());

        // Resolve jobs for all unhandled refs
        let unhandled_refs_list: Vec<String> = unhandled_refs.iter().cloned().collect();
        let job_refs = resolve(&unhandled_refs_list)?;

        // Configure jobs in parallel
        let new_nodes = configure_parallel(job_refs.clone(), num_workers)?;

        // Remove refs that are now covered by a configured job
        for (_, produced_refs) in job_refs {
            for ref_str in produced_refs {
                unhandled_refs.remove(&ref_str);
            }
        }
        if !unhandled_refs.is_empty() {
            error!("Still have unhandled refs after configuration phase: {:?}", unhandled_refs);
            return Err(format!(
                "Should have no unhandled refs after configuration phase, but had: {:?}",
                unhandled_refs
            ));
        }

        epoch += 1;

        // Add new nodes to the graph
        nodes.extend(new_nodes.clone());
        info!(
            "Planning epoch {} completed: added {} new nodes, total nodes: {}",
            epoch,
            new_nodes.len(),
            nodes.len()
        );

        // Queue the Materialize inputs of the new nodes for the next epoch
        let mut new_unhandled_count = 0;
        for task in &new_nodes {
            for input in &task.config.inputs {
                if input.dep_type == DataDepType::Materialize {
                    if !unhandled_refs.contains(&input.reference) {
                        new_unhandled_count += 1;
                    }
                    unhandled_refs.insert(input.reference.clone());
                }
            }
        }
        if new_unhandled_count > 0 {
            info!("Added {} new unhandled refs for next planning epoch", new_unhandled_count);
        }
    }

    if !nodes.is_empty() {
        info!(
            "Planning complete: created graph with {} nodes for {} output refs",
            nodes.len(),
            output_refs.len()
        );
        Ok(JobGraph {
            outputs: output_refs.to_vec(),
            nodes,
        })
    } else {
        error!("Planning failed: no nodes created for output refs {:?}", output_refs);
        Err("Unknown failure in graph planning".to_string())
    }
}
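// A small illustrative check of the JSON contracts `configure` and `resolve`
// deserialize from the job binaries. The sample labels, paths, and refs are
// made up; only the shapes matter.
#[cfg(test)]
mod contract_shape_tests {
    use std::collections::HashMap;

    #[test]
    fn candidate_jobs_and_lookup_shapes_parse() {
        // Shape of DATABUILD_CANDIDATE_JOBS: job label -> executable path
        let candidates: HashMap<String, String> =
            serde_json::from_str(r#"{"//jobs:daily_agg": "/path/to/daily_agg"}"#).unwrap();
        assert_eq!(candidates["//jobs:daily_agg"], "/path/to/daily_agg");

        // Shape of the job lookup's stdout: job label -> produced partition refs
        let lookup: HashMap<String, Vec<String>> =
            serde_json::from_str(r#"{"//jobs:daily_agg": ["agg/2024-01-01"]}"#).unwrap();
        assert_eq!(lookup["//jobs:daily_agg"], vec!["agg/2024-01-01"]);
    }
}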
// Generate a Mermaid flowchart diagram from a job graph.
fn generate_mermaid_diagram(graph: &JobGraph) -> String {
    // Start the mermaid flowchart
    let mut mermaid = String::from("flowchart TD\n");

    // Track nodes we've already added to avoid duplicates
    let mut added_nodes = HashSet::new();
    let mut added_refs = HashSet::new();

    // Track which refs are requested outputs (to highlight them)
    let is_output_ref: HashSet<String> = graph.outputs.iter().cloned().collect();

    // Process each task in the graph
    for task in &graph.nodes {
        // Create a unique ID for this job+outputs combination
        let outputs_key = task.config.outputs.join("_");
        let mut job_node_id = format!("job_{}", task.job_label.replace("//", "_"));
        job_node_id = job_node_id.replace(":", "_");
        job_node_id = format!("{}_{}", job_node_id, outputs_key.replace("/", "_"));

        // Create a descriptive label that includes both job label and outputs
        let job_label = &task.job_label;
        let outputs_label = if !task.config.outputs.is_empty() {
            if task.config.outputs.len() == 1 {
                format!(" [{}]", task.config.outputs[0])
            } else {
                format!(" [{}, ...]", task.config.outputs[0])
            }
        } else {
            String::new()
        };

        // Add the job node if not already added
        if !added_nodes.contains(&job_node_id) {
            // Represent the job as a process shape with an escaped label
            mermaid.push_str(&format!(
                "    {}[\"`**{}** {}`\"]:::job\n",
                job_node_id, job_label, outputs_label
            ));
            added_nodes.insert(job_node_id.clone());
        }

        // Process inputs (dependencies)
        for input in &task.config.inputs {
            let ref_node_id = format!("ref_{}", input.reference.replace("/", "_"));

            // Add the partition ref node if not already added
            if !added_refs.contains(&ref_node_id) {
                let node_class = if is_output_ref.contains(&input.reference) {
                    "outputPartition"
                } else {
                    "partition"
                };
                // Represent the partition as a cylinder
                mermaid.push_str(&format!(
                    "    {}[(\"{}\")]:::{}\n",
                    ref_node_id, input.reference, node_class
                ));
                added_refs.insert(ref_node_id.clone());
            }

            // Add the edge from input to job
            if input.dep_type == DataDepType::Materialize {
                // Solid line for materialize dependencies
                mermaid.push_str(&format!("    {} --> {}\n", ref_node_id, job_node_id));
            } else {
                // Dashed line for query dependencies
                mermaid.push_str(&format!("    {} -.-> {}\n", ref_node_id, job_node_id));
            }
        }

        // Process outputs
        for output in &task.config.outputs {
            let ref_node_id = format!("ref_{}", output.replace("/", "_"));

            // Add the partition ref node if not already added
            if !added_refs.contains(&ref_node_id) {
                let node_class = if is_output_ref.contains(output) {
                    "outputPartition"
                } else {
                    "partition"
                };
                // Represent the partition as a cylinder
                mermaid.push_str(&format!(
                    "    {}[(\"Partition: {}\")]:::{}\n",
                    ref_node_id, output, node_class
                ));
                added_refs.insert(ref_node_id.clone());
            }

            // Add the edge from job to output
            mermaid.push_str(&format!("    {} --> {}\n", job_node_id, ref_node_id));
        }
    }

    // Add styling
    mermaid.push_str("\n    %% Styling\n");
    mermaid.push_str("    classDef job fill:#f9f,stroke:#333,stroke-width:1px;\n");
    mermaid.push_str("    classDef partition fill:#bbf,stroke:#333,stroke-width:1px;\n");
    mermaid.push_str("    classDef outputPartition fill:#bfb,stroke:#333,stroke-width:2px;\n");

    mermaid
}
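// For a single-node graph whose job "//jobs:agg" materializes input
// "raw/2024-01-01" into requested output "agg/2024-01-01" (made-up refs), the
// function above emits roughly:
//
//   flowchart TD
//       job__jobs_agg_agg_2024-01-01["`**//jobs:agg**  [agg/2024-01-01]`"]:::job
//       ref_raw_2024-01-01[("raw/2024-01-01")]:::partition
//       ref_raw_2024-01-01 --> job__jobs_agg_agg_2024-01-01
//       ref_agg_2024-01-01[("Partition: agg/2024-01-01")]:::outputPartition
//       job__jobs_agg_agg_2024-01-01 --> ref_agg_2024-01-01
//
// plus the trailing classDef styling block.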
fn main() {
    // Initialize the logger
    SimpleLogger::new().init().unwrap();

    let mode = env::var("DATABUILD_MODE").unwrap_or_else(|_| "unknown".to_string());
    info!("Starting analyze.rs in mode: {}", mode);

    // Output refs come from the command line arguments
    let args: Vec<String> = env::args().skip(1).collect();

    match mode.as_str() {
        "plan" => {
            match plan(&args) {
                Ok(graph) => {
                    // Output the job graph as JSON
                    match serde_json::to_string(&graph) {
                        Ok(json_data) => {
                            info!("Successfully generated job graph with {} nodes", graph.nodes.len());
                            println!("{}", json_data);
                        }
                        Err(e) => {
                            error!("Error marshaling job graph: {}", e);
                            eprintln!("Error marshaling job graph: {}", e);
                            exit(1);
                        }
                    }
                }
                Err(e) => {
                    eprintln!("Error: {}", e);
                    exit(1);
                }
            }
        }
        "lookup" => {
            match resolve(&args) {
                Ok(result) => {
                    // Output the result as JSON
                    match serde_json::to_string(&result) {
                        Ok(json_data) => {
                            info!(
                                "Successfully completed lookup for {} output refs with {} job mappings",
                                args.len(),
                                result.len()
                            );
                            println!("{}", json_data);
                        }
                        Err(e) => {
                            error!("Error marshaling lookup result: {}", e);
                            eprintln!("Error marshaling lookup result: {}", e);
                            exit(1);
                        }
                    }
                }
                Err(e) => {
                    eprintln!("Error: {}", e);
                    exit(1);
                }
            }
        }
        "mermaid" => {
            match plan(&args) {
                Ok(graph) => {
                    // Generate and output the mermaid diagram
                    let mermaid_diagram = generate_mermaid_diagram(&graph);
                    println!("{}", mermaid_diagram);
                    info!("Successfully generated mermaid diagram for {} nodes", graph.nodes.len());
                }
                Err(e) => {
                    eprintln!("Error: {}", e);
                    exit(1);
                }
            }
        }
        "import_test" => {
            info!("Running in import_test mode");
            println!("ok :)");
            info!("Import test completed successfully");
        }
        _ => {
            error!("Unknown mode '{}'", mode);
            eprintln!("Unknown MODE `{}`", mode);
            exit(1);
        }
    }
}
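// Example invocation (binary name, paths, and refs are placeholders):
//
//   DATABUILD_MODE=plan \
//   DATABUILD_JOB_LOOKUP_PATH=./job_lookup \
//   DATABUILD_CANDIDATE_JOBS='{"//jobs:agg": "./agg_job"}' \
//   ./analyze agg/2024-01-01
//
// Assumed external dependencies (crate names are taken from the code above;
// version numbers are illustrative):
//
//   [dependencies]
//   serde = { version = "1", features = ["derive"] }
//   serde_json = "1"
//   log = "0.4"
//   simple_logger = "4"
//   crossbeam-channel = "0.5"
//   num_cpus = "1"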