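//! analyze: plans a DataBuild job graph for a requested set of output partitions.
//! Behaviour is driven by environment variables (DATABUILD_MODE, DATABUILD_CANDIDATE_JOBS,
//! DATABUILD_JOB_LOOKUP_PATH, DATABUILD_PARALLEL_WORKERS); see main() for the supported modes.
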
use std::collections::{HashMap, HashSet};
use std::env;
use std::process::{Command, exit};
use std::sync::{Arc, Mutex};
use std::thread;

use log::{error, info, warn};
use simple_logger::SimpleLogger;

use structs::*;

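// Note (assumed contract, inferred from the parsing below rather than from the job
// binaries themselves): DATABUILD_CANDIDATE_JOBS holds a JSON object mapping job labels
// to executable paths, e.g. {"//jobs:daily_agg": "bin/daily_agg"}, and each job binary,
// invoked with the requested output refs as arguments, prints a JobConfigureResponse as
// JSON on stdout. The example label and path are illustrative only.
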
// Configure a job to produce the desired outputs
fn configure(job_label: &str, output_refs: &[String]) -> Result<Vec<Task>, String> {
    let candidate_jobs_str = env::var("DATABUILD_CANDIDATE_JOBS")
        .map_err(|e| format!("Failed to get DATABUILD_CANDIDATE_JOBS: {}", e))?;

    let job_path_map: HashMap<String, String> = serde_json::from_str(&candidate_jobs_str)
        .map_err(|e| format!("Failed to parse DATABUILD_CANDIDATE_JOBS: {}", e))?;

    // Look up the executable path for this job
    let exec_path = job_path_map.get(job_label)
        .ok_or_else(|| format!("Job {} is not a candidate job", job_label))?;

    // Check if executable exists
    if !std::path::Path::new(exec_path).exists() {
        return Err(format!("Executable not found at path: {}", exec_path));
    }

    info!("Executing job configuration: {} {:?}", exec_path, output_refs);

    // Execute the job configuration command
    let output = Command::new(exec_path)
        .args(output_refs)
        .output()
        .map_err(|e| format!("Failed to execute job config: {}", e))?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        error!("Job configuration failed: {}", stderr);
        return Err(format!("Failed to run job config: {}", stderr));
    }

    info!("Job configuration succeeded for {}", job_label);

    // Parse the job configurations
    let stdout = String::from_utf8_lossy(&output.stdout);
    let job_configure_response: JobConfigureResponse = serde_json::from_str(&stdout)
        .map_err(|e| {
            error!("Error parsing job configs for {}: {}. `{}`", job_label, e, stdout);
            format!("Failed to parse job configs: {}", e)
        })?;
    let job_configs = job_configure_response.configs;

    // Create tasks
    let tasks: Vec<Task> = job_configs.into_iter()
        .map(|cfg| Task {
            job_label: job_label.to_string(),
            config: cfg,
        })
        .collect();

    info!("Created {} tasks for job {}", tasks.len(), job_label);
    Ok(tasks)
}

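// Note (assumed contract, inferred from the parsing below): the binary at
// DATABUILD_JOB_LOOKUP_PATH takes output refs as arguments and prints a JSON object
// mapping each required job label to the partition refs it would produce, e.g.
// {"//jobs:daily_agg": ["reports/date=2024-01-01"]} (label and ref illustrative).
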
// Resolve maps each required job to the partition refs it produces
fn resolve(output_refs: &[String]) -> Result<HashMap<String, Vec<String>>, String> {
    let lookup_path = env::var("DATABUILD_JOB_LOOKUP_PATH")
        .map_err(|e| format!("Failed to get DATABUILD_JOB_LOOKUP_PATH: {}", e))?;

    // Run the job lookup
    info!("Executing job lookup: {} {:?}", lookup_path, output_refs);

    let output = Command::new(&lookup_path)
        .args(output_refs)
        .output()
        .map_err(|e| format!("Failed to execute job lookup: {}", e))?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        error!("Job lookup failed: {}", stderr);
        return Err(format!("Failed to run job lookup: {}", stderr));
    }

    info!("Job lookup succeeded for {} output refs", output_refs.len());

    // Parse the result
    let stdout = String::from_utf8_lossy(&output.stdout);
    let result: HashMap<String, Vec<String>> = serde_json::from_str(&stdout)
        .map_err(|e| {
            error!("Error parsing job lookup result: {}", e);
            format!("Failed to parse job lookup result: {}", e)
        })?;

    info!("Job lookup found {} job mappings", result.len());
    for (job, refs) in &result {
        info!("  Job {} produces {} refs", job, refs.len());
    }

    Ok(result)
}

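// configure_parallel fans the per-job configure() calls out over a small worker pool:
// jobs are pushed onto one crossbeam channel, workers pull from it until it is closed,
// and results are funnelled back over a second channel. The first error wins and makes
// the remaining workers stop picking up new jobs. (crossbeam_channel and num_cpus are
// external crates assumed to be declared as dependencies of this target.)
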
// Configure multiple jobs in parallel
fn configure_parallel(job_refs: HashMap<String, Vec<String>>, num_workers: usize) -> Result<Vec<Task>, String> {
    // Create a channel for jobs
    let (job_sender, job_receiver) = crossbeam_channel::unbounded();

    // Fill the jobs channel
    for (job_label, produced_refs) in job_refs {
        job_sender.send((job_label, produced_refs)).unwrap();
    }
    drop(job_sender); // Close the channel

    // Create a channel for results
    let (task_sender, task_receiver) = crossbeam_channel::unbounded();
    let error = Arc::new(Mutex::new(None));

    // Spawn worker threads
    let mut handles = vec![];
    for _ in 0..num_workers {
        let job_receiver = job_receiver.clone();
        let task_sender = task_sender.clone();
        let error = Arc::clone(&error);

        let handle = thread::spawn(move || {
            for (job_label, produced_refs) in job_receiver {
                // Check if an error has already occurred
                if error.lock().unwrap().is_some() {
                    return;
                }

                match configure(&job_label, &produced_refs) {
                    Ok(tasks) => {
                        task_sender.send(tasks).unwrap();
                    }
                    Err(e) => {
                        let mut error_guard = error.lock().unwrap();
                        if error_guard.is_none() {
                            *error_guard = Some(e);
                        }
                        return;
                    }
                }
            }
        });

        handles.push(handle);
    }

    // Close the task sender
    drop(task_sender);

    // Wait for all workers to finish
    for handle in handles {
        handle.join().unwrap();
    }

    // Check for errors
    let error_guard = error.lock().unwrap();
    if let Some(e) = &*error_guard {
        return Err(e.clone());
    }

    // Collect results
    let mut all_tasks = Vec::new();
    while let Ok(tasks) = task_receiver.try_recv() {
        all_tasks.extend(tasks);
    }

    Ok(all_tasks)
}

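// plan() works backwards from the requested outputs in epochs: each epoch resolves the
// still-unhandled partition refs to jobs, configures those jobs in parallel, marks the
// refs they produce as handled, and then queues every MATERIALIZE input of the new
// tasks as work for the next epoch. The loop ends when no refs remain, or bails out
// after 1000 epochs as a guard against cycles or runaway graphs.
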
// Plan creates a job graph for the given output references
fn plan(output_refs: &[String]) -> Result<JobGraph, String> {
    info!("Starting planning for {} output refs: {:?}", output_refs.len(), output_refs);

    let mut unhandled_refs = HashSet::new();
    for ref_str in output_refs {
        unhandled_refs.insert(ref_str.clone());
    }

    let mut epoch = 0;
    let mut nodes = Vec::new();

    // Determine the number of workers from DATABUILD_PARALLEL_WORKERS, falling back to
    // the number of available CPU cores
    let mut num_workers = num_cpus::get();
    if let Ok(worker_env) = env::var("DATABUILD_PARALLEL_WORKERS") {
        if let Ok(parsed_workers) = worker_env.parse::<usize>() {
            if parsed_workers < 1 {
                num_workers = 1;
                warn!("DATABUILD_PARALLEL_WORKERS must be at least 1, using: {}", num_workers);
            } else {
                num_workers = parsed_workers;
            }
        } else {
            warn!("Invalid DATABUILD_PARALLEL_WORKERS value '{}', using default: {}", worker_env, num_workers);
        }
    }
    info!("Using {} workers for parallel execution", num_workers);

    while !unhandled_refs.is_empty() {
        if epoch >= 1000 {
            error!("Planning timeout: still planning after {} epochs, giving up", epoch);
            return Err(format!("Still planning after {} epochs, giving up", epoch));
        }

        info!("Planning epoch {} with {} unhandled refs", epoch, unhandled_refs.len());

        // Resolve jobs for all unhandled refs
        let unhandled_refs_list: Vec<String> = unhandled_refs.iter().cloned().collect();
        let job_refs = resolve(&unhandled_refs_list)?;

        // Configure jobs in parallel
        let new_nodes = configure_parallel(job_refs.clone(), num_workers)?;

        // Remove handled refs
        for (_, produced_refs) in job_refs {
            for ref_str in produced_refs {
                unhandled_refs.remove(&ref_str);
            }
        }

        if !unhandled_refs.is_empty() {
            error!("Error: Still have unhandled refs after configuration phase: {:?}", unhandled_refs);
            return Err(format!("Should have no unhandled refs after configuration phase, but had: {:?}", unhandled_refs));
        }

        // Add new nodes to the graph
        nodes.extend(new_nodes.clone());
        info!("Planning epoch {} completed: added {} new nodes, total nodes: {}", epoch, new_nodes.len(), nodes.len());

        // Plan the next epoch: every MATERIALIZE input of the new tasks becomes an unhandled ref
        let mut new_unhandled_count = 0;
        for task in &new_nodes {
            for input in &task.config.inputs {
                if input.dep_type == 1 { // MATERIALIZE = 1
                    if unhandled_refs.insert(input.partition_ref.str.clone()) {
                        new_unhandled_count += 1;
                    }
                }
            }
        }

        if new_unhandled_count > 0 {
            info!("Added {} new unhandled refs for next planning epoch", new_unhandled_count);
        }

        epoch += 1;
    }

    if !nodes.is_empty() {
        info!("Planning complete: created graph with {} nodes for {} output refs", nodes.len(), output_refs.len());
        Ok(JobGraph {
            outputs: output_refs.to_vec(),
            nodes,
        })
    } else {
        error!("Planning failed: no nodes created for output refs {:?}", output_refs);
        Err("Unknown failure in graph planning".to_string())
    }
}

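// In the generated diagram, jobs are rendered as rectangles with a bold label ("job_*"
// ids, class `job`), partitions as cylinders ("ref_*" ids, class `partition`, or
// `outputPartition` for the partitions that were explicitly requested), with solid
// arrows for MATERIALIZE dependencies and dashed arrows for query dependencies.
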
// Generate a Mermaid flowchart diagram from a job graph
fn generate_mermaid_diagram(graph: &JobGraph) -> String {
    // Start the mermaid flowchart
    let mut mermaid = String::from("flowchart TD\n");

    // Track nodes we've already added to avoid duplicates
    let mut added_nodes = HashSet::new();
    let mut added_refs = HashSet::new();

    // Track which refs are requested outputs (to highlight them)
    let mut is_output_ref = HashSet::new();
    for ref_str in &graph.outputs {
        is_output_ref.insert(ref_str.clone());
    }

    // Process each task in the graph
    for task in &graph.nodes {
        // Create a unique ID for this job+outputs combination
        let outputs_strs: Vec<String> = task.config.outputs.iter().map(|o| o.str.clone()).collect();
        let outputs_key = outputs_strs.join("_");
        let mut job_node_id = format!("job_{}", task.job_label.replace("//", "_"));
        job_node_id = job_node_id.replace(":", "_").replace("=", "_").replace("?", "_").replace(" ", "_");
        job_node_id = format!("{}_{}", job_node_id, outputs_key.replace("/", "_").replace("=", "_"));

        // Create a descriptive label that includes both job label and outputs
        let job_label = &task.job_label;
        let outputs_label = if !task.config.outputs.is_empty() {
            if task.config.outputs.len() == 1 {
                format!(" [{}]", task.config.outputs[0].str)
            } else {
                format!(" [{}, ...]", task.config.outputs[0].str)
            }
        } else {
            String::new()
        };

        // Add the job node if not already added
        if !added_nodes.contains(&job_node_id) {
            // Represent the job as a process shape with an escaped label
            mermaid.push_str(&format!(
                " {}[\"`**{}**{}`\"]:::job\n",
                job_node_id,
                job_label,
                outputs_label
            ));
            added_nodes.insert(job_node_id.clone());
        }

        // Process inputs (dependencies)
        for input in &task.config.inputs {
            let ref_node_id = format!("ref_{}", input.partition_ref.str.replace("/", "_").replace("=", "_"));

            // Add the partition ref node if not already added
            if !added_refs.contains(&ref_node_id) {
                let node_class = if is_output_ref.contains(&input.partition_ref.str) {
                    "outputPartition"
                } else {
                    "partition"
                };

                // Represent the partition as a cylinder
                mermaid.push_str(&format!(
                    " {}[(\"Partition: {}\")]:::{}\n",
                    ref_node_id,
                    input.partition_ref.str,
                    node_class
                ));
                added_refs.insert(ref_node_id.clone());
            }

            // Add the edge from input to job
            if input.dep_type == 1 { // MATERIALIZE = 1
                // Solid line for materialize dependencies
                mermaid.push_str(&format!(" {} --> {}\n", ref_node_id, job_node_id));
            } else {
                // Dashed line for query dependencies
                mermaid.push_str(&format!(" {} -.-> {}\n", ref_node_id, job_node_id));
            }
        }

        // Process outputs
        for output in &task.config.outputs {
            let ref_node_id = format!("ref_{}", output.str.replace("/", "_").replace("=", "_"));

            // Add the partition ref node if not already added
            if !added_refs.contains(&ref_node_id) {
                let node_class = if is_output_ref.contains(&output.str) {
                    "outputPartition"
                } else {
                    "partition"
                };

                // Represent the partition as a cylinder
                mermaid.push_str(&format!(
                    " {}[(\"Partition: {}\")]:::{}\n",
                    ref_node_id,
                    output.str,
                    node_class
                ));
                added_refs.insert(ref_node_id.clone());
            }

            // Add the edge from job to output
            mermaid.push_str(&format!(" {} --> {}\n", job_node_id, ref_node_id));
        }
    }

    // Add styling
    mermaid.push_str("\n %% Styling\n");
    mermaid.push_str(" classDef job fill:#f9f,stroke:#333,stroke-width:1px;\n");
    mermaid.push_str(" classDef partition fill:#bbf,stroke:#333,stroke-width:1px;\n");
    mermaid.push_str(" classDef outputPartition fill:#bfb,stroke:#333,stroke-width:2px;\n");

    mermaid
}

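// Entry point. DATABUILD_MODE selects the behaviour: "plan" prints the job graph as JSON,
// "lookup" prints the job-to-partitions mapping as JSON, "mermaid" prints a Mermaid
// diagram of the planned graph, and "import_test" just checks that the binary runs.
// Example invocation (binary name, label, path, and ref are illustrative):
//   DATABUILD_MODE=plan \
//   DATABUILD_CANDIDATE_JOBS='{"//jobs:daily_agg": "bin/daily_agg"}' \
//   DATABUILD_JOB_LOOKUP_PATH=bin/job_lookup \
//   ./analyze reports/date=2024-01-01 > graph.json
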
fn main() {
    // Initialize logger
    SimpleLogger::new().init().unwrap();

    let mode = env::var("DATABUILD_MODE").unwrap_or_else(|_| "unknown".to_string());
    info!("Starting analyze.rs in mode: {}", mode);

    let args: Vec<String> = env::args().skip(1).collect();

    match mode.as_str() {
        "plan" => {
            // Get output refs from command line arguments
            match plan(&args) {
                Ok(graph) => {
                    // Output the job graph as JSON
                    match serde_json::to_string(&graph) {
                        Ok(json_data) => {
                            info!("Successfully generated job graph with {} nodes", graph.nodes.len());
                            println!("{}", json_data);
                        }
                        Err(e) => {
                            error!("Error marshaling job graph: {}", e);
                            eprintln!("Error marshaling job graph: {}", e);
                            exit(1);
                        }
                    }
                }
                Err(e) => {
                    eprintln!("Error: {}", e);
                    exit(1);
                }
            }
        }
        "lookup" => {
            // Get output refs from command line arguments
            match resolve(&args) {
                Ok(result) => {
                    // Output the result as JSON
                    match serde_json::to_string(&result) {
                        Ok(json_data) => {
                            info!("Successfully completed lookup for {} output refs with {} job mappings", args.len(), result.len());
                            println!("{}", json_data);
                        }
                        Err(e) => {
                            error!("Error marshaling lookup result: {}", e);
                            eprintln!("Error marshaling lookup result: {}", e);
                            exit(1);
                        }
                    }
                }
                Err(e) => {
                    eprintln!("Error: {}", e);
                    exit(1);
                }
            }
        }
        "mermaid" => {
            // Get output refs from command line arguments
            match plan(&args) {
                Ok(graph) => {
                    // Generate and output the mermaid diagram
                    let mermaid_diagram = generate_mermaid_diagram(&graph);
                    println!("{}", mermaid_diagram);
                    info!("Successfully generated mermaid diagram for {} nodes", graph.nodes.len());
                }
                Err(e) => {
                    eprintln!("Error: {}", e);
                    exit(1);
                }
            }
        }
        "import_test" => {
            info!("Running in import_test mode");
            println!("ok :)");
            info!("Import test completed successfully");
        }
        _ => {
            error!("Error: Unknown mode '{}'", mode);
            eprintln!("Unknown MODE `{}`", mode);
            exit(1);
        }
    }
}