Implement Rust version of graph analyze

This commit is contained in:
Stuart Axelbrooke 2025-05-04 20:17:35 -07:00
parent f9477b1717
commit b4575a68e7
No known key found for this signature in database
GPG key ID: 1B0A848C29D46A35
11 changed files with 2811 additions and 9 deletions

View file

@@ -10,3 +10,15 @@ filegroup(
srcs = ["databuild.proto"],
visibility = ["//visibility:public"],
)
# Expose Cargo.toml for crate_universe
exports_files(
["Cargo.toml"],
visibility = ["//visibility:public"],
)
# Expose Cargo.lock for crate_universe; its contents are generated and kept in sync by Bazel
exports_files(
["Cargo.lock"],
visibility = ["//visibility:public"],
)

View file

@@ -9,3 +9,15 @@ bazel_dep(name = "rules_shell", version = "0.4.0")
bazel_dep(name = "rules_go", version = "0.46.0")
bazel_dep(name = "rules_oci", version = "2.2.6")
bazel_dep(name = "aspect_bazel_lib", version = "2.14.0")
bazel_dep(name = "rules_rust", version = "0.61.0")
crate = use_extension("@rules_rust//crate_universe:extensions.bzl", "crate")
crate.spec(package = "serde", features = ["derive"], version = "1.0")
crate.spec(package = "serde_json", version = "1.0")
crate.spec(package = "log", version = "0.4")
crate.spec(package = "simple_logger", version = "4.3")
crate.spec(package = "crossbeam-channel", version = "0.5")
crate.spec(package = "num_cpus", version = "1.16")
crate.spec(package = "tokio", default_features = False, features = ["macros", "net", "rt-multi-thread"], version = "1.38")
crate.from_specs()
use_repo(crate, "crates")
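Note: the "derive" feature requested for serde above is what enables the #[derive(Serialize, Deserialize)] attributes used by analyze.rs later in this commit. A minimal sketch of the pattern (hypothetical struct, not part of this commit; assumes the serde and serde_json crates declared above):

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct Example {
    name: String,
}

fn main() {
    // Round-trip through serde_json, as the analyze binary does for its configs.
    let e: Example = serde_json::from_str(r#"{"name": "demo"}"#).unwrap();
    println!("{}", serde_json::to_string(&e).unwrap());
}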

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@@ -10,6 +10,8 @@
"https://bcr.bazel.build/modules/abseil-cpp/20230802.1/MODULE.bazel": "fa92e2eb41a04df73cdabeec37107316f7e5272650f81d6cc096418fe647b915",
"https://bcr.bazel.build/modules/abseil-cpp/20240116.1/MODULE.bazel": "37bcdb4440fbb61df6a1c296ae01b327f19e9bb521f9b8e26ec854b6f97309ed",
"https://bcr.bazel.build/modules/abseil-cpp/20240116.1/source.json": "9be551b8d4e3ef76875c0d744b5d6a504a27e3ae67bc6b28f46415fd2d2957da",
"https://bcr.bazel.build/modules/apple_support/1.17.1/MODULE.bazel": "655c922ab1209978a94ef6ca7d9d43e940cd97d9c172fb55f94d91ac53f8610b",
"https://bcr.bazel.build/modules/apple_support/1.17.1/source.json": "6b2b8c74d14e8d485528a938e44bdb72a5ba17632b9e14ef6e68a5ee96c8347f",
"https://bcr.bazel.build/modules/aspect_bazel_lib/2.14.0/MODULE.bazel": "2b31ffcc9bdc8295b2167e07a757dbbc9ac8906e7028e5170a3708cecaac119f",
"https://bcr.bazel.build/modules/aspect_bazel_lib/2.14.0/source.json": "0cf1826853b0bef8b5cd19c0610d717500f5521aa2b38b72b2ec302ac5e7526c",
"https://bcr.bazel.build/modules/aspect_bazel_lib/2.7.2/MODULE.bazel": "780d1a6522b28f5edb7ea09630748720721dfe27690d65a2d33aa7509de77e07",
@@ -60,6 +62,7 @@
"https://bcr.bazel.build/modules/platforms/0.0.6/MODULE.bazel": "ad6eeef431dc52aefd2d77ed20a4b353f8ebf0f4ecdd26a807d2da5aa8cd0615",
"https://bcr.bazel.build/modules/platforms/0.0.7/MODULE.bazel": "72fd4a0ede9ee5c021f6a8dd92b503e089f46c227ba2813ff183b71616034814",
"https://bcr.bazel.build/modules/platforms/0.0.8/MODULE.bazel": "9f142c03e348f6d263719f5074b21ef3adf0b139ee4c5133e2aa35664da9eb2d",
"https://bcr.bazel.build/modules/platforms/0.0.9/MODULE.bazel": "4a87a60c927b56ddd67db50c89acaa62f4ce2a1d2149ccb63ffd871d5ce29ebc",
"https://bcr.bazel.build/modules/protobuf/21.7/MODULE.bazel": "a5a29bb89544f9b97edce05642fac225a808b5b7be74038ea3640fae2f8e66a7",
"https://bcr.bazel.build/modules/protobuf/27.0/MODULE.bazel": "7873b60be88844a0a1d8f80b9d5d20cfbd8495a689b8763e76c6372998d3f64c",
"https://bcr.bazel.build/modules/protobuf/27.1/MODULE.bazel": "703a7b614728bb06647f965264967a8ef1c39e09e8f167b3ca0bb1fd80449c0d",
@@ -82,11 +85,12 @@
"https://bcr.bazel.build/modules/rules_cc/0.0.15/MODULE.bazel": "6704c35f7b4a72502ee81f61bf88706b54f06b3cbe5558ac17e2e14666cd5dcc",
"https://bcr.bazel.build/modules/rules_cc/0.0.16/MODULE.bazel": "7661303b8fc1b4d7f532e54e9d6565771fea666fbdf839e0a86affcd02defe87",
"https://bcr.bazel.build/modules/rules_cc/0.0.17/MODULE.bazel": "2ae1d8f4238ec67d7185d8861cb0a2cdf4bc608697c331b95bf990e69b62e64a",
"https://bcr.bazel.build/modules/rules_cc/0.0.17/source.json": "4db99b3f55c90ab28d14552aa0632533e3e8e5e9aea0f5c24ac0014282c2a7c5",
"https://bcr.bazel.build/modules/rules_cc/0.0.2/MODULE.bazel": "6915987c90970493ab97393024c156ea8fb9f3bea953b2f3ec05c34f19b5695c",
"https://bcr.bazel.build/modules/rules_cc/0.0.6/MODULE.bazel": "abf360251023dfe3efcef65ab9d56beefa8394d4176dd29529750e1c57eaa33f",
"https://bcr.bazel.build/modules/rules_cc/0.0.8/MODULE.bazel": "964c85c82cfeb6f3855e6a07054fdb159aced38e99a5eecf7bce9d53990afa3e",
"https://bcr.bazel.build/modules/rules_cc/0.0.9/MODULE.bazel": "836e76439f354b89afe6a911a7adf59a6b2518fafb174483ad78a2a2fde7b1c5",
"https://bcr.bazel.build/modules/rules_cc/0.1.1/MODULE.bazel": "2f0222a6f229f0bf44cd711dc13c858dad98c62d52bd51d8fc3a764a83125513",
"https://bcr.bazel.build/modules/rules_cc/0.1.1/source.json": "d61627377bd7dd1da4652063e368d9366fc9a73920bfa396798ad92172cf645c",
"https://bcr.bazel.build/modules/rules_foreign_cc/0.9.0/MODULE.bazel": "c9e8c682bf75b0e7c704166d79b599f93b72cfca5ad7477df596947891feeef6",
"https://bcr.bazel.build/modules/rules_fuzzing/0.5.2/MODULE.bazel": "40c97d1144356f52905566c55811f13b299453a14ac7769dfba2ac38192337a8",
"https://bcr.bazel.build/modules/rules_fuzzing/0.5.2/source.json": "c8b1e2c717646f1702290959a3302a178fb639d987ab61d548105019f11e527e",
@@ -139,7 +143,10 @@
"https://bcr.bazel.build/modules/rules_python/0.4.0/MODULE.bazel": "9208ee05fd48bf09ac60ed269791cf17fb343db56c8226a720fbb1cdf467166c",
"https://bcr.bazel.build/modules/rules_python/0.40.0/MODULE.bazel": "9d1a3cd88ed7d8e39583d9ffe56ae8a244f67783ae89b60caafc9f5cf318ada7",
"https://bcr.bazel.build/modules/rules_python/0.40.0/source.json": "939d4bd2e3110f27bfb360292986bb79fd8dcefb874358ccd6cdaa7bda029320",
"https://bcr.bazel.build/modules/rules_rust/0.61.0/MODULE.bazel": "0318a95777b9114c8740f34b60d6d68f9cfef61e2f4b52424ca626213d33787b",
"https://bcr.bazel.build/modules/rules_rust/0.61.0/source.json": "d1bc743b5fa2e2abb35c436df7126a53dab0c3f35890ae6841592b2253786a63",
"https://bcr.bazel.build/modules/rules_shell/0.2.0/MODULE.bazel": "fda8a652ab3c7d8fee214de05e7a9916d8b28082234e8d2c0094505c5268ed3c",
"https://bcr.bazel.build/modules/rules_shell/0.3.0/MODULE.bazel": "de4402cd12f4cc8fda2354fce179fdb068c0b9ca1ec2d2b17b3e21b24c1a937b",
"https://bcr.bazel.build/modules/rules_shell/0.4.0/MODULE.bazel": "0f8f11bb3cd11755f0b48c1de0bbcf62b4b34421023aa41a2fc74ef68d9584f0",
"https://bcr.bazel.build/modules/rules_shell/0.4.0/source.json": "1d7fa7f941cd41dc2704ba5b4edc2e2230eea1cc600d80bd2b65838204c50b95",
"https://bcr.bazel.build/modules/stardoc/0.5.1/MODULE.bazel": "1a05d92974d0c122f5ccf09291442580317cdd859f07a8655f1db9a60374f9f8",
@@ -159,6 +166,37 @@
},
"selectedYankedVersions": {},
"moduleExtensions": {
"@@apple_support+//crosstool:setup.bzl%apple_cc_configure_extension": {
"general": {
"bzlTransitiveDigest": "xcBTf2+GaloFpg7YEh/Bv+1yAczRkiCt3DGws4K7kSk=",
"usagesDigest": "3L+PK6aRnliv0iIS8m3kdo+LjmvjJWoFCm3qZcPSg+8=",
"recordedFileInputs": {},
"recordedDirentsInputs": {},
"envVariables": {},
"generatedRepoSpecs": {
"local_config_apple_cc_toolchains": {
"repoRuleId": "@@apple_support+//crosstool:setup.bzl%_apple_cc_autoconf_toolchains",
"attributes": {}
},
"local_config_apple_cc": {
"repoRuleId": "@@apple_support+//crosstool:setup.bzl%_apple_cc_autoconf",
"attributes": {}
}
},
"recordedRepoMappingEntries": [
[
"apple_support+",
"bazel_tools",
"bazel_tools"
],
[
"bazel_tools",
"rules_cc",
"rules_cc+"
]
]
}
},
"@@rules_go+//go:extensions.bzl%go_sdk": {
"os:osx,arch:aarch64": {
"bzlTransitiveDigest": "jBP0cRKOr+A42aPGunoasOD+vrmMLJIJ8Jwi65DdelE=",

View file

@@ -10,6 +10,8 @@
"https://bcr.bazel.build/modules/abseil-cpp/20230802.1/MODULE.bazel": "fa92e2eb41a04df73cdabeec37107316f7e5272650f81d6cc096418fe647b915",
"https://bcr.bazel.build/modules/abseil-cpp/20240116.1/MODULE.bazel": "37bcdb4440fbb61df6a1c296ae01b327f19e9bb521f9b8e26ec854b6f97309ed",
"https://bcr.bazel.build/modules/abseil-cpp/20240116.1/source.json": "9be551b8d4e3ef76875c0d744b5d6a504a27e3ae67bc6b28f46415fd2d2957da",
"https://bcr.bazel.build/modules/apple_support/1.17.1/MODULE.bazel": "655c922ab1209978a94ef6ca7d9d43e940cd97d9c172fb55f94d91ac53f8610b",
"https://bcr.bazel.build/modules/apple_support/1.17.1/source.json": "6b2b8c74d14e8d485528a938e44bdb72a5ba17632b9e14ef6e68a5ee96c8347f",
"https://bcr.bazel.build/modules/aspect_bazel_lib/2.14.0/MODULE.bazel": "2b31ffcc9bdc8295b2167e07a757dbbc9ac8906e7028e5170a3708cecaac119f",
"https://bcr.bazel.build/modules/aspect_bazel_lib/2.14.0/source.json": "0cf1826853b0bef8b5cd19c0610d717500f5521aa2b38b72b2ec302ac5e7526c",
"https://bcr.bazel.build/modules/aspect_bazel_lib/2.7.2/MODULE.bazel": "780d1a6522b28f5edb7ea09630748720721dfe27690d65a2d33aa7509de77e07",
@@ -60,6 +62,7 @@
"https://bcr.bazel.build/modules/platforms/0.0.6/MODULE.bazel": "ad6eeef431dc52aefd2d77ed20a4b353f8ebf0f4ecdd26a807d2da5aa8cd0615",
"https://bcr.bazel.build/modules/platforms/0.0.7/MODULE.bazel": "72fd4a0ede9ee5c021f6a8dd92b503e089f46c227ba2813ff183b71616034814",
"https://bcr.bazel.build/modules/platforms/0.0.8/MODULE.bazel": "9f142c03e348f6d263719f5074b21ef3adf0b139ee4c5133e2aa35664da9eb2d",
"https://bcr.bazel.build/modules/platforms/0.0.9/MODULE.bazel": "4a87a60c927b56ddd67db50c89acaa62f4ce2a1d2149ccb63ffd871d5ce29ebc",
"https://bcr.bazel.build/modules/protobuf/21.7/MODULE.bazel": "a5a29bb89544f9b97edce05642fac225a808b5b7be74038ea3640fae2f8e66a7",
"https://bcr.bazel.build/modules/protobuf/27.0/MODULE.bazel": "7873b60be88844a0a1d8f80b9d5d20cfbd8495a689b8763e76c6372998d3f64c",
"https://bcr.bazel.build/modules/protobuf/27.1/MODULE.bazel": "703a7b614728bb06647f965264967a8ef1c39e09e8f167b3ca0bb1fd80449c0d",
@@ -83,11 +86,12 @@
"https://bcr.bazel.build/modules/rules_cc/0.0.15/MODULE.bazel": "6704c35f7b4a72502ee81f61bf88706b54f06b3cbe5558ac17e2e14666cd5dcc",
"https://bcr.bazel.build/modules/rules_cc/0.0.16/MODULE.bazel": "7661303b8fc1b4d7f532e54e9d6565771fea666fbdf839e0a86affcd02defe87",
"https://bcr.bazel.build/modules/rules_cc/0.0.17/MODULE.bazel": "2ae1d8f4238ec67d7185d8861cb0a2cdf4bc608697c331b95bf990e69b62e64a",
"https://bcr.bazel.build/modules/rules_cc/0.0.17/source.json": "4db99b3f55c90ab28d14552aa0632533e3e8e5e9aea0f5c24ac0014282c2a7c5",
"https://bcr.bazel.build/modules/rules_cc/0.0.2/MODULE.bazel": "6915987c90970493ab97393024c156ea8fb9f3bea953b2f3ec05c34f19b5695c",
"https://bcr.bazel.build/modules/rules_cc/0.0.6/MODULE.bazel": "abf360251023dfe3efcef65ab9d56beefa8394d4176dd29529750e1c57eaa33f",
"https://bcr.bazel.build/modules/rules_cc/0.0.8/MODULE.bazel": "964c85c82cfeb6f3855e6a07054fdb159aced38e99a5eecf7bce9d53990afa3e",
"https://bcr.bazel.build/modules/rules_cc/0.0.9/MODULE.bazel": "836e76439f354b89afe6a911a7adf59a6b2518fafb174483ad78a2a2fde7b1c5",
"https://bcr.bazel.build/modules/rules_cc/0.1.1/MODULE.bazel": "2f0222a6f229f0bf44cd711dc13c858dad98c62d52bd51d8fc3a764a83125513",
"https://bcr.bazel.build/modules/rules_cc/0.1.1/source.json": "d61627377bd7dd1da4652063e368d9366fc9a73920bfa396798ad92172cf645c",
"https://bcr.bazel.build/modules/rules_foreign_cc/0.9.0/MODULE.bazel": "c9e8c682bf75b0e7c704166d79b599f93b72cfca5ad7477df596947891feeef6",
"https://bcr.bazel.build/modules/rules_fuzzing/0.5.2/MODULE.bazel": "40c97d1144356f52905566c55811f13b299453a14ac7769dfba2ac38192337a8",
"https://bcr.bazel.build/modules/rules_fuzzing/0.5.2/source.json": "c8b1e2c717646f1702290959a3302a178fb639d987ab61d548105019f11e527e",
@@ -143,7 +147,10 @@
"https://bcr.bazel.build/modules/rules_python/0.40.0/MODULE.bazel": "9d1a3cd88ed7d8e39583d9ffe56ae8a244f67783ae89b60caafc9f5cf318ada7",
"https://bcr.bazel.build/modules/rules_python/1.3.0/MODULE.bazel": "8361d57eafb67c09b75bf4bbe6be360e1b8f4f18118ab48037f2bd50aa2ccb13",
"https://bcr.bazel.build/modules/rules_python/1.3.0/source.json": "25932f917cd279c7baefa6cb1d3fa8750a7a29de522024449b19af6eab51f4a0",
"https://bcr.bazel.build/modules/rules_rust/0.61.0/MODULE.bazel": "0318a95777b9114c8740f34b60d6d68f9cfef61e2f4b52424ca626213d33787b",
"https://bcr.bazel.build/modules/rules_rust/0.61.0/source.json": "d1bc743b5fa2e2abb35c436df7126a53dab0c3f35890ae6841592b2253786a63",
"https://bcr.bazel.build/modules/rules_shell/0.2.0/MODULE.bazel": "fda8a652ab3c7d8fee214de05e7a9916d8b28082234e8d2c0094505c5268ed3c",
"https://bcr.bazel.build/modules/rules_shell/0.3.0/MODULE.bazel": "de4402cd12f4cc8fda2354fce179fdb068c0b9ca1ec2d2b17b3e21b24c1a937b",
"https://bcr.bazel.build/modules/rules_shell/0.4.0/MODULE.bazel": "0f8f11bb3cd11755f0b48c1de0bbcf62b4b34421023aa41a2fc74ef68d9584f0",
"https://bcr.bazel.build/modules/rules_shell/0.4.0/source.json": "1d7fa7f941cd41dc2704ba5b4edc2e2230eea1cc600d80bd2b65838204c50b95",
"https://bcr.bazel.build/modules/stardoc/0.5.1/MODULE.bazel": "1a05d92974d0c122f5ccf09291442580317cdd859f07a8655f1db9a60374f9f8",
@@ -164,6 +171,37 @@
},
"selectedYankedVersions": {},
"moduleExtensions": {
"@@apple_support+//crosstool:setup.bzl%apple_cc_configure_extension": {
"general": {
"bzlTransitiveDigest": "xcBTf2+GaloFpg7YEh/Bv+1yAczRkiCt3DGws4K7kSk=",
"usagesDigest": "3L+PK6aRnliv0iIS8m3kdo+LjmvjJWoFCm3qZcPSg+8=",
"recordedFileInputs": {},
"recordedDirentsInputs": {},
"envVariables": {},
"generatedRepoSpecs": {
"local_config_apple_cc_toolchains": {
"repoRuleId": "@@apple_support+//crosstool:setup.bzl%_apple_cc_autoconf_toolchains",
"attributes": {}
},
"local_config_apple_cc": {
"repoRuleId": "@@apple_support+//crosstool:setup.bzl%_apple_cc_autoconf",
"attributes": {}
}
},
"recordedRepoMappingEntries": [
[
"apple_support+",
"bazel_tools",
"bazel_tools"
],
[
"bazel_tools",
"rules_cc",
"rules_cc+"
]
]
}
},
"@@rules_go+//go:extensions.bzl%go_sdk": {
"os:osx,arch:aarch64": {
"bzlTransitiveDigest": "jBP0cRKOr+A42aPGunoasOD+vrmMLJIJ8Jwi65DdelE=",

View file

@@ -1,12 +1,14 @@
load("@rules_go//go:def.bzl", "go_binary")
load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_library")
exports_files([
"go_analyze_wrapper.sh.tpl",
"go_exec_wrapper.sh.tpl",
"rust_analyze_wrapper.sh.tpl",
])
go_binary(
name = "analyze",
name = "analyze_go",
srcs = ["analyze.go"],
visibility = ["//visibility:public"],
)
@@ -16,3 +18,18 @@ go_binary(
srcs = ["execute.go"],
visibility = ["//visibility:public"],
)
rust_binary(
name = "analyze",
srcs = ["analyze.rs"],
edition = "2021",
deps = [
"@crates//:serde",
"@crates//:serde_json",
"@crates//:log",
"@crates//:simple_logger",
"@crates//:crossbeam-channel",
"@crates//:num_cpus",
],
visibility = ["//visibility:public"],
)

graph/analyze.rs (new file, 516 lines)
View file

@@ -0,0 +1,516 @@
use std::collections::{HashMap, HashSet};
use std::env;
use std::process::{Command, exit};
use std::sync::{Arc, Mutex};
use std::thread;
use serde::{Deserialize, Serialize};
use serde_json;
use log::{error, info, warn};
use simple_logger::SimpleLogger;
use std::str::FromStr;
// Data structures that mirror the Go implementation
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
enum DataDepType {
Query,
Materialize,
}
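// With rename_all = "lowercase", these variants serialize as "query" and "materialize".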
impl FromStr for DataDepType {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"query" => Ok(DataDepType::Query),
"materialize" => Ok(DataDepType::Materialize),
_ => Err(format!("Unknown DataDepType: {}", s)),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DataDep {
#[serde(rename = "depType")]
dep_type: DataDepType,
#[serde(rename = "ref")]
reference: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct JobConfig {
inputs: Vec<DataDep>,
outputs: Vec<String>,
args: Vec<String>,
env: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Task {
#[serde(rename = "jobLabel")]
job_label: String,
config: JobConfig,
}
#[derive(Debug, Serialize, Deserialize)]
struct JobGraph {
outputs: Vec<String>,
nodes: Vec<Task>,
}
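// A JobGraph serializes to JSON along these lines (hypothetical values):
//   {"outputs": ["reports/2025"],
//    "nodes": [{"jobLabel": "//jobs:report",
//               "config": {"inputs": [{"depType": "materialize", "ref": "raw/2025"}],
//                          "outputs": ["reports/2025"], "args": [], "env": {}}}]}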
// Function to convert a job label to a configuration path
fn job_label_to_cfg_path(job_label: &str) -> String {
let without_prefix = job_label.replace("//", "");
let with_slash = without_prefix.replace(":", "/");
format!(".{}.cfg", with_slash)
}
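// Worked example (hypothetical label): "//jobs:report" -> "jobs:report"
// -> "jobs/report" -> ".jobs/report.cfg"; the leading dot comes from the
// format string above.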
// Configure a job to produce the desired outputs
fn configure(job_label: &str, output_refs: &[String]) -> Result<Vec<Task>, String> {
let candidate_jobs_str = env::var("DATABUILD_CANDIDATE_JOBS")
.map_err(|e| format!("Failed to get DATABUILD_CANDIDATE_JOBS: {}", e))?;
let job_path_map: HashMap<String, String> = serde_json::from_str(&candidate_jobs_str)
.map_err(|e| format!("Failed to parse DATABUILD_CANDIDATE_JOBS: {}", e))?;
// Look up the executable path for this job
let exec_path = job_path_map.get(job_label)
.ok_or_else(|| format!("Job {} is not a candidate job", job_label))?;
// Check if executable exists
if !std::path::Path::new(exec_path).exists() {
return Err(format!("Executable not found at path: {}", exec_path));
}
info!("Executing job configuration: {} {:?}", exec_path, output_refs);
// Execute the job configuration command
let output = Command::new(exec_path)
.args(output_refs)
.output()
.map_err(|e| format!("Failed to execute job config: {}", e))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
error!("Job configuration failed: {}", stderr);
return Err(format!("Failed to run job config: {}", stderr));
}
info!("Job configuration succeeded for {}", job_label);
// Parse the job configurations
let stdout = String::from_utf8_lossy(&output.stdout);
let job_configs: Vec<JobConfig> = serde_json::from_str(&stdout)
.map_err(|e| {
error!("Error parsing job configs for {}: {}. `{}`", job_label, e, stdout);
format!("Failed to parse job configs: {}", e)
})?;
// Create tasks
let tasks: Vec<Task> = job_configs.into_iter()
.map(|cfg| Task {
job_label: job_label.to_string(),
config: cfg,
})
.collect();
info!("Created {} tasks for job {}", tasks.len(), job_label);
Ok(tasks)
}
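// Note: DATABUILD_CANDIDATE_JOBS is parsed above as a JSON object mapping job
// labels to executable paths, e.g. (hypothetical value):
//   {"//jobs:report": "bazel-bin/jobs/report"}
// The configuration binary receives the requested output refs as arguments and
// must print a JSON array of JobConfig objects on stdout.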
// Resolve produces a mapping of required job refs to the partitions it produces
fn resolve(output_refs: &[String]) -> Result<HashMap<String, Vec<String>>, String> {
let lookup_path = env::var("DATABUILD_JOB_LOOKUP_PATH")
.map_err(|e| format!("Failed to get DATABUILD_JOB_LOOKUP_PATH: {}", e))?;
// Run the job lookup
info!("Executing job lookup: {} {:?}", lookup_path, output_refs);
let output = Command::new(&lookup_path)
.args(output_refs)
.output()
.map_err(|e| format!("Failed to execute job lookup: {}", e))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
error!("Job lookup failed: {}", stderr);
return Err(format!("Failed to run job lookup: {}", stderr));
}
info!("Job lookup succeeded for {} output refs", output_refs.len());
// Parse the result
let stdout = String::from_utf8_lossy(&output.stdout);
let result: HashMap<String, Vec<String>> = serde_json::from_str(&stdout)
.map_err(|e| {
error!("Error parsing job lookup result: {}", e);
format!("Failed to parse job lookup result: {}", e)
})?;
info!("Job lookup found {} job mappings", result.len());
for (job, refs) in &result {
info!(" Job {} produces {} refs", job, refs.len());
}
Ok(result)
}
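// Note: the lookup binary at DATABUILD_JOB_LOOKUP_PATH receives the output
// refs as arguments and must print a JSON object mapping each producing job
// label to the refs it covers, e.g. (hypothetical value):
//   {"//jobs:report": ["reports/2025-05-04"]}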
// Configure multiple jobs in parallel
fn configure_parallel(job_refs: HashMap<String, Vec<String>>, num_workers: usize) -> Result<Vec<Task>, String> {
// Create a channel for jobs
let (job_sender, job_receiver) = crossbeam_channel::unbounded();
// Fill the jobs channel
for (job_label, produced_refs) in job_refs {
job_sender.send((job_label, produced_refs)).unwrap();
}
drop(job_sender); // Close the channel
// Create a channel for results
let (task_sender, task_receiver) = crossbeam_channel::unbounded();
let error = Arc::new(Mutex::new(None));
// Spawn worker threads
let mut handles = vec![];
for _ in 0..num_workers {
let job_receiver = job_receiver.clone();
let task_sender = task_sender.clone();
let error = Arc::clone(&error);
let handle = thread::spawn(move || {
for (job_label, produced_refs) in job_receiver {
// Check if an error has already occurred
if error.lock().unwrap().is_some() {
return;
}
match configure(&job_label, &produced_refs) {
Ok(tasks) => {
task_sender.send(tasks).unwrap();
}
Err(e) => {
let mut error_guard = error.lock().unwrap();
if error_guard.is_none() {
*error_guard = Some(e);
}
return;
}
}
}
});
handles.push(handle);
}
// Close the task sender
drop(task_sender);
// Wait for all workers to finish
for handle in handles {
handle.join().unwrap();
}
// Check for errors
let error_guard = error.lock().unwrap();
if let Some(e) = &*error_guard {
return Err(e.clone());
}
// Collect results
let mut all_tasks = Vec::new();
while let Ok(tasks) = task_receiver.try_recv() {
all_tasks.extend(tasks);
}
Ok(all_tasks)
}
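// Design note: a simple fan-out/fan-in over unbounded crossbeam channels.
// Dropping job_sender closes the job channel so workers drain it and exit;
// the first error is recorded under the mutex, and workers skip any remaining
// jobs once it is set (in-flight configurations still run to completion).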
// Plan creates a job graph for given output references
fn plan(output_refs: &[String]) -> Result<JobGraph, String> {
info!("Starting planning for {} output refs: {:?}", output_refs.len(), output_refs);
let mut unhandled_refs = HashSet::new();
for ref_str in output_refs {
unhandled_refs.insert(ref_str.clone());
}
let mut epoch = 0;
let mut nodes = Vec::new();
// Determine the number of workers based on available CPU cores or environment variable
let mut num_workers = num_cpus::get();
if let Ok(worker_env) = env::var("DATABUILD_PARALLEL_WORKERS") {
if let Ok(parsed_workers) = worker_env.parse::<usize>() {
if parsed_workers < 1 {
num_workers = 1;
info!("Warning: DATABUILD_PARALLEL_WORKERS must be at least 1, using: {}", num_workers);
} else {
num_workers = parsed_workers;
}
} else {
info!("Warning: Invalid DATABUILD_PARALLEL_WORKERS value '{}', using default: {}", worker_env, num_workers);
}
}
info!("Using {} workers for parallel execution", num_workers);
while !unhandled_refs.is_empty() {
if epoch >= 1000 {
error!("Planning timeout: still planning after {} epochs, giving up", epoch);
return Err(format!("Still planning after {} epochs, giving up", epoch));
}
info!("Planning epoch {} with {} unhandled refs", epoch, unhandled_refs.len());
// Resolve jobs for all unhandled refs
let unhandled_refs_list: Vec<String> = unhandled_refs.iter().cloned().collect();
let job_refs = resolve(&unhandled_refs_list)?;
// Configure jobs in parallel
let new_nodes = configure_parallel(job_refs.clone(), num_workers)?;
// Remove handled refs
for (_, produced_refs) in job_refs {
for ref_str in produced_refs {
unhandled_refs.remove(&ref_str);
}
}
if !unhandled_refs.is_empty() {
error!("Error: Still have unhandled refs after configuration phase: {:?}", unhandled_refs);
return Err(format!("Should have no unhandled refs after configuration phase, but had: {:?}", unhandled_refs));
}
// Add new nodes to the graph
nodes.extend(new_nodes.clone());
info!("Planning epoch {} completed: added {} new nodes, total nodes: {}", epoch, new_nodes.len(), nodes.len());
epoch += 1;
// Plan next epoch
let mut new_unhandled_count = 0;
for task in &new_nodes {
for input in &task.config.inputs {
if input.dep_type == DataDepType::Materialize {
if !unhandled_refs.contains(&input.reference) {
new_unhandled_count += 1;
}
unhandled_refs.insert(input.reference.clone());
}
}
}
if new_unhandled_count > 0 {
info!("Added {} new unhandled refs for next planning epoch", new_unhandled_count);
}
}
if !nodes.is_empty() {
info!("Planning complete: created graph with {} nodes for {} output refs", nodes.len(), output_refs.len());
Ok(JobGraph {
outputs: output_refs.to_vec(),
nodes,
})
} else {
error!("Planning failed: no nodes created for output refs {:?}", output_refs);
Err("Unknown failure in graph planning".to_string())
}
}
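// Illustration with hypothetical refs: planning ["reports/2025-05-04"] might
// resolve //jobs:report in the first epoch; if its config declares a
// materialize dep on "raw/2025-05-04", the next epoch resolves a producer for
// that ref, and the loop terminates once an epoch adds no new materialize deps.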
// Generate a Mermaid flowchart diagram from a job graph
fn generate_mermaid_diagram(graph: &JobGraph) -> String {
// Start the mermaid flowchart
let mut mermaid = String::from("flowchart TD\n");
// Track nodes we've already added to avoid duplicates
let mut added_nodes = HashSet::new();
let mut added_refs = HashSet::new();
// Map to track which refs are outputs (to highlight them)
let mut is_output_ref = HashSet::new();
for ref_str in &graph.outputs {
is_output_ref.insert(ref_str.clone());
}
// Process each task in the graph
for task in &graph.nodes {
// Create a unique ID for this job+outputs combination
let outputs_key = task.config.outputs.join("_");
let mut job_node_id = format!("job_{}", task.job_label.replace("//", "_"));
job_node_id = job_node_id.replace(":", "_");
job_node_id = format!("{}_{}", job_node_id, outputs_key.replace("/", "_"));
// Create a descriptive label that includes both job label and outputs
let job_label = &task.job_label;
let outputs_label = if !task.config.outputs.is_empty() {
if task.config.outputs.len() == 1 {
format!(" [{}]", task.config.outputs[0])
} else {
format!(" [{}, ...]", task.config.outputs[0])
}
} else {
String::new()
};
// Add the job node if not already added
if !added_nodes.contains(&job_node_id) {
// Represent job as a process shape with escaped label
mermaid.push_str(&format!(
" {}[\"`**{}** {}`\"]:::job\n",
job_node_id,
job_label,
outputs_label
));
added_nodes.insert(job_node_id.clone());
}
// Process inputs (dependencies)
for input in &task.config.inputs {
let ref_node_id = format!("ref_{}", input.reference.replace("/", "_"));
// Add the partition ref node if not already added
if !added_refs.contains(&ref_node_id) {
let node_class = if is_output_ref.contains(&input.reference) {
"outputPartition"
} else {
"partition"
};
// Represent partition as a cylinder
mermaid.push_str(&format!(
" {}[(\"{}\")]:::{}\n",
ref_node_id,
input.reference,
node_class
));
added_refs.insert(ref_node_id.clone());
}
// Add the edge from input to job
if input.dep_type == DataDepType::Materialize {
// Solid line for materialize dependencies
mermaid.push_str(&format!(" {} --> {}\n", ref_node_id, job_node_id));
} else {
// Dashed line for query dependencies
mermaid.push_str(&format!(" {} -.-> {}\n", ref_node_id, job_node_id));
}
}
// Process outputs
for output in &task.config.outputs {
let ref_node_id = format!("ref_{}", output.replace("/", "_"));
// Add the partition ref node if not already added
if !added_refs.contains(&ref_node_id) {
let node_class = if is_output_ref.contains(output) {
"outputPartition"
} else {
"partition"
};
// Represent partition as a cylinder
mermaid.push_str(&format!(
" {}[(\"Partition: {}\")]:::{}\n",
ref_node_id,
output,
node_class
));
added_refs.insert(ref_node_id.clone());
}
// Add the edge from job to output
mermaid.push_str(&format!(" {} --> {}\n", job_node_id, ref_node_id));
}
}
// Add styling
mermaid.push_str("\n %% Styling\n");
mermaid.push_str(" classDef job fill:#f9f,stroke:#333,stroke-width:1px;\n");
mermaid.push_str(" classDef partition fill:#bbf,stroke:#333,stroke-width:1px;\n");
mermaid.push_str(" classDef outputPartition fill:#bfb,stroke:#333,stroke-width:2px;\n");
mermaid
}
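// Abridged example output for a hypothetical one-job graph that produces
// "reports/2025" via //jobs:report:
//
//   flowchart TD
//       job__jobs_report_reports_2025["`**//jobs:report**  [reports/2025]`"]:::job
//       ref_reports_2025[("Partition: reports/2025")]:::outputPartition
//       job__jobs_report_reports_2025 --> ref_reports_2025
//
//       %% Styling
//       classDef job fill:#f9f,stroke:#333,stroke-width:1px;
//       ...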
fn main() {
// Initialize logger
SimpleLogger::new().init().unwrap();
let mode = env::var("DATABUILD_MODE").unwrap_or_else(|_| "unknown".to_string());
info!("Starting analyze.rs in mode: {}", mode);
let args: Vec<String> = env::args().skip(1).collect();
match mode.as_str() {
"plan" => {
// Get output refs from command line arguments
match plan(&args) {
Ok(graph) => {
// Output the job graph as JSON
match serde_json::to_string(&graph) {
Ok(json_data) => {
info!("Successfully generated job graph with {} nodes", graph.nodes.len());
println!("{}", json_data);
}
Err(e) => {
error!("Error marshaling job graph: {}", e);
eprintln!("Error marshaling job graph: {}", e);
exit(1);
}
}
}
Err(e) => {
eprintln!("Error: {}", e);
exit(1);
}
}
}
"lookup" => {
// Get output refs from command line arguments
match resolve(&args) {
Ok(result) => {
// Output the result as JSON
match serde_json::to_string(&result) {
Ok(json_data) => {
info!("Successfully completed lookup for {} output refs with {} job mappings", args.len(), result.len());
println!("{}", json_data);
}
Err(e) => {
error!("Error marshaling lookup result: {}", e);
eprintln!("Error marshaling lookup result: {}", e);
exit(1);
}
}
}
Err(e) => {
eprintln!("Error: {}", e);
exit(1);
}
}
}
"mermaid" => {
// Get output refs from command line arguments
match plan(&args) {
Ok(graph) => {
// Generate and output the mermaid diagram
let mermaid_diagram = generate_mermaid_diagram(&graph);
println!("{}", mermaid_diagram);
info!("Successfully generated mermaid diagram for {} nodes", graph.nodes.len());
}
Err(e) => {
eprintln!("Error: {}", e);
exit(1);
}
}
}
"import_test" => {
info!("Running in import_test mode");
println!("ok :)");
info!("Import test completed successfully");
}
_ => {
error!("Error: Unknown mode '{}'", mode);
eprintln!("Unknown MODE `{}`", mode);
exit(1);
}
}
}

View file

@@ -0,0 +1,13 @@
#!/bin/bash
set -e
%{RUNFILES_PREFIX}
%{PREFIX}
# Locate the Rust binary using its standard runfiles path
# Assumes workspace name is 'databuild'
EXECUTABLE_BINARY="$(rlocation "databuild/graph/analyze")"
# Run the analysis
exec "${EXECUTABLE_BINARY}" "$@"

View file

@@ -1,3 +1,3 @@
#!/usr/bin/env bash
DATABUILD_MODE=import_test DATABUILD_JOB_LOOKUP_PATH=foo DATABUILD_CANDIDATE_JOBS=bar graph/analyze_/analyze
DATABUILD_MODE=import_test DATABUILD_JOB_LOOKUP_PATH=foo DATABUILD_CANDIDATE_JOBS=bar graph/analyze

View file

@@ -410,7 +410,7 @@ _databuild_graph_analyze = rule(
allow_empty = False,
),
"_template": attr.label(
default = "@databuild//graph:go_analyze_wrapper.sh.tpl",
default = "@databuild//graph:rust_analyze_wrapper.sh.tpl",
allow_single_file = True,
),
"_bash_runfiles": attr.label(