diff --git a/databuild/BUILD.bazel b/databuild/BUILD.bazel
index a808e73..40a146b 100644
--- a/databuild/BUILD.bazel
+++ b/databuild/BUILD.bazel
@@ -39,12 +39,12 @@ rust_library(
         "event_log/sqlite.rs",
         "event_log/stdout.rs",
         "lib.rs",
-        "orchestration/mod.rs",
+        "mermaid_utils.rs",
         "orchestration/error.rs",
         "orchestration/events.rs",
+        "orchestration/mod.rs",
         "service/handlers.rs",
         "service/mod.rs",
-        "service/mermaid_utils.rs",
         ":generate_databuild_rust",
     ],
     edition = "2021",
diff --git a/databuild/graph/analyze.rs b/databuild/graph/analyze.rs
index 545d7cc..18c8aa7 100644
--- a/databuild/graph/analyze.rs
+++ b/databuild/graph/analyze.rs
@@ -9,6 +9,7 @@ use clap::{Arg, Command as ClapCommand};
 use uuid::Uuid;
 use databuild::*;
 use databuild::event_log::{BuildEventLog, create_build_event_log, create_build_event};
+use databuild::mermaid_utils::generate_mermaid_diagram;
 
 // Configure a job to produce the desired outputs
 fn configure(job_label: &str, output_refs: &[String]) -> Result, String> {
@@ -384,120 +385,120 @@ async fn plan(
 }
 
 // Generate a Mermaid flowchart diagram from a job graph
-fn generate_mermaid_diagram(graph: &JobGraph) -> String {
-    // Start the mermaid flowchart
-    let mut mermaid = String::from("flowchart TD\n");
-
-    // Track nodes we've already added to avoid duplicates
-    let mut added_nodes = HashSet::new();
-    let mut added_refs = HashSet::new();
-
-    // Map to track which refs are outputs (to highlight them)
-    let mut is_output_ref = HashSet::new();
-    for ref_str in &graph.outputs {
-        is_output_ref.insert(ref_str.str.clone());
-    }
-
-    // Process each task in the graph
-    for task in &graph.nodes {
-        // Create a unique ID for this job+outputs combination
-        let outputs_strs: Vec<String> = task.config.as_ref().unwrap().outputs.iter().map(|o| o.str.clone()).collect();
-        let outputs_key = outputs_strs.join("_");
-        let mut job_node_id = format!("job_{}", task.job.as_ref().unwrap().label.replace("//", "_"));
-        job_node_id = job_node_id.replace(":", "_").replace("=", "_").replace("?", "_").replace(" ", "_");
-        job_node_id = format!("{}_{}", job_node_id, outputs_key.replace("/", "_").replace("=", "_"));
-
-        // Create a descriptive label that includes both job label and outputs
-        let job_label = &task.job.as_ref().unwrap().label;
-        let outputs_label = if !task.config.as_ref().unwrap().outputs.is_empty() {
-            if task.config.as_ref().unwrap().outputs.len() == 1 {
-                format!(" [{}]", task.config.as_ref().unwrap().outputs[0].str)
-            } else {
-                format!(" [{}, ...]", task.config.as_ref().unwrap().outputs[0].str)
-            }
-        } else {
-            String::new()
-        };
-
-        // Add the job node if not already added
-        if !added_nodes.contains(&job_node_id) {
-            // Represent job as a process shape with escaped label
-            mermaid.push_str(&format!(
-                "    {}[\"`**{}** {}`\"]:::job\n",
-                job_node_id,
-                job_label,
-                outputs_label
-            ));
-            added_nodes.insert(job_node_id.clone());
-        }
-
-        // Process inputs (dependencies)
-        for input in &task.config.as_ref().unwrap().inputs {
-            let ref_node_id = format!("ref_{}", input.partition_ref.as_ref().unwrap().str.replace("/", "_").replace("=", "_"));
-
-            // Add the partition ref node if not already added
-            if !added_refs.contains(&ref_node_id) {
-                let node_class = if is_output_ref.contains(&input.partition_ref.as_ref().unwrap().str) {
-                    "outputPartition"
-                } else {
-                    "partition"
-                };
-
-                // Represent partition as a cylinder
-                mermaid.push_str(&format!(
-                    "    {}[(\"{}\")]:::{}\n",
-                    ref_node_id,
-                    input.partition_ref.as_ref().unwrap().str.replace("/", "_").replace("=", "_"),
-                    node_class
-                ));
-                added_refs.insert(ref_node_id.clone());
-            }
-
-            // Add the edge from input to job
-            if input.dep_type == 1 { // MATERIALIZE = 1
-                // Solid line for materialize dependencies
-                mermaid.push_str(&format!("    {} --> {}\n", ref_node_id, job_node_id));
-            } else {
-                // Dashed line for query dependencies
-                mermaid.push_str(&format!("    {} -.-> {}\n", ref_node_id, job_node_id));
-            }
-        }
-
-        // Process outputs
-        for output in &task.config.as_ref().unwrap().outputs {
-            let ref_node_id = format!("ref_{}", output.str.replace("/", "_").replace("=", "_"));
-
-            // Add the partition ref node if not already added
-            if !added_refs.contains(&ref_node_id) {
-                let node_class = if is_output_ref.contains(&output.str) {
-                    "outputPartition"
-                } else {
-                    "partition"
-                };
-
-                // Represent partition as a cylinder
-                mermaid.push_str(&format!(
-                    "    {}[(\"Partition: {}\")]:::{}\n",
-                    ref_node_id,
-                    output.str,
-                    node_class
-                ));
-                added_refs.insert(ref_node_id.clone());
-            }
-
-            // Add the edge from job to output
-            mermaid.push_str(&format!("    {} --> {}\n", job_node_id, ref_node_id));
-        }
-    }
-
-    // Add styling
-    mermaid.push_str("\n    %% Styling\n");
-    mermaid.push_str("    classDef job fill:#f9f,stroke:#333,stroke-width:1px;\n");
-    mermaid.push_str("    classDef partition fill:#bbf,stroke:#333,stroke-width:1px;\n");
-    mermaid.push_str("    classDef outputPartition fill:#bfb,stroke:#333,stroke-width:2px;\n");
-
-    mermaid
-}
+// fn generate_mermaid_diagram(graph: &JobGraph) -> String {
+//     // Start the mermaid flowchart
+//     let mut mermaid = String::from("flowchart TD\n");
+//
+//     // Track nodes we've already added to avoid duplicates
+//     let mut added_nodes = HashSet::new();
+//     let mut added_refs = HashSet::new();
+//
+//     // Map to track which refs are outputs (to highlight them)
+//     let mut is_output_ref = HashSet::new();
+//     for ref_str in &graph.outputs {
+//         is_output_ref.insert(ref_str.str.clone());
+//     }
+//
+//     // Process each task in the graph
+//     for task in &graph.nodes {
+//         // Create a unique ID for this job+outputs combination
+//         let outputs_strs: Vec<String> = task.config.as_ref().unwrap().outputs.iter().map(|o| o.str.clone()).collect();
+//         let outputs_key = outputs_strs.join("_");
+//         let mut job_node_id = format!("job_{}", task.job.as_ref().unwrap().label.replace("//", "_"));
+//         job_node_id = job_node_id.replace(":", "_").replace("=", "_").replace("?", "_").replace(" ", "_");
+//         job_node_id = format!("{}_{}", job_node_id, outputs_key.replace("/", "_").replace("=", "_"));
+//
+//         // Create a descriptive label that includes both job label and outputs
+//         let job_label = &task.job.as_ref().unwrap().label;
+//         let outputs_label = if !task.config.as_ref().unwrap().outputs.is_empty() {
+//             if task.config.as_ref().unwrap().outputs.len() == 1 {
+//                 format!(" [{}]", task.config.as_ref().unwrap().outputs[0].str)
+//             } else {
+//                 format!(" [{}, ...]", task.config.as_ref().unwrap().outputs[0].str)
+//             }
+//         } else {
+//             String::new()
+//         };
+//
+//         // Add the job node if not already added
+//         if !added_nodes.contains(&job_node_id) {
+//             // Represent job as a process shape with escaped label
+//             mermaid.push_str(&format!(
+//                 "    {}[\"`**{}** {}`\"]:::job\n",
+//                 job_node_id,
+//                 job_label,
+//                 outputs_label
+//             ));
+//             added_nodes.insert(job_node_id.clone());
+//         }
+//
+//         // Process inputs (dependencies)
+//         for input in &task.config.as_ref().unwrap().inputs {
+//             let ref_node_id = format!("ref_{}", input.partition_ref.as_ref().unwrap().str.replace("/", "_").replace("=", "_"));
+//
+//             // Add the partition ref node if not already added
+//             if !added_refs.contains(&ref_node_id) {
+//                 let node_class = if is_output_ref.contains(&input.partition_ref.as_ref().unwrap().str) {
+//                     "outputPartition"
+//                 } else {
+//                     "partition"
+//                 };
+//
+//                 // Represent partition as a cylinder
+//                 mermaid.push_str(&format!(
+//                     "    {}[(\"{}\")]:::{}\n",
+//                     ref_node_id,
+//                     input.partition_ref.as_ref().unwrap().str.replace("/", "_").replace("=", "_"),
+//                     node_class
+//                 ));
+//                 added_refs.insert(ref_node_id.clone());
+//             }
+//
+//             // Add the edge from input to job
+//             if input.dep_type == 1 { // MATERIALIZE = 1
+//                 // Solid line for materialize dependencies
+//                 mermaid.push_str(&format!("    {} --> {}\n", ref_node_id, job_node_id));
+//             } else {
+//                 // Dashed line for query dependencies
+//                 mermaid.push_str(&format!("    {} -.-> {}\n", ref_node_id, job_node_id));
+//             }
+//         }
+//
+//         // Process outputs
+//         for output in &task.config.as_ref().unwrap().outputs {
+//             let ref_node_id = format!("ref_{}", output.str.replace("/", "_").replace("=", "_"));
+//
+//             // Add the partition ref node if not already added
+//             if !added_refs.contains(&ref_node_id) {
+//                 let node_class = if is_output_ref.contains(&output.str) {
+//                     "outputPartition"
+//                 } else {
+//                     "partition"
+//                 };
+//
+//                 // Represent partition as a cylinder
+//                 mermaid.push_str(&format!(
+//                     "    {}[(\"Partition: {}\")]:::{}\n",
+//                     ref_node_id,
+//                     output.str,
+//                     node_class
+//                 ));
+//                 added_refs.insert(ref_node_id.clone());
+//             }
+//
+//             // Add the edge from job to output
+//             mermaid.push_str(&format!("    {} --> {}\n", job_node_id, ref_node_id));
+//         }
+//     }
+//
+//     // Add styling
+//     mermaid.push_str("\n    %% Styling\n");
+//     mermaid.push_str("    classDef job fill:#f9f,stroke:#333,stroke-width:1px;\n");
+//     mermaid.push_str("    classDef partition fill:#bbf,stroke:#333,stroke-width:1px;\n");
+//     mermaid.push_str("    classDef outputPartition fill:#bfb,stroke:#333,stroke-width:2px;\n");
+//
+//     mermaid
+// }
 
 #[tokio::main]
 async fn main() {
diff --git a/databuild/lib.rs b/databuild/lib.rs
index 3c6ace4..e901799 100644
--- a/databuild/lib.rs
+++ b/databuild/lib.rs
@@ -10,6 +10,8 @@ pub mod orchestration;
 // Service module
 pub mod service;
 
+pub mod mermaid_utils;
+
 // Re-export commonly used types from event_log
 pub use event_log::{BuildEventLog, BuildEventLogError, create_build_event_log};
 
diff --git a/databuild/service/mermaid_utils.rs b/databuild/mermaid_utils.rs
similarity index 78%
rename from databuild/service/mermaid_utils.rs
rename to databuild/mermaid_utils.rs
index 79f9d2a..7e78812 100644
--- a/databuild/service/mermaid_utils.rs
+++ b/databuild/mermaid_utils.rs
@@ -41,6 +41,7 @@ pub fn extract_status_map(events: &[BuildEvent]) -> (HashMap<String, NodeStatus>
     for event in sorted_events {
         match &event.event_type {
+            // TODO map this to a job + outputs hash so that job status highlighting is correct
             Some(crate::build_event::EventType::JobEvent(job_event)) => {
                 if let Some(job_label) = &job_event.job_label {
                     let status = match job_event.status {
@@ -76,6 +77,57 @@ pub fn extract_status_map(events: &[BuildEvent]) -> (HashMap<String, NodeStatus>
     (job_statuses, partition_statuses)
 }
 
+/// Encodes ID for safe usage in mermaid graph
+fn encode_id(id: &str) -> String {
+    id.replace("/", "_").replace("=", "_").replace(":", "_")
+}
+
+struct MermaidJobNode {
+    task: Task,
+    id: String,
+    label: String,
+}
+
+impl MermaidJobNode {
+    fn from(task: &Task) -> Option<Self> {
+        let job_label: String = match &task.job {
+            Some(job) => job.label.clone(),
+            None => return None,
+        };
+
+        let outputs_label: String = match &task.config {
+            Some(config) => config.outputs.iter()
+                .map(|o| o.str.clone())
+                .collect::<Vec<String>>()
+                .join("___"),
+            None => String::new(),
+        };
+
+        let id = encode_id(&(job_label.clone() + "___" + &outputs_label));
+        let label = format!("**{}** {}", job_label, outputs_label);
+
+        Some(MermaidJobNode {
+            task: task.clone(),
+            id,
+            label,
+        })
+    }
+
+    fn to_mermaid(&self, job_statuses: &HashMap<String, NodeStatus>) -> String {
+        let status = job_statuses.get(&self.task.job.as_ref().unwrap().label).unwrap_or(&NodeStatus::Pending);
+        format!("    {}[\"{}\"]:::job_{}\n", self.id, self.label, status.css_class())
+    }
+}
+
+struct MermaidPartitionNode {
+    id: String,
+    label: String,
+}
+
+pub fn generate_mermaid_diagram(graph: &JobGraph) -> String {
+    generate_mermaid_with_status(graph, &[])
+}
+
 /// Generate a mermaid diagram for a job graph with current status annotations
 pub fn generate_mermaid_with_status(
     graph: &JobGraph,
@@ -89,7 +141,6 @@ pub fn generate_mermaid_with_status(
     // Track nodes we've already added to avoid duplicates
     let mut added_nodes = HashSet::new();
     let mut added_refs = HashSet::new();
-    let mut added_edges = HashSet::new();
 
     // Map to track which refs are outputs (to highlight them)
     let mut is_output_ref = HashSet::new();
@@ -99,34 +150,11 @@ pub fn generate_mermaid_with_status(
 
     // Add all task nodes and their relationships
     for task in &graph.nodes {
-        let job_label = match &task.job {
-            Some(label) => &label.label,
-            None => continue,
-        };
-
-        let job_node_id = format!("job_{}", job_label.replace("/", "_").replace(":", "_"));
-
+        let job_node = MermaidJobNode::from(task).unwrap();
         // Only add the job node once
-        if !added_nodes.contains(&job_node_id) {
-            let outputs_label = match &task.config {
-                Some(config) => config.outputs.iter()
-                    .map(|o| o.str.clone())
-                    .collect::<Vec<String>>()
-                    .join(", "),
-                None => String::new(),
-            };
-
-            // Get the job status
-            let status = job_statuses.get(job_label).unwrap_or(&NodeStatus::Pending);
-
-            mermaid.push_str(&format!(
-                "    {}[\"`**{}** {}`\"]:::job_{}\n",
-                job_node_id,
-                job_label,
-                outputs_label,
-                status.css_class()
-            ));
-            added_nodes.insert(job_node_id.clone());
+        if !added_nodes.contains(&job_node.id) {
+            mermaid.push_str(&job_node.to_mermaid(&job_statuses));
+            added_nodes.insert(job_node.id.clone());
         }
 
         // Process inputs (dependencies)
@@ -152,22 +180,15 @@ pub fn generate_mermaid_with_status(
                 ));
                 added_refs.insert(ref_node_id.clone());
             }
-
-            // Add the edge from input to job (avoid duplicates)
-            let edge_key = if input.dep_type == 1 { // MATERIALIZE = 1
-                format!("{}-->{}", ref_node_id, job_node_id)
+
+            let mermaid_edge = if input.dep_type == 1 { // MATERIALIZE = 1
+                format!("    {} --> {}\n", ref_node_id, job_node.id)
             } else {
-                format!("{}-.->{}", ref_node_id, job_node_id)
+                format!("    {} -.-> {}\n", ref_node_id, job_node.id)
             };
-
-            if !added_edges.contains(&edge_key) {
-                if input.dep_type == 1 { // MATERIALIZE = 1
-                    mermaid.push_str(&format!("    {} --> {}\n", ref_node_id, job_node_id));
-                } else {
-                    // Dashed line for query dependencies
-                    mermaid.push_str(&format!("    {} -.-> {}\n", ref_node_id, job_node_id));
-                }
-                added_edges.insert(edge_key);
+
+            if !mermaid.contains(mermaid_edge.trim()) {
+                mermaid.push_str(&mermaid_edge);
             }
         }
     }
@@ -195,10 +216,9 @@ pub fn generate_mermaid_with_status(
             }
 
             // Add the edge from job to output (avoid duplicates)
-            let edge_key = format!("{}-->{}", job_node_id, ref_node_id);
-            if !added_edges.contains(&edge_key) {
-                mermaid.push_str(&format!("    {} --> {}\n", job_node_id, ref_node_id));
-                added_edges.insert(edge_key);
+            let mermaid_edge = format!("    {} --> {}\n", job_node.id, ref_node_id);
+            if !mermaid.contains(mermaid_edge.trim()) {
+                mermaid.push_str(&mermaid_edge);
             }
         }
     }
diff --git a/databuild/rules.bzl b/databuild/rules.bzl
index 605264f..70b9fe8 100644
--- a/databuild/rules.bzl
+++ b/databuild/rules.bzl
@@ -1,5 +1,5 @@
-load("@rules_oci//oci:defs.bzl", "oci_image", "oci_load")
 load("@aspect_bazel_lib//lib:tar.bzl", "tar")
+load("@rules_oci//oci:defs.bzl", "oci_image", "oci_load")
 
 RUNFILES_PREFIX = """
 # ================= BEGIN RUNFILES INIT =================
@@ -233,13 +233,13 @@ _databuild_job_rule = rule(
 
 def databuild_graph(name, jobs, lookup, visibility = None):
     """Creates a databuild graph target.
-    
+
     Args:
         name: Name of the graph target
         jobs: List of job targets
         lookup: Job lookup binary
         visibility: Visibility specification
-    
+
     Note: Build event logging is configured via the DATABUILD_BUILD_EVENT_LOG environment variable.
     """
     _databuild_graph_lookup(
@@ -272,6 +272,7 @@ def databuild_graph(name, jobs, lookup, visibility = None):
         graph_label = "//%s:%s" % (native.package_name(), name),
         visibility = visibility,
     )
+
     # Build deployment targets (renamed for hierarchical namespacing)
     tar(
         name = "%s.build.tar" % name,
@@ -291,7 +292,7 @@ def databuild_graph(name, jobs, lookup, visibility = None):
         visibility = visibility,
         repo_tags = ["databuild_%s_build:latest" % name],
     )
-    
+
     # Service targets
     _databuild_graph_service(
         name = "%s.service" % name,
@@ -321,7 +322,6 @@ def databuild_graph(name, jobs, lookup, visibility = None):
         repo_tags = ["databuild_%s_service:latest" % name],
     )
 
-
 # TODO there feels like a lot of boilerplate around wrapping a target with a script - can this be simplified?
 def _databuild_graph_lookup_impl(ctx):
     script = ctx.actions.declare_file(ctx.label.name)
@@ -375,14 +375,13 @@ def _databuild_graph_analyze_impl(ctx):
     script = ctx.actions.declare_file(ctx.label.name)
 
     config_paths = {
-        "//" + job.label.package + ":" +job.label.name:
-            "$(rlocation _main/" + job[DataBuildJobInfo].configure.files_to_run.executable.short_path + ")"
+        "//" + job.label.package + ":" + job.label.name: "$(rlocation _main/" + job[DataBuildJobInfo].configure.files_to_run.executable.short_path + ")"
         for job in ctx.attr.jobs
     }
     config_paths_str = "{" + ",".join(['\\"%s\\":\\"%s\\"' % (k, v) for k, v in config_paths.items()]) + "}"
 
     candidate_job_env_var = "'" + ",".join([
-        "//" + target.label.package + ":" +target.label.name
+        "//" + target.label.package + ":" + target.label.name
        for target in ctx.attr.jobs
     ]) + "'"
 
@@ -417,7 +416,7 @@ export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path})
     runfiles = ctx.runfiles(
         files = [ctx.executable.lookup, ctx.executable._analyze] + configure_executables,
     ).merge(ctx.attr.lookup.default_runfiles).merge(ctx.attr._analyze.default_runfiles).merge(
-        ctx.attr._bash_runfiles.default_runfiles
+        ctx.attr._bash_runfiles.default_runfiles,
     ).merge_all([job.default_runfiles for job in ctx.attr.jobs])
 
     # Merge runfiles from all configure targets
@@ -432,7 +431,6 @@ export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path})
         ),
     ]
 
-
 _databuild_graph_analyze = rule(
     implementation = _databuild_graph_analyze_impl,
     attrs = {
@@ -467,14 +465,13 @@ def _databuild_graph_mermaid_impl(ctx):
     script = ctx.actions.declare_file(ctx.label.name)
 
     config_paths = {
-        "//" + job.label.package + ":" +job.label.name:
-            "$(rlocation _main/" + job[DataBuildJobInfo].configure.files_to_run.executable.short_path + ")"
+        "//" + job.label.package + ":" + job.label.name: "$(rlocation _main/" + job[DataBuildJobInfo].configure.files_to_run.executable.short_path + ")"
         for job in ctx.attr.jobs
     }
     config_paths_str = "{" + ",".join(['\\"%s\\":\\"%s\\"' % (k, v) for k, v in config_paths.items()]) + "}"
 
     candidate_job_env_var = "'" + ",".join([
-        "//" + target.label.package + ":" +target.label.name
+        "//" + target.label.package + ":" + target.label.name
         for target in ctx.attr.jobs
     ]) + "'"
 
@@ -509,7 +506,7 @@ export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path})
     runfiles = ctx.runfiles(
         files = [ctx.executable.lookup, ctx.executable._analyze] + configure_executables,
     ).merge(ctx.attr.lookup.default_runfiles).merge(ctx.attr._analyze.default_runfiles).merge(
-        ctx.attr._bash_runfiles.default_runfiles
+        ctx.attr._bash_runfiles.default_runfiles,
     ).merge_all([job.default_runfiles for job in ctx.attr.jobs])
 
     # Merge runfiles from all configure targets
@@ -521,7 +518,7 @@ export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path})
         DefaultInfo(
             executable = script,
             runfiles = runfiles,
-        )
+        ),
     ]
 
 _databuild_graph_mermaid = rule(
@@ -580,7 +577,7 @@ def _databuild_graph_exec_impl(ctx):
     runfiles = ctx.runfiles(
         files = [ctx.executable._execute] + execute_executables,
     ).merge(ctx.attr._execute.default_runfiles).merge(
-        ctx.attr._bash_runfiles.default_runfiles
+        ctx.attr._bash_runfiles.default_runfiles,
     ).merge_all([job.default_runfiles for job in ctx.attr.jobs])
 
     # Merge runfiles from all execute targets
@@ -615,7 +612,7 @@ _databuild_graph_exec = rule(
             default = "@databuild//databuild/graph:execute",
             executable = True,
             cfg = "target",
-        )
+        ),
     },
     executable = True,
 )
@@ -632,21 +629,23 @@ DataBuildGraphInfo = provider(
 def _databuild_graph_build_impl(ctx):
     """Wraps the DataBuild CLI wrapper in a shell script."""
     script = ctx.actions.declare_file(ctx.label.name)
-    
+
     # Build DATABUILD_CANDIDATE_JOBS JSON string with runtime rlocation resolution
     candidate_jobs_script_lines = ["CANDIDATE_JOBS_JSON=\"{\""]
     for i, job in enumerate(ctx.attr.jobs):
-        job_label = "//" + job.label.package + ":" + job.label.name 
+        job_label = "//" + job.label.package + ":" + job.label.name
         configure_path = job[DataBuildJobInfo].configure.files_to_run.executable.short_path
         separator = "," if i < len(ctx.attr.jobs) - 1 else ""
         candidate_jobs_script_lines.append(
             'CANDIDATE_JOBS_JSON="${CANDIDATE_JOBS_JSON}\\"%s\\":\\"$(rlocation _main/%s)\\"%s"' % (
-                job_label, configure_path, separator
-            )
+                job_label,
+                configure_path,
+                separator,
+            ),
         )
     candidate_jobs_script_lines.append('CANDIDATE_JOBS_JSON="${CANDIDATE_JOBS_JSON}}"')
     candidate_jobs_script = "\n".join(candidate_jobs_script_lines)
-    
+
     script_content = RUNFILES_PREFIX + """
 # Build DATABUILD_CANDIDATE_JOBS dynamically with proper rlocation resolution
 %s
@@ -663,21 +662,21 @@ export DATABUILD_BUILD_REQUEST_ID=$(python3 -c "import uuid; print(uuid.uuid4())
 """ % (
         candidate_jobs_script,
         ctx.attr.lookup.files_to_run.executable.short_path,
-        ctx.attr.graph_label
+        ctx.attr.graph_label,
     )
-    
+
     ctx.actions.write(
         output = script,
         is_executable = True,
         content = script_content,
     )
 
-    # Gather the configure and execute executables 
+    # Gather the configure and execute executables
     configure_executables = [
         job[DataBuildJobInfo].configure.files_to_run.executable
         for job in ctx.attr.jobs
     ]
-    
+
     # Get the execute targets - these are the .exec files that need to be in runfiles
     execute_executables = []
     for job in ctx.attr.jobs:
@@ -685,6 +684,7 @@ export DATABUILD_BUILD_REQUEST_ID=$(python3 -c "import uuid; print(uuid.uuid4())
         # We need to find the .exec target for each job
         job_name = job.label.name
         exec_target_name = job_name + ".exec"
+
         # Find the .exec target in the same package
         for attr_name in dir(job):
             if attr_name.endswith("_exec") or exec_target_name in attr_name:
@@ -692,26 +692,27 @@ export DATABUILD_BUILD_REQUEST_ID=$(python3 -c "import uuid; print(uuid.uuid4())
                 if exec_target and hasattr(exec_target, "files_to_run"):
                     execute_executables.append(exec_target.files_to_run.executable)
                     break
-    
+
     # Also check if we can access exec targets directly from job dependencies
     all_job_files = []
     for job in ctx.attr.jobs:
         if hasattr(job, "default_runfiles") and job.default_runfiles:
             all_job_files.extend(job.default_runfiles.files.to_list())
-    
+
     runfiles = ctx.runfiles(
         files = [ctx.executable.cli_wrapper, ctx.executable.lookup] + configure_executables + execute_executables + all_job_files,
     ).merge(ctx.attr.cli_wrapper.default_runfiles).merge(ctx.attr.lookup.default_runfiles).merge(
-        ctx.attr._bash_runfiles.default_runfiles
+        ctx.attr._bash_runfiles.default_runfiles,
     )
-    
+
     # Merge runfiles from all configure targets and job targets
     for job in ctx.attr.jobs:
         configure_target = job[DataBuildJobInfo].configure
         runfiles = runfiles.merge(configure_target.default_runfiles)
+
         # Also merge the job's own runfiles which should include the .exec target
         runfiles = runfiles.merge(job.default_runfiles)
-    
+
     return [
         DefaultInfo(
             executable = script,
@@ -758,19 +759,18 @@ _databuild_graph_build = rule(
 
 def _databuild_graph_service_impl(ctx):
     """Implementation of the service target that runs the Build Graph Service."""
     script = ctx.actions.declare_file(ctx.label.name)
-    
+
     # Build job configurations mapping for DATABUILD_CANDIDATE_JOBS
     config_paths = {
-        "//" + job.label.package + ":" + job.label.name:
-            "$(rlocation _main/" + job[DataBuildJobInfo].configure.files_to_run.executable.short_path + ")"
+        "//" + job.label.package + ":" + job.label.name: "$(rlocation _main/" + job[DataBuildJobInfo].configure.files_to_run.executable.short_path + ")"
         for job in ctx.attr.jobs
     }
     config_paths_str = "{" + ",".join(['\\"%s\\":\\"%s\\"' % (k, v) for k, v in config_paths.items()]) + "}"
-    
+
     # Default service configuration
     default_port = "8080"
     default_db = "sqlite:///tmp/%s.db" % ctx.label.name.replace(".", "_")
-    
+
     env_setup = """
 export DATABUILD_CANDIDATE_JOBS="{candidate_jobs}"
 export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path})
@@ -782,8 +782,8 @@ export DATABUILD_EXECUTE_BINARY=$(rlocation _main/{exec_path})
         analyze_path = ctx.attr.analyze.files_to_run.executable.short_path,
         exec_path = ctx.attr.exec.files_to_run.executable.short_path,
     )
-    
-    # Generate a custom script instead of using the template to handle the external binary correctly 
+
+    # Generate a custom script instead of using the template to handle the external binary correctly
     script_content = RUNFILES_PREFIX + env_setup + """
 EXECUTABLE_BINARY="$(rlocation "databuild+/databuild/build_graph_service")"
@@ -817,31 +817,32 @@ fi
         db = default_db,
         port = default_port,
     )
-    
+
     ctx.actions.write(
         output = script,
         content = script_content,
         is_executable = True,
     )
-    
+
     # Gather all dependencies for runfiles
     configure_executables = [
         job[DataBuildJobInfo].configure.files_to_run.executable
         for job in ctx.attr.jobs
     ]
-    
+
     runfiles = ctx.runfiles(
         files = [ctx.executable.lookup, ctx.executable._service, ctx.executable.analyze, ctx.executable.exec] + configure_executables + ctx.files._dashboard,
     ).merge(ctx.attr.lookup.default_runfiles).merge(ctx.attr._service.default_runfiles).merge(
-        ctx.attr.analyze.default_runfiles).merge(ctx.attr.exec.default_runfiles).merge(
-        ctx.attr._bash_runfiles.default_runfiles
+        ctx.attr.analyze.default_runfiles,
+    ).merge(ctx.attr.exec.default_runfiles).merge(
+        ctx.attr._bash_runfiles.default_runfiles,
     ).merge_all([job.default_runfiles for job in ctx.attr.jobs])
-    
+
     # Merge runfiles from all configure targets
     for job in ctx.attr.jobs:
         configure_target = job[DataBuildJobInfo].configure
         runfiles = runfiles.merge(configure_target.default_runfiles)
-    
+
     return [
         DefaultInfo(
             executable = script,
@@ -869,7 +870,7 @@ _databuild_graph_service = rule(
             cfg = "target",
         ),
         "exec": attr.label(
-            doc = "Target that implements the graph execution logic", 
+            doc = "Target that implements the graph execution logic",
             mandatory = True,
             executable = True,
             cfg = "target",
diff --git a/databuild/service/handlers.rs b/databuild/service/handlers.rs
index 6c3d387..27cb1ac 100644
--- a/databuild/service/handlers.rs
+++ b/databuild/service/handlers.rs
@@ -1,7 +1,7 @@
 use super::*;
 use crate::event_log::{current_timestamp_nanos, create_build_event};
 use crate::orchestration::{BuildOrchestrator, BuildResult};
-use crate::service::mermaid_utils;
+use crate::mermaid_utils;
 use axum::{
     extract::{Path, State},
     http::StatusCode,
@@ -1044,7 +1044,7 @@ pub async fn get_job_metrics(
             let started_at: i64 = row[3].parse().unwrap_or(0);
             let completed_at: i64 = row[4].parse().unwrap_or(started_at);
             let duration_ms: Option<i64> = if completed_at > started_at {
-                Some((completed_at - started_at))
+                Some(completed_at - started_at)
             } else {
                 None
             };
diff --git a/databuild/service/mod.rs b/databuild/service/mod.rs
index 6d7f5f6..2060feb 100644
--- a/databuild/service/mod.rs
+++ b/databuild/service/mod.rs
@@ -17,7 +17,6 @@ use tokio::sync::RwLock;
 use uuid::Uuid;
 
 pub mod handlers;
-pub mod mermaid_utils;
 
 #[derive(Clone)]
 pub struct BuildGraphService {
diff --git a/plans/todo.md b/plans/todo.md
index 36ca87c..f63abd5 100644
--- a/plans/todo.md
+++ b/plans/todo.md
@@ -3,6 +3,5 @@
 - On build request detail page, show aggregated job results
 - Use path based navigation instead of hashbang?
 - Build event job links are not encoding job labels properly
-- Get mermaid integrated into the build
 - Resolve double type system with protobuf and openapi
-
+- Prometheus metrics export
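
Note on the refactor above: `encode_id` plus the `"___"` join in `MermaidJobNode::from` is what gives each job+outputs combination a distinct, Mermaid-safe node ID, the property the new `// TODO map this to a job + outputs hash` comment is pushing toward for status highlighting as well. Below is a minimal standalone sketch of that ID scheme; `encode_id` is restated here purely for illustration (it is private to `databuild/mermaid_utils.rs`), and the job label and partition ref are invented, not taken from the repo.

```rust
// Sketch only: mirrors encode_id() and the ID construction in
// MermaidJobNode::from() from databuild/mermaid_utils.rs.
fn encode_id(id: &str) -> String {
    id.replace("/", "_").replace("=", "_").replace(":", "_")
}

fn main() {
    // Hypothetical job label and output partition (not from the repo).
    let job_label = "//jobs:daily_rollup";
    let outputs = ["data/date=2024-01-01"];

    // Same composition as MermaidJobNode::from: label + "___" + joined outputs.
    let id = encode_id(&format!("{}___{}", job_label, outputs.join("___")));
    assert_eq!(id, "__jobs_daily_rollup___data_date_2024-01-01");

    // Two tasks that run the same job over different partitions now get
    // distinct Mermaid node IDs, so their statuses can be styled independently.
    println!("{}", id);
}
```

This also explains the shape of the emitted node lines, e.g. `jobid["label"]:::job_running`: the sanitized ID carries identity, while the human-readable label and the `job_<status>` CSS class carry presentation.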