Start fixing mermaid charts

Stuart Axelbrooke 2025-07-17 22:00:03 -07:00
parent 953d317505
commit a358e7a091
8 changed files with 235 additions and 213 deletions


@@ -39,12 +39,12 @@ rust_library(
"event_log/sqlite.rs",
"event_log/stdout.rs",
"lib.rs",
"orchestration/mod.rs",
"mermaid_utils.rs",
"orchestration/error.rs",
"orchestration/events.rs",
"orchestration/mod.rs",
"service/handlers.rs",
"service/mod.rs",
"service/mermaid_utils.rs",
":generate_databuild_rust",
],
edition = "2021",


@@ -9,6 +9,7 @@ use clap::{Arg, Command as ClapCommand};
use uuid::Uuid;
use databuild::*;
use databuild::event_log::{BuildEventLog, create_build_event_log, create_build_event};
use databuild::mermaid_utils::generate_mermaid_diagram;
// Configure a job to produce the desired outputs
fn configure(job_label: &str, output_refs: &[String]) -> Result<Vec<Task>, String> {
@@ -384,120 +385,120 @@ async fn plan(
}
// Generate a Mermaid flowchart diagram from a job graph
fn generate_mermaid_diagram(graph: &JobGraph) -> String {
// Start the mermaid flowchart
let mut mermaid = String::from("flowchart TD\n");
// Track nodes we've already added to avoid duplicates
let mut added_nodes = HashSet::new();
let mut added_refs = HashSet::new();
// Map to track which refs are outputs (to highlight them)
let mut is_output_ref = HashSet::new();
for ref_str in &graph.outputs {
is_output_ref.insert(ref_str.str.clone());
}
// Process each task in the graph
for task in &graph.nodes {
// Create a unique ID for this job+outputs combination
let outputs_strs: Vec<String> = task.config.as_ref().unwrap().outputs.iter().map(|o| o.str.clone()).collect();
let outputs_key = outputs_strs.join("_");
let mut job_node_id = format!("job_{}", task.job.as_ref().unwrap().label.replace("//", "_"));
job_node_id = job_node_id.replace(":", "_").replace("=", "_").replace("?", "_").replace(" ", "_");
job_node_id = format!("{}_{}", job_node_id, outputs_key.replace("/", "_").replace("=", "_"));
// Create a descriptive label that includes both job label and outputs
let job_label = &task.job.as_ref().unwrap().label;
let outputs_label = if !task.config.as_ref().unwrap().outputs.is_empty() {
if task.config.as_ref().unwrap().outputs.len() == 1 {
format!(" [{}]", task.config.as_ref().unwrap().outputs[0].str)
} else {
format!(" [{}, ...]", task.config.as_ref().unwrap().outputs[0].str)
}
} else {
String::new()
};
// Add the job node if not already added
if !added_nodes.contains(&job_node_id) {
// Represent job as a process shape with escaped label
mermaid.push_str(&format!(
" {}[\"`**{}** {}`\"]:::job\n",
job_node_id,
job_label,
outputs_label
));
added_nodes.insert(job_node_id.clone());
}
// Process inputs (dependencies)
for input in &task.config.as_ref().unwrap().inputs {
let ref_node_id = format!("ref_{}", input.partition_ref.as_ref().unwrap().str.replace("/", "_").replace("=", "_"));
// Add the partition ref node if not already added
if !added_refs.contains(&ref_node_id) {
let node_class = if is_output_ref.contains(&input.partition_ref.as_ref().unwrap().str) {
"outputPartition"
} else {
"partition"
};
// Represent partition as a cylinder
mermaid.push_str(&format!(
" {}[(\"{}\")]:::{}\n",
ref_node_id,
input.partition_ref.as_ref().unwrap().str.replace("/", "_").replace("=", "_"),
node_class
));
added_refs.insert(ref_node_id.clone());
}
// Add the edge from input to job
if input.dep_type == 1 { // MATERIALIZE = 1
// Solid line for materialize dependencies
mermaid.push_str(&format!(" {} --> {}\n", ref_node_id, job_node_id));
} else {
// Dashed line for query dependencies
mermaid.push_str(&format!(" {} -.-> {}\n", ref_node_id, job_node_id));
}
}
// Process outputs
for output in &task.config.as_ref().unwrap().outputs {
let ref_node_id = format!("ref_{}", output.str.replace("/", "_").replace("=", "_"));
// Add the partition ref node if not already added
if !added_refs.contains(&ref_node_id) {
let node_class = if is_output_ref.contains(&output.str) {
"outputPartition"
} else {
"partition"
};
// Represent partition as a cylinder
mermaid.push_str(&format!(
" {}[(\"Partition: {}\")]:::{}\n",
ref_node_id,
output.str,
node_class
));
added_refs.insert(ref_node_id.clone());
}
// Add the edge from job to output
mermaid.push_str(&format!(" {} --> {}\n", job_node_id, ref_node_id));
}
}
// Add styling
mermaid.push_str("\n %% Styling\n");
mermaid.push_str(" classDef job fill:#f9f,stroke:#333,stroke-width:1px;\n");
mermaid.push_str(" classDef partition fill:#bbf,stroke:#333,stroke-width:1px;\n");
mermaid.push_str(" classDef outputPartition fill:#bfb,stroke:#333,stroke-width:2px;\n");
mermaid
}
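// Illustrative example (hypothetical graph, for orientation only): given a single task whose job
// is //jobs:daily_rollup, with one MATERIALIZE input raw/date=2025-07-17 and the requested output
// data/date=2025-07-17, the function above would emit roughly:
//
//   flowchart TD
//       job__jobs_daily_rollup_data_date_2025-07-17["`**//jobs:daily_rollup**  [data/date=2025-07-17]`"]:::job
//       ref_raw_date_2025-07-17[("raw_date_2025-07-17")]:::partition
//       ref_raw_date_2025-07-17 --> job__jobs_daily_rollup_data_date_2025-07-17
//       ref_data_date_2025-07-17[("Partition: data/date=2025-07-17")]:::outputPartition
//       job__jobs_daily_rollup_data_date_2025-07-17 --> ref_data_date_2025-07-17
//
//       %% Styling
//       classDef job fill:#f9f,stroke:#333,stroke-width:1px;
//       classDef partition fill:#bbf,stroke:#333,stroke-width:1px;
//       classDef outputPartition fill:#bfb,stroke:#333,stroke-width:2px;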
// fn generate_mermaid_diagram(graph: &JobGraph) -> String {
// // Start the mermaid flowchart
// let mut mermaid = String::from("flowchart TD\n");
//
// // Track nodes we've already added to avoid duplicates
// let mut added_nodes = HashSet::new();
// let mut added_refs = HashSet::new();
//
// // Map to track which refs are outputs (to highlight them)
// let mut is_output_ref = HashSet::new();
// for ref_str in &graph.outputs {
// is_output_ref.insert(ref_str.str.clone());
// }
//
// // Process each task in the graph
// for task in &graph.nodes {
// // Create a unique ID for this job+outputs combination
// let outputs_strs: Vec<String> = task.config.as_ref().unwrap().outputs.iter().map(|o| o.str.clone()).collect();
// let outputs_key = outputs_strs.join("_");
// let mut job_node_id = format!("job_{}", task.job.as_ref().unwrap().label.replace("//", "_"));
// job_node_id = job_node_id.replace(":", "_").replace("=", "_").replace("?", "_").replace(" ", "_");
// job_node_id = format!("{}_{}", job_node_id, outputs_key.replace("/", "_").replace("=", "_"));
//
// // Create a descriptive label that includes both job label and outputs
// let job_label = &task.job.as_ref().unwrap().label;
// let outputs_label = if !task.config.as_ref().unwrap().outputs.is_empty() {
// if task.config.as_ref().unwrap().outputs.len() == 1 {
// format!(" [{}]", task.config.as_ref().unwrap().outputs[0].str)
// } else {
// format!(" [{}, ...]", task.config.as_ref().unwrap().outputs[0].str)
// }
// } else {
// String::new()
// };
//
// // Add the job node if not already added
// if !added_nodes.contains(&job_node_id) {
// // Represent job as a process shape with escaped label
// mermaid.push_str(&format!(
// " {}[\"`**{}** {}`\"]:::job\n",
// job_node_id,
// job_label,
// outputs_label
// ));
// added_nodes.insert(job_node_id.clone());
// }
//
// // Process inputs (dependencies)
// for input in &task.config.as_ref().unwrap().inputs {
// let ref_node_id = format!("ref_{}", input.partition_ref.as_ref().unwrap().str.replace("/", "_").replace("=", "_"));
//
// // Add the partition ref node if not already added
// if !added_refs.contains(&ref_node_id) {
// let node_class = if is_output_ref.contains(&input.partition_ref.as_ref().unwrap().str) {
// "outputPartition"
// } else {
// "partition"
// };
//
// // Represent partition as a cylinder
// mermaid.push_str(&format!(
// " {}[(\"{}\")]:::{}\n",
// ref_node_id,
// input.partition_ref.as_ref().unwrap().str.replace("/", "_").replace("=", "_"),
// node_class
// ));
// added_refs.insert(ref_node_id.clone());
// }
//
// // Add the edge from input to job
// if input.dep_type == 1 { // MATERIALIZE = 1
// // Solid line for materialize dependencies
// mermaid.push_str(&format!(" {} --> {}\n", ref_node_id, job_node_id));
// } else {
// // Dashed line for query dependencies
// mermaid.push_str(&format!(" {} -.-> {}\n", ref_node_id, job_node_id));
// }
// }
//
// // Process outputs
// for output in &task.config.as_ref().unwrap().outputs {
// let ref_node_id = format!("ref_{}", output.str.replace("/", "_").replace("=", "_"));
//
// // Add the partition ref node if not already added
// if !added_refs.contains(&ref_node_id) {
// let node_class = if is_output_ref.contains(&output.str) {
// "outputPartition"
// } else {
// "partition"
// };
//
// // Represent partition as a cylinder
// mermaid.push_str(&format!(
// " {}[(\"Partition: {}\")]:::{}\n",
// ref_node_id,
// output.str,
// node_class
// ));
// added_refs.insert(ref_node_id.clone());
// }
//
// // Add the edge from job to output
// mermaid.push_str(&format!(" {} --> {}\n", job_node_id, ref_node_id));
// }
// }
//
// // Add styling
// mermaid.push_str("\n %% Styling\n");
// mermaid.push_str(" classDef job fill:#f9f,stroke:#333,stroke-width:1px;\n");
// mermaid.push_str(" classDef partition fill:#bbf,stroke:#333,stroke-width:1px;\n");
// mermaid.push_str(" classDef outputPartition fill:#bfb,stroke:#333,stroke-width:2px;\n");
//
// mermaid
// }
#[tokio::main]
async fn main() {


@@ -10,6 +10,8 @@ pub mod orchestration;
// Service module
pub mod service;
pub mod mermaid_utils;
// Re-export commonly used types from event_log
pub use event_log::{BuildEventLog, BuildEventLogError, create_build_event_log};


@@ -41,6 +41,7 @@ pub fn extract_status_map(events: &[BuildEvent]) -> (HashMap<String, NodeStatus>
for event in sorted_events {
match &event.event_type {
// TODO map this to a job + outputs hash so that job status highlighting is correct
Some(crate::build_event::EventType::JobEvent(job_event)) => {
if let Some(job_label) = &job_event.job_label {
let status = match job_event.status {
@@ -76,6 +77,57 @@ pub fn extract_status_map(events: &[BuildEvent]) -> (HashMap<String, NodeStatus>
(job_statuses, partition_statuses)
}
/// Encodes ID for safe usage in mermaid graph
fn encode_id(id: &str) -> String {
id.replace("/", "_").replace("=", "_").replace(":", "_")
}
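// Illustrative example (hypothetical ref string): the three replacements above would turn
//   "//jobs:daily_rollup___data/date=2025-07-17"
// into
//   "__jobs_daily_rollup___data_date_2025-07-17"
// which is safe to use as a Mermaid node ID.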
struct MermaidJobNode {
task: Task,
id: String,
label: String,
}
impl MermaidJobNode {
fn from(task: &Task) -> Option<MermaidJobNode> {
let job_label: String = match &task.job {
Some(job) => job.label.clone(),
None => return None,
};
let outputs_label: String = match &task.config {
Some(config) => config.outputs.iter()
.map(|o| o.str.clone())
.collect::<Vec<_>>()
.join("___"),
None => String::new(),
};
let id = encode_id(&(job_label.clone() + "___" + &outputs_label));
let label = format!("**{}** {}", job_label, outputs_label);
Some(MermaidJobNode {
task: task.clone(),
id,
label,
})
}
fn to_mermaid(&self, job_statuses: &HashMap<String, NodeStatus>) -> String {
let status = job_statuses.get(&self.task.job.as_ref().unwrap().label).unwrap_or(&NodeStatus::Pending);
format!(" {}[\"{}\"]:::job_{}\n", self.id, self.label, status.css_class())
}
}
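// Hypothetical usage sketch (names assumed, not taken from this change): for a task whose job
// label is //jobs:foo with a single output data/x and an empty status map,
//   let node = MermaidJobNode::from(&task).unwrap();
//   mermaid.push_str(&node.to_mermaid(&job_statuses));
// would append a line along the lines of
//   __jobs_foo___data_x["**//jobs:foo** data/x"]:::job_<pending-css-class>
// where the suffix comes from NodeStatus::css_class().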
struct MermaidPartitionNode {
id: String,
label: String,
}
pub fn generate_mermaid_diagram(graph: &JobGraph) -> String {
generate_mermaid_with_status(graph, &[])
}
/// Generate a mermaid diagram for a job graph with current status annotations
pub fn generate_mermaid_with_status(
graph: &JobGraph,
@@ -89,7 +141,6 @@ pub fn generate_mermaid_with_status(
// Track nodes we've already added to avoid duplicates
let mut added_nodes = HashSet::new();
let mut added_refs = HashSet::new();
let mut added_edges = HashSet::new();
// Map to track which refs are outputs (to highlight them)
let mut is_output_ref = HashSet::new();
@@ -99,34 +150,11 @@ pub fn generate_mermaid_with_status(
// Add all task nodes and their relationships
for task in &graph.nodes {
let job_label = match &task.job {
Some(label) => &label.label,
None => continue,
};
let job_node_id = format!("job_{}", job_label.replace("/", "_").replace(":", "_"));
let job_node = MermaidJobNode::from(task).unwrap();
// Only add the job node once
if !added_nodes.contains(&job_node_id) {
let outputs_label = match &task.config {
Some(config) => config.outputs.iter()
.map(|o| o.str.clone())
.collect::<Vec<_>>()
.join(", "),
None => String::new(),
};
// Get the job status
let status = job_statuses.get(job_label).unwrap_or(&NodeStatus::Pending);
mermaid.push_str(&format!(
" {}[\"`**{}** {}`\"]:::job_{}\n",
job_node_id,
job_label,
outputs_label,
status.css_class()
));
added_nodes.insert(job_node_id.clone());
if !added_nodes.contains(&job_node.id) {
mermaid.push_str(&job_node.to_mermaid(&job_statuses));
added_nodes.insert(job_node.id.clone());
}
// Process inputs (dependencies)
@@ -153,21 +181,14 @@ pub fn generate_mermaid_with_status(
added_refs.insert(ref_node_id.clone());
}
// Add the edge from input to job (avoid duplicates)
let edge_key = if input.dep_type == 1 { // MATERIALIZE = 1
format!("{}-->{}", ref_node_id, job_node_id)
let mermaid_edge = if (input.dep_type == 1) {
&format!(" {} --> {}\n", ref_node_id, job_node.id)
} else {
format!("{}-.->{}", ref_node_id, job_node_id)
&format!(" {} -.-> {}\n", ref_node_id, job_node.id)
};
if !added_edges.contains(&edge_key) {
if input.dep_type == 1 { // MATERIALIZE = 1
mermaid.push_str(&format!(" {} --> {}\n", ref_node_id, job_node_id));
} else {
// Dashed line for query dependencies
mermaid.push_str(&format!(" {} -.-> {}\n", ref_node_id, job_node_id));
}
added_edges.insert(edge_key);
if !mermaid.contains(mermaid_edge.trim()) {
mermaid.push_str(mermaid_edge);
}
}
}
@@ -195,10 +216,9 @@ pub fn generate_mermaid_with_status(
}
// Add the edge from job to output (avoid duplicates)
let edge_key = format!("{}-->{}", job_node_id, ref_node_id);
if !added_edges.contains(&edge_key) {
mermaid.push_str(&format!(" {} --> {}\n", job_node_id, ref_node_id));
added_edges.insert(edge_key);
let mermaid_edge = &format!(" {} --> {}\n", job_node.id, ref_node_id);
if !mermaid.contains(mermaid_edge.trim()) {
mermaid.push_str(mermaid_edge);
}
}
}


@@ -1,5 +1,5 @@
load("@rules_oci//oci:defs.bzl", "oci_image", "oci_load")
load("@aspect_bazel_lib//lib:tar.bzl", "tar")
load("@rules_oci//oci:defs.bzl", "oci_image", "oci_load")
RUNFILES_PREFIX = """
# ================= BEGIN RUNFILES INIT =================
@@ -272,6 +272,7 @@ def databuild_graph(name, jobs, lookup, visibility = None):
graph_label = "//%s:%s" % (native.package_name(), name),
visibility = visibility,
)
# Build deployment targets (renamed for hierarchical namespacing)
tar(
name = "%s.build.tar" % name,
@@ -321,7 +322,6 @@ def databuild_graph(name, jobs, lookup, visibility = None):
repo_tags = ["databuild_%s_service:latest" % name],
)
# TODO there feels like a lot of boilerplate around wrapping a target with a script - can this be simplified?
def _databuild_graph_lookup_impl(ctx):
script = ctx.actions.declare_file(ctx.label.name)
@@ -375,14 +375,13 @@ def _databuild_graph_analyze_impl(ctx):
script = ctx.actions.declare_file(ctx.label.name)
config_paths = {
"//" + job.label.package + ":" +job.label.name:
"$(rlocation _main/" + job[DataBuildJobInfo].configure.files_to_run.executable.short_path + ")"
"//" + job.label.package + ":" + job.label.name: "$(rlocation _main/" + job[DataBuildJobInfo].configure.files_to_run.executable.short_path + ")"
for job in ctx.attr.jobs
}
config_paths_str = "{" + ",".join(['\\"%s\\":\\"%s\\"' % (k, v) for k, v in config_paths.items()]) + "}"
candidate_job_env_var = "'" + ",".join([
"//" + target.label.package + ":" +target.label.name
"//" + target.label.package + ":" + target.label.name
for target in ctx.attr.jobs
]) + "'"
@@ -417,7 +416,7 @@ export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path})
runfiles = ctx.runfiles(
files = [ctx.executable.lookup, ctx.executable._analyze] + configure_executables,
).merge(ctx.attr.lookup.default_runfiles).merge(ctx.attr._analyze.default_runfiles).merge(
ctx.attr._bash_runfiles.default_runfiles
ctx.attr._bash_runfiles.default_runfiles,
).merge_all([job.default_runfiles for job in ctx.attr.jobs])
# Merge runfiles from all configure targets
@@ -432,7 +431,6 @@ export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path})
),
]
_databuild_graph_analyze = rule(
implementation = _databuild_graph_analyze_impl,
attrs = {
@@ -467,14 +465,13 @@ def _databuild_graph_mermaid_impl(ctx):
script = ctx.actions.declare_file(ctx.label.name)
config_paths = {
"//" + job.label.package + ":" +job.label.name:
"$(rlocation _main/" + job[DataBuildJobInfo].configure.files_to_run.executable.short_path + ")"
"//" + job.label.package + ":" + job.label.name: "$(rlocation _main/" + job[DataBuildJobInfo].configure.files_to_run.executable.short_path + ")"
for job in ctx.attr.jobs
}
config_paths_str = "{" + ",".join(['\\"%s\\":\\"%s\\"' % (k, v) for k, v in config_paths.items()]) + "}"
candidate_job_env_var = "'" + ",".join([
"//" + target.label.package + ":" +target.label.name
"//" + target.label.package + ":" + target.label.name
for target in ctx.attr.jobs
]) + "'"
@@ -509,7 +506,7 @@ export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path})
runfiles = ctx.runfiles(
files = [ctx.executable.lookup, ctx.executable._analyze] + configure_executables,
).merge(ctx.attr.lookup.default_runfiles).merge(ctx.attr._analyze.default_runfiles).merge(
ctx.attr._bash_runfiles.default_runfiles
ctx.attr._bash_runfiles.default_runfiles,
).merge_all([job.default_runfiles for job in ctx.attr.jobs])
# Merge runfiles from all configure targets
@@ -521,7 +518,7 @@ export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path})
DefaultInfo(
executable = script,
runfiles = runfiles,
)
),
]
_databuild_graph_mermaid = rule(
@@ -580,7 +577,7 @@ def _databuild_graph_exec_impl(ctx):
runfiles = ctx.runfiles(
files = [ctx.executable._execute] + execute_executables,
).merge(ctx.attr._execute.default_runfiles).merge(
ctx.attr._bash_runfiles.default_runfiles
ctx.attr._bash_runfiles.default_runfiles,
).merge_all([job.default_runfiles for job in ctx.attr.jobs])
# Merge runfiles from all execute targets
@@ -615,7 +612,7 @@ _databuild_graph_exec = rule(
default = "@databuild//databuild/graph:execute",
executable = True,
cfg = "target",
)
),
},
executable = True,
)
@@ -641,8 +638,10 @@ def _databuild_graph_build_impl(ctx):
separator = "," if i < len(ctx.attr.jobs) - 1 else ""
candidate_jobs_script_lines.append(
'CANDIDATE_JOBS_JSON="${CANDIDATE_JOBS_JSON}\\"%s\\":\\"$(rlocation _main/%s)\\"%s"' % (
job_label, configure_path, separator
)
job_label,
configure_path,
separator,
),
)
candidate_jobs_script_lines.append('CANDIDATE_JOBS_JSON="${CANDIDATE_JOBS_JSON}}"')
candidate_jobs_script = "\n".join(candidate_jobs_script_lines)
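# Illustration (hypothetical single job //jobs:foo, configure path assumed; an earlier line is
# assumed to open CANDIDATE_JOBS_JSON with "{"): the joined script would look roughly like
#   CANDIDATE_JOBS_JSON="${CANDIDATE_JOBS_JSON}\"//jobs:foo\":\"$(rlocation _main/jobs/foo_configure)\""
#   CANDIDATE_JOBS_JSON="${CANDIDATE_JOBS_JSON}}"
# i.e. the generated shell assembles a JSON object mapping job labels to resolved configure binaries.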
@@ -663,7 +662,7 @@ export DATABUILD_BUILD_REQUEST_ID=$(python3 -c "import uuid; print(uuid.uuid4())
""" % (
candidate_jobs_script,
ctx.attr.lookup.files_to_run.executable.short_path,
ctx.attr.graph_label
ctx.attr.graph_label,
)
ctx.actions.write(
@@ -685,6 +684,7 @@ export DATABUILD_BUILD_REQUEST_ID=$(python3 -c "import uuid; print(uuid.uuid4())
# We need to find the .exec target for each job
job_name = job.label.name
exec_target_name = job_name + ".exec"
# Find the .exec target in the same package
for attr_name in dir(job):
if attr_name.endswith("_exec") or exec_target_name in attr_name:
@@ -702,13 +702,14 @@ export DATABUILD_BUILD_REQUEST_ID=$(python3 -c "import uuid; print(uuid.uuid4())
runfiles = ctx.runfiles(
files = [ctx.executable.cli_wrapper, ctx.executable.lookup] + configure_executables + execute_executables + all_job_files,
).merge(ctx.attr.cli_wrapper.default_runfiles).merge(ctx.attr.lookup.default_runfiles).merge(
ctx.attr._bash_runfiles.default_runfiles
ctx.attr._bash_runfiles.default_runfiles,
)
# Merge runfiles from all configure targets and job targets
for job in ctx.attr.jobs:
configure_target = job[DataBuildJobInfo].configure
runfiles = runfiles.merge(configure_target.default_runfiles)
# Also merge the job's own runfiles which should include the .exec target
runfiles = runfiles.merge(job.default_runfiles)
@@ -761,8 +762,7 @@ def _databuild_graph_service_impl(ctx):
# Build job configurations mapping for DATABUILD_CANDIDATE_JOBS
config_paths = {
"//" + job.label.package + ":" + job.label.name:
"$(rlocation _main/" + job[DataBuildJobInfo].configure.files_to_run.executable.short_path + ")"
"//" + job.label.package + ":" + job.label.name: "$(rlocation _main/" + job[DataBuildJobInfo].configure.files_to_run.executable.short_path + ")"
for job in ctx.attr.jobs
}
config_paths_str = "{" + ",".join(['\\"%s\\":\\"%s\\"' % (k, v) for k, v in config_paths.items()]) + "}"
@@ -833,8 +833,9 @@ fi
runfiles = ctx.runfiles(
files = [ctx.executable.lookup, ctx.executable._service, ctx.executable.analyze, ctx.executable.exec] + configure_executables + ctx.files._dashboard,
).merge(ctx.attr.lookup.default_runfiles).merge(ctx.attr._service.default_runfiles).merge(
ctx.attr.analyze.default_runfiles).merge(ctx.attr.exec.default_runfiles).merge(
ctx.attr._bash_runfiles.default_runfiles
ctx.attr.analyze.default_runfiles,
).merge(ctx.attr.exec.default_runfiles).merge(
ctx.attr._bash_runfiles.default_runfiles,
).merge_all([job.default_runfiles for job in ctx.attr.jobs])
# Merge runfiles from all configure targets


@@ -1,7 +1,7 @@
use super::*;
use crate::event_log::{current_timestamp_nanos, create_build_event};
use crate::orchestration::{BuildOrchestrator, BuildResult};
use crate::service::mermaid_utils;
use crate::mermaid_utils;
use axum::{
extract::{Path, State},
http::StatusCode,
@@ -1044,7 +1044,7 @@ pub async fn get_job_metrics(
let started_at: i64 = row[3].parse().unwrap_or(0);
let completed_at: i64 = row[4].parse().unwrap_or(started_at);
let duration_ms: Option<i64> = if completed_at > started_at {
Some((completed_at - started_at))
Some(completed_at - started_at)
} else {
None
};


@@ -17,7 +17,6 @@ use tokio::sync::RwLock;
use uuid::Uuid;
pub mod handlers;
pub mod mermaid_utils;
#[derive(Clone)]
pub struct BuildGraphService {


@@ -3,6 +3,5 @@
- On build request detail page, show aggregated job results
- Use path-based navigation instead of hashbang?
- Build event job links are not encoding job labels properly
- Get mermaid integrated into the build
- Resolve double type system with protobuf and openapi
- Prometheus metrics export