From 52e4d2e9d464b568e93ecf57a0aeb48c55287556 Mon Sep 17 00:00:00 2001
From: Stuart Axelbrooke
Date: Fri, 25 Apr 2025 13:20:55 -0700
Subject: [PATCH] mermaid wip

---
 graph/analyze.go | 132 ++++++++++++++++++++++++++++++++++++++++-------
 rules.bzl        |  92 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 204 insertions(+), 20 deletions(-)

diff --git a/graph/analyze.go b/graph/analyze.go
index 59f3f4c..4aa6cae 100644
--- a/graph/analyze.go
+++ b/graph/analyze.go
@@ -62,28 +62,28 @@ func jobLabelToCfgPath(jobLabel string) string {
 
 // configure configures the specified job to produce the desired outputs
 func configure(jobLabel string, outputRefs []string) ([]Task, error) {
-    candidateJobsStr := os.Getenv("DATABUILD_CANDIDATE_JOBS")
-    var jobPathMap map[string]string
-    if err := json.Unmarshal([]byte(candidateJobsStr), &jobPathMap); err != nil {
-        return nil, fmt.Errorf("failed to parse DATABUILD_CANDIDATE_JOBS: %v", err)
-    }
+	candidateJobsStr := os.Getenv("DATABUILD_CANDIDATE_JOBS")
+	var jobPathMap map[string]string
+	if err := json.Unmarshal([]byte(candidateJobsStr), &jobPathMap); err != nil {
+		return nil, fmt.Errorf("failed to parse DATABUILD_CANDIDATE_JOBS: %v", err)
+	}
 
-    // Look up the executable path for this job
-    execPath, ok := jobPathMap[jobLabel]
-    if !ok {
-        return nil, fmt.Errorf("job %s is not a candidate job", jobLabel)
-    }
+	// Look up the executable path for this job
+	execPath, ok := jobPathMap[jobLabel]
+	if !ok {
+		return nil, fmt.Errorf("job %s is not a candidate job", jobLabel)
+	}
 
-    // Check if executable exists
-    if _, err := os.Stat(execPath); err != nil {
-        if os.IsNotExist(err) {
-            return nil, fmt.Errorf("executable not found at path: %s", execPath)
-        }
-        return nil, fmt.Errorf("error checking executable: %v", err)
-    }
+	// Check if executable exists
+	if _, err := os.Stat(execPath); err != nil {
+		if os.IsNotExist(err) {
+			return nil, fmt.Errorf("executable not found at path: %s", execPath)
+		}
+		return nil, fmt.Errorf("error checking executable: %v", err)
+	}
 
 	log.Printf("Executing job configuration: %s %v", execPath, outputRefs)
-    cmd := exec.Command(execPath, outputRefs...)
+	cmd := exec.Command(execPath, outputRefs...)
 
 	var stdout, stderr strings.Builder
 	cmd.Stdout = &stdout
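A note on the env contract exercised in the hunk above: configure() unmarshals DATABUILD_CANDIDATE_JOBS as a JSON object mapping job labels to their configure executables (a map[string]string), so whatever invokes the analyzer must export a value shaped like the following. The label and path here are illustrative only, not from this repo:

    export DATABUILD_CANDIDATE_JOBS='{"//jobs:aggregate":"bazel-bin/jobs/aggregate_configure"}'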
@@ -155,7 +155,7 @@ func configureParallel(jobRefs map[string][]string, numWorkers int) ([]Task, err
 	tasksChan := make(chan []Task, len(jobRefs))
 	errorChan := make(chan error, len(jobRefs))
 	jobsChan := make(chan struct {
-		jobLabel string
+		jobLabel     string
 		producedRefs []string
 	}, len(jobRefs))
 
@@ -166,7 +166,7 @@ func configureParallel(jobRefs map[string][]string, numWorkers int) ([]Task, err
 	// Fill the jobs channel
 	for jobLabel, producedRefs := range jobRefs {
 		jobsChan <- struct {
-			jobLabel string
+			jobLabel     string
 			producedRefs []string
 		}{jobLabel, producedRefs}
 	}
@@ -319,6 +319,85 @@ func plan(outputRefs []string) (*JobGraph, error) {
 	}
 }
 
+// generateMermaidDiagram generates a Mermaid flowchart diagram from a job graph
+func generateMermaidDiagram(graph *JobGraph) string {
+	// Start the mermaid flowchart
+	mermaid := "flowchart TD\n"
+
+	// Track nodes we've already added to avoid duplicates
+	addedNodes := make(map[string]bool)
+	addedRefs := make(map[string]bool)
+
+	// Map to track which refs are outputs (to highlight them)
+	isOutputRef := make(map[string]bool)
+	for _, ref := range graph.Outputs {
+		isOutputRef[ref] = true
+	}
+
+	// Process each task in the graph
+	for _, task := range graph.Nodes {
+		jobNodeId := "job_" + strings.Replace(task.JobLabel, "//", "_", -1)
+		jobNodeId = strings.Replace(jobNodeId, ":", "_", -1)
+
+		// Add the job node if not already added
+		if !addedNodes[jobNodeId] {
+			// Represent job as a process shape
+			mermaid += fmt.Sprintf("    %s[\"%s\"]:::job\n", jobNodeId, task.JobLabel)
+			addedNodes[jobNodeId] = true
+		}
+
+		// Process inputs (dependencies)
+		for _, input := range task.Config.Inputs {
+			refNodeId := "ref_" + strings.Replace(input.Ref, "/", "_", -1)
+
+			// Add the partition ref node if not already added
+			if !addedRefs[refNodeId] {
+				// Represent partition as a cylinder
+				mermaid += fmt.Sprintf("    %s[(Partition: %s)]:::partition\n", refNodeId, input.Ref)
+				addedRefs[refNodeId] = true
+			}
+
+			// Add the edge from input to job
+			if input.DepType == Materialize {
+				// Solid line for materialize dependencies
+				mermaid += fmt.Sprintf("    %s --> %s\n", refNodeId, jobNodeId)
+			} else {
+				// Dashed line for query dependencies
+				mermaid += fmt.Sprintf("    %s -.-> %s\n", refNodeId, jobNodeId)
+			}
+		}
+
+		// Process outputs
+		for _, output := range task.Config.Outputs {
+			refNodeId := "ref_" + strings.Replace(output, "/", "_", -1)
+
+			// Add the partition ref node if not already added
+			if !addedRefs[refNodeId] {
+				// Represent partition as a cylinder
+				mermaid += fmt.Sprintf("    %s[(Partition: %s)]:::partition\n", refNodeId, output)
+				addedRefs[refNodeId] = true
+			}
+
+			// Add the edge from job to output
+			mermaid += fmt.Sprintf("    %s --> %s\n", jobNodeId, refNodeId)
+		}
+	}
+
+	// Add styling
+	mermaid += "\n    %% Styling\n"
+	mermaid += "    classDef job fill:#f9f,stroke:#333,stroke-width:1px;\n"
+	mermaid += "    classDef partition fill:#bbf,stroke:#333,stroke-width:1px;\n"
+	mermaid += "    classDef outputPartition fill:#bfb,stroke:#333,stroke-width:2px;\n"
+
+	// Apply output styling to output refs
+	for ref := range isOutputRef {
+		refNodeId := "ref_" + strings.Replace(ref, "/", "_", -1)
+		mermaid += fmt.Sprintf("    class %s outputPartition;\n", refNodeId)
+	}
+
+	return mermaid
+}
+
 func main() {
 	mode := os.Getenv("DATABUILD_MODE")
 	log.Printf("Starting analyze.go in mode: %s", mode)
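To make the new function concrete: for a toy graph with a single job //jobs:aggregate that reads data/raw/2025-04-25 as a materialize dep and produces the requested output data/agg/2025-04-25 (all names hypothetical), generateMermaidDiagram would emit roughly:

    flowchart TD
        job__jobs_aggregate["//jobs:aggregate"]:::job
        ref_data_raw_2025-04-25[(Partition: data/raw/2025-04-25)]:::partition
        ref_data_raw_2025-04-25 --> job__jobs_aggregate
        ref_data_agg_2025-04-25[(Partition: data/agg/2025-04-25)]:::partition
        job__jobs_aggregate --> ref_data_agg_2025-04-25

        %% Styling
        classDef job fill:#f9f,stroke:#333,stroke-width:1px;
        classDef partition fill:#bbf,stroke:#333,stroke-width:1px;
        classDef outputPartition fill:#bfb,stroke:#333,stroke-width:2px;
        class ref_data_agg_2025-04-25 outputPartition;

A query dep would get a dashed -.-> edge instead of the solid -->.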
@@ -359,6 +438,19 @@ func main() {
 		}
 		log.Printf("Successfully completed lookup for %d output refs with %d job mappings",
 			len(outputRefs), len(result))
 		fmt.Println(string(jsonData))
+	} else if mode == "mermaid" {
+		// Get output refs from command line arguments
+		outputRefs := os.Args[1:]
+		graph, err := plan(outputRefs)
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "Error: %s\n", err)
+			os.Exit(1)
+		}
+
+		// Generate and output the mermaid diagram
+		mermaidDiagram := generateMermaidDiagram(graph)
+		fmt.Println(mermaidDiagram)
+		log.Printf("Successfully generated mermaid diagram for %d nodes", len(graph.Nodes))
 	} else if mode == "import_test" {
 		log.Printf("Running in import_test mode")
 		fmt.Println("ok :)")
diff --git a/rules.bzl b/rules.bzl
index 5c4a7bc..1517d5c 100644
--- a/rules.bzl
+++ b/rules.bzl
@@ -247,6 +247,12 @@ def databuild_graph(name, jobs, lookup, visibility = None):
         jobs = jobs,
         visibility = visibility,
     )
+    _databuild_graph_mermaid(
+        name = "%s.mermaid" % name,
+        lookup = "%s.lookup" % name,
+        jobs = jobs,
+        visibility = visibility,
+    )
     _databuild_graph_exec(
         name = "%s.exec" % name,
         jobs = jobs,
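With the macro wiring above, every databuild_graph target grows a runnable %s.mermaid sibling alongside the existing %s.lookup and %s.exec targets. Assuming a graph defined at //pipelines:graph (a hypothetical label), usage would look something like:

    bazel run //pipelines:graph.mermaid -- data/agg/2025-04-25

which should print the flowchart for whatever tasks plan() selects to produce that ref.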
"@databuild//graph:go_analyze_wrapper.sh.tpl", + allow_single_file = True, + ), + "_bash_runfiles": attr.label( + default = Label("@bazel_tools//tools/bash/runfiles"), + allow_files = True, + ), + "_analyze": attr.label( + default = "@databuild//graph:analyze", + executable = True, + cfg = "target", + ), }, executable = True, )