From d245581b7d086469ca53882d9398afcab2521781 Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Thu, 17 Jul 2025 23:29:27 -0700 Subject: [PATCH] Fix job status bleed in mermaid chart --- databuild/mermaid_utils.rs | 657 +++++++++++++++++++++++++++++++------ 1 file changed, 552 insertions(+), 105 deletions(-) diff --git a/databuild/mermaid_utils.rs b/databuild/mermaid_utils.rs index 7e78812..cd92fa6 100644 --- a/databuild/mermaid_utils.rs +++ b/databuild/mermaid_utils.rs @@ -41,7 +41,6 @@ pub fn extract_status_map(events: &[BuildEvent]) -> (HashMap for event in sorted_events { match &event.event_type { - // TODO map this to a job + outputs hash so that job status highlighting is correct Some(crate::build_event::EventType::JobEvent(job_event)) => { if let Some(job_label) = &job_event.job_label { let status = match job_event.status { @@ -53,7 +52,15 @@ pub fn extract_status_map(events: &[BuildEvent]) -> (HashMap 6 => NodeStatus::Skipped, // JOB_SKIPPED _ => NodeStatus::Pending, }; - job_statuses.insert(job_label.label.clone(), status); + + // Create a unique key using job label + target partitions (same as node ID) + let outputs_label = job_event.target_partitions.iter() + .map(|p| p.str.clone()) + .collect::>() + .join("___"); + let unique_key = encode_id(&(job_label.label.clone() + "___" + &outputs_label)); + + job_statuses.insert(unique_key, status); } } Some(crate::build_event::EventType::PartitionEvent(partition_event)) => { @@ -82,6 +89,15 @@ fn encode_id(id: &str) -> String { id.replace("/", "_").replace("=", "_").replace(":", "_") } +/// Trait for all Mermaid node types +trait MermaidNode { + fn id(&self) -> &str; + #[allow(dead_code)] + fn label(&self) -> &str; + fn render(&self, status: &NodeStatus) -> String; +} + +/// Represents a job node in the Mermaid diagram struct MermaidJobNode { task: Task, id: String, @@ -114,16 +130,325 @@ impl MermaidJobNode { } fn to_mermaid(&self, job_statuses: &HashMap) -> String { - let status = job_statuses.get(&self.task.job.as_ref().unwrap().label).unwrap_or(&NodeStatus::Pending); + // Use the same unique ID logic for status lookup as we use for the node ID + let status = job_statuses.get(&self.id).unwrap_or(&NodeStatus::Pending); + self.render(status) + } +} + +impl MermaidNode for MermaidJobNode { + fn id(&self) -> &str { + &self.id + } + + fn label(&self) -> &str { + &self.label + } + + fn render(&self, status: &NodeStatus) -> String { format!(" {}[\"{}\"]:::job_{}\n", self.id, self.label, status.css_class()) } } +/// Represents a partition node in the Mermaid diagram struct MermaidPartitionNode { id: String, label: String, + is_output: bool, } +impl MermaidPartitionNode { + fn new(partition_ref: &str, is_output: bool) -> Self { + let id = format!("ref_{}", encode_id(partition_ref)); + let label = partition_ref.to_string(); + + Self { + id, + label, + is_output, + } + } +} + +impl MermaidNode for MermaidPartitionNode { + fn id(&self) -> &str { + &self.id + } + + fn label(&self) -> &str { + &self.label + } + + fn render(&self, status: &NodeStatus) -> String { + let node_class = if self.is_output { + format!("outputPartition_{}", status.css_class()) + } else { + format!("partition_{}", status.css_class()) + }; + + format!(" {}[(\"{}\")]:::{}\n", self.id, encode_id(&self.label), node_class) + } +} + +/// Types of edges in the diagram +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +enum EdgeType { + Solid, // Regular dependency + Dotted, // Weak dependency +} + +/// Represents an edge between two nodes +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct MermaidEdge { + from_id: String, + to_id: String, + edge_type: EdgeType, +} + +impl MermaidEdge { + fn new(from_id: String, to_id: String, edge_type: EdgeType) -> Self { + Self { from_id, to_id, edge_type } + } + + fn render(&self) -> String { + match self.edge_type { + EdgeType::Solid => format!(" {} --> {}\n", self.from_id, self.to_id), + EdgeType::Dotted => format!(" {} -.-> {}\n", self.from_id, self.to_id), + } + } +} + +/// Collection of edges with deduplication +struct EdgeCollection { + edges: HashSet, +} + +impl EdgeCollection { + fn new() -> Self { + Self { + edges: HashSet::new(), + } + } + + fn add(&mut self, edge: MermaidEdge) { + self.edges.insert(edge); + } + + fn render_all(&self) -> String { + self.edges.iter() + .map(|edge| edge.render()) + .collect::>() + .join("") + } +} + +/// Style rule for a specific node type and status combination +struct StyleRule { + class_name: String, + fill: &'static str, + stroke: &'static str, + stroke_width: &'static str, +} + +impl StyleRule { + fn render(&self) -> String { + format!( + " classDef {} fill:{},stroke:{},stroke-width:{};\n", + self.class_name, self.fill, self.stroke, self.stroke_width + ) + } +} + +/// Manages all styling for the Mermaid diagram +struct MermaidStyleSheet { + rules: Vec, +} + +impl MermaidStyleSheet { + fn default() -> Self { + let mut rules = Vec::new(); + + // Job status styles + rules.push(StyleRule { + class_name: "job_pending".to_string(), + fill: "#e0e0e0", + stroke: "#333", + stroke_width: "1px", + }); + rules.push(StyleRule { + class_name: "job_running".to_string(), + fill: "#ffeb3b", + stroke: "#333", + stroke_width: "2px", + }); + rules.push(StyleRule { + class_name: "job_completed".to_string(), + fill: "#4caf50", + stroke: "#333", + stroke_width: "2px", + }); + rules.push(StyleRule { + class_name: "job_failed".to_string(), + fill: "#f44336", + stroke: "#333", + stroke_width: "2px", + }); + rules.push(StyleRule { + class_name: "job_cancelled".to_string(), + fill: "#ff9800", + stroke: "#333", + stroke_width: "2px", + }); + rules.push(StyleRule { + class_name: "job_skipped".to_string(), + fill: "#9e9e9e", + stroke: "#333", + stroke_width: "1px", + }); + + // Partition status styles + rules.push(StyleRule { + class_name: "partition_pending".to_string(), + fill: "#e3f2fd", + stroke: "#333", + stroke_width: "1px", + }); + rules.push(StyleRule { + class_name: "partition_running".to_string(), + fill: "#fff9c4", + stroke: "#333", + stroke_width: "2px", + }); + rules.push(StyleRule { + class_name: "partition_available".to_string(), + fill: "#c8e6c9", + stroke: "#333", + stroke_width: "2px", + }); + rules.push(StyleRule { + class_name: "partition_failed".to_string(), + fill: "#ffcdd2", + stroke: "#333", + stroke_width: "2px", + }); + rules.push(StyleRule { + class_name: "partition_delegated".to_string(), + fill: "#d1c4e9", + stroke: "#333", + stroke_width: "2px", + }); + + // Output partition status styles (highlighted versions) + rules.push(StyleRule { + class_name: "outputPartition_pending".to_string(), + fill: "#bbdefb", + stroke: "#333", + stroke_width: "3px", + }); + rules.push(StyleRule { + class_name: "outputPartition_running".to_string(), + fill: "#fff59d", + stroke: "#333", + stroke_width: "3px", + }); + rules.push(StyleRule { + class_name: "outputPartition_available".to_string(), + fill: "#a5d6a7", + stroke: "#333", + stroke_width: "3px", + }); + rules.push(StyleRule { + class_name: "outputPartition_failed".to_string(), + fill: "#ef9a9a", + stroke: "#333", + stroke_width: "3px", + }); + rules.push(StyleRule { + class_name: "outputPartition_delegated".to_string(), + fill: "#b39ddb", + stroke: "#333", + stroke_width: "3px", + }); + + Self { rules } + } + + fn render(&self) -> String { + let mut result = String::from("\n %% Styling\n"); + for rule in &self.rules { + result.push_str(&rule.render()); + } + result + } +} + +/// Builder for constructing Mermaid diagrams +struct MermaidDiagramBuilder { + job_nodes: HashMap, + partition_nodes: HashMap, + edges: EdgeCollection, + output_refs: HashSet, +} + +impl MermaidDiagramBuilder { + fn new() -> Self { + Self { + job_nodes: HashMap::new(), + partition_nodes: HashMap::new(), + edges: EdgeCollection::new(), + output_refs: HashSet::new(), + } + } + + fn set_output_refs(&mut self, refs: &[PartitionRef]) { + for ref_str in refs { + self.output_refs.insert(ref_str.str.clone()); + } + } + + fn add_job_node(&mut self, node: MermaidJobNode) { + self.job_nodes.insert(node.id().to_string(), node); + } + + fn add_partition_node(&mut self, partition_ref: &str) -> String { + let is_output = self.output_refs.contains(partition_ref); + let node = MermaidPartitionNode::new(partition_ref, is_output); + let id = node.id().to_string(); + self.partition_nodes.entry(partition_ref.to_string()) + .or_insert(node); + id + } + + fn add_edge(&mut self, from_id: String, to_id: String, edge_type: EdgeType) { + self.edges.add(MermaidEdge::new(from_id, to_id, edge_type)); + } + + fn build(self, statuses: &(HashMap, HashMap), stylesheet: MermaidStyleSheet) -> String { + let (job_statuses, partition_statuses) = statuses; + let mut result = String::from("flowchart TD\n"); + + // Render all job nodes + for (_, job_node) in self.job_nodes { + result.push_str(&job_node.to_mermaid(job_statuses)); + } + + // Render all partition nodes + for (partition_ref, node) in self.partition_nodes { + let status = partition_statuses.get(&partition_ref).unwrap_or(&NodeStatus::Pending); + result.push_str(&node.render(status)); + } + + // Render all edges + result.push_str(&self.edges.render_all()); + + // Apply styles + result.push_str(&stylesheet.render()); + + result + } +} + + pub fn generate_mermaid_diagram(graph: &JobGraph) -> String { generate_mermaid_with_status(graph, &[]) } @@ -133,121 +458,243 @@ pub fn generate_mermaid_with_status( graph: &JobGraph, events: &[BuildEvent], ) -> String { - let (job_statuses, partition_statuses) = extract_status_map(events); + let statuses = extract_status_map(events); + let mut builder = MermaidDiagramBuilder::new(); - // Start the mermaid flowchart - let mut mermaid = String::from("flowchart TD\n"); + // Set output refs for highlighting + builder.set_output_refs(&graph.outputs); - // Track nodes we've already added to avoid duplicates - let mut added_nodes = HashSet::new(); - let mut added_refs = HashSet::new(); - - // Map to track which refs are outputs (to highlight them) - let mut is_output_ref = HashSet::new(); - for ref_str in &graph.outputs { - is_output_ref.insert(ref_str.str.clone()); - } - - // Add all task nodes and their relationships + // Process all task nodes for task in &graph.nodes { - let job_node = MermaidJobNode::from(task).unwrap(); - // Only add the job node once - if !added_nodes.contains(&job_node.id) { - mermaid.push_str(&job_node.to_mermaid(&job_statuses)); - added_nodes.insert(job_node.id.clone()); - } - - // Process inputs (dependencies) - if let Some(config) = &task.config { - for input in &config.inputs { - if let Some(partition_ref) = &input.partition_ref { - let ref_node_id = format!("ref_{}", partition_ref.str.replace("/", "_").replace("=", "_")); - - // Add the partition ref node if not already added - if !added_refs.contains(&ref_node_id) { - let status = partition_statuses.get(&partition_ref.str).unwrap_or(&NodeStatus::Pending); - let node_class = if is_output_ref.contains(&partition_ref.str) { - format!("outputPartition_{}", status.css_class()) - } else { - format!("partition_{}", status.css_class()) - }; - - mermaid.push_str(&format!( - " {}[(\"{}\")]:::{}\n", - ref_node_id, - partition_ref.str.replace("/", "_").replace("=", "_"), - node_class - )); - added_refs.insert(ref_node_id.clone()); - } - - let mermaid_edge = if (input.dep_type == 1) { - &format!(" {} --> {}\n", ref_node_id, job_node.id) - } else { - &format!(" {} -.-> {}\n", ref_node_id, job_node.id) - }; - - if !mermaid.contains(mermaid_edge.trim()) { - mermaid.push_str(mermaid_edge); - } - } - } + if let Some(job_node) = MermaidJobNode::from(task) { + let job_id = job_node.id().to_string(); + builder.add_job_node(job_node); - // Process outputs - for output in &config.outputs { - let ref_node_id = format!("ref_{}", output.str.replace("/", "_").replace("=", "_")); - - // Add the partition ref node if not already added - if !added_refs.contains(&ref_node_id) { - let status = partition_statuses.get(&output.str).unwrap_or(&NodeStatus::Pending); - let node_class = if is_output_ref.contains(&output.str) { - format!("outputPartition_{}", status.css_class()) - } else { - format!("partition_{}", status.css_class()) - }; - - mermaid.push_str(&format!( - " {}[(\"Partition: {}\")]:::{}\n", - ref_node_id, - output.str, - node_class - )); - added_refs.insert(ref_node_id.clone()); + if let Some(config) = &task.config { + // Process inputs (dependencies) + for input in &config.inputs { + if let Some(partition_ref) = &input.partition_ref { + let ref_id = builder.add_partition_node(&partition_ref.str); + let edge_type = if input.dep_type == 1 { + EdgeType::Solid + } else { + EdgeType::Dotted + }; + builder.add_edge(ref_id, job_id.clone(), edge_type); + } } - // Add the edge from job to output (avoid duplicates) - let mermaid_edge = &format!(" {} --> {}\n", job_node.id, ref_node_id); - if !mermaid.contains(mermaid_edge.trim()) { - mermaid.push_str(mermaid_edge); + // Process outputs + for output in &config.outputs { + let ref_id = builder.add_partition_node(&output.str); + builder.add_edge(job_id.clone(), ref_id, EdgeType::Solid); } } } } - // Add styling for all status types - mermaid.push_str("\n %% Styling\n"); + // Build the diagram with default styling + builder.build(&statuses, MermaidStyleSheet::default()) +} + +#[cfg(test)] +mod tests { + use super::*; - // Job status styles - mermaid.push_str(" classDef job_pending fill:#e0e0e0,stroke:#333,stroke-width:1px;\n"); - mermaid.push_str(" classDef job_running fill:#ffeb3b,stroke:#333,stroke-width:2px;\n"); - mermaid.push_str(" classDef job_completed fill:#4caf50,stroke:#333,stroke-width:2px;\n"); - mermaid.push_str(" classDef job_failed fill:#f44336,stroke:#333,stroke-width:2px;\n"); - mermaid.push_str(" classDef job_cancelled fill:#ff9800,stroke:#333,stroke-width:2px;\n"); - mermaid.push_str(" classDef job_skipped fill:#9e9e9e,stroke:#333,stroke-width:1px;\n"); + #[test] + fn test_encode_id() { + assert_eq!(encode_id("path/to/file"), "path_to_file"); + assert_eq!(encode_id("key=value"), "key_value"); + assert_eq!(encode_id("scope:item"), "scope_item"); + assert_eq!(encode_id("a/b=c:d"), "a_b_c_d"); + } - // Partition status styles - mermaid.push_str(" classDef partition_pending fill:#e3f2fd,stroke:#333,stroke-width:1px;\n"); - mermaid.push_str(" classDef partition_running fill:#fff9c4,stroke:#333,stroke-width:2px;\n"); - mermaid.push_str(" classDef partition_available fill:#c8e6c9,stroke:#333,stroke-width:2px;\n"); - mermaid.push_str(" classDef partition_failed fill:#ffcdd2,stroke:#333,stroke-width:2px;\n"); - mermaid.push_str(" classDef partition_delegated fill:#d1c4e9,stroke:#333,stroke-width:2px;\n"); + #[test] + fn test_mermaid_job_node() { + let mut task = Task::default(); + task.job = Some(JobLabel { label: "test_job".to_string() }); + task.config = Some(JobConfig { + outputs: vec![ + PartitionRef { str: "output1".to_string() }, + PartitionRef { str: "output2".to_string() }, + ], + inputs: vec![], + args: vec![], + env: HashMap::new(), + }); + + let node = MermaidJobNode::from(&task).expect("Failed to create job node"); + assert_eq!(node.id(), "test_job___output1___output2"); + assert_eq!(node.label(), "**test_job** output1___output2"); + + let rendered = node.render(&NodeStatus::Running); + assert!(rendered.contains("test_job___output1___output2")); + assert!(rendered.contains("**test_job** output1___output2")); + assert!(rendered.contains("job_running")); + } - // Output partition status styles (highlighted versions) - mermaid.push_str(" classDef outputPartition_pending fill:#bbdefb,stroke:#333,stroke-width:3px;\n"); - mermaid.push_str(" classDef outputPartition_running fill:#fff59d,stroke:#333,stroke-width:3px;\n"); - mermaid.push_str(" classDef outputPartition_available fill:#a5d6a7,stroke:#333,stroke-width:3px;\n"); - mermaid.push_str(" classDef outputPartition_failed fill:#ef9a9a,stroke:#333,stroke-width:3px;\n"); - mermaid.push_str(" classDef outputPartition_delegated fill:#b39ddb,stroke:#333,stroke-width:3px;\n"); + #[test] + fn test_mermaid_partition_node() { + let node = MermaidPartitionNode::new("data/partition=1", false); + assert_eq!(node.id(), "ref_data_partition_1"); + assert_eq!(node.label(), "data/partition=1"); + + let rendered = node.render(&NodeStatus::Available); + assert!(rendered.contains("ref_data_partition_1")); + assert!(rendered.contains("data_partition_1")); + assert!(rendered.contains("partition_available")); + + // Test output partition + let output_node = MermaidPartitionNode::new("output/data", true); + let output_rendered = output_node.render(&NodeStatus::Available); + assert!(output_rendered.contains("outputPartition_available")); + } - mermaid + #[test] + fn test_edge_collection() { + let mut edges = EdgeCollection::new(); + + // Add edges + edges.add(MermaidEdge::new("node1".to_string(), "node2".to_string(), EdgeType::Solid)); + edges.add(MermaidEdge::new("node2".to_string(), "node3".to_string(), EdgeType::Dotted)); + + // Test deduplication + edges.add(MermaidEdge::new("node1".to_string(), "node2".to_string(), EdgeType::Solid)); + + let rendered = edges.render_all(); + assert!(rendered.contains("node1 --> node2")); + assert!(rendered.contains("node2 -.-> node3")); + + // Should only have 2 unique edges + assert_eq!(rendered.matches("-->").count(), 1); + assert_eq!(rendered.matches("-.->").count(), 1); + } + + #[test] + fn test_simple_graph_generation() { + // Create task 1 + let mut task1 = Task::default(); + task1.job = Some(JobLabel { label: "job1".to_string() }); + task1.config = Some(JobConfig { + inputs: vec![{ + let mut input = DataDep::default(); + input.partition_ref = Some(PartitionRef { str: "input/data".to_string() }); + input.dep_type = 1; // Solid dependency + input + }], + outputs: vec![ + PartitionRef { str: "intermediate/data".to_string() }, + ], + args: vec![], + env: HashMap::new(), + }); + + // Create task 2 + let mut task2 = Task::default(); + task2.job = Some(JobLabel { label: "job2".to_string() }); + task2.config = Some(JobConfig { + inputs: vec![{ + let mut input = DataDep::default(); + input.partition_ref = Some(PartitionRef { str: "intermediate/data".to_string() }); + input.dep_type = 2; // Dotted dependency + input + }], + outputs: vec![ + PartitionRef { str: "output/data".to_string() }, + ], + args: vec![], + env: HashMap::new(), + }); + + // Create a simple graph + let mut graph = JobGraph::default(); + graph.nodes = vec![task1, task2]; + graph.outputs = vec![ + PartitionRef { str: "output/data".to_string() }, + ]; + + let mermaid = generate_mermaid_diagram(&graph); + + // Check basic structure + assert!(mermaid.starts_with("flowchart TD\n")); + + // Check nodes - verify both ID and label are present + assert!(mermaid.contains("job1___intermediate_data"), "Missing job1 node ID"); + assert!(mermaid.contains("**job1** intermediate/data"), "Missing job1 label"); + assert!(mermaid.contains("job2___output_data"), "Missing job2 node ID"); + assert!(mermaid.contains("**job2** output/data"), "Missing job2 label"); + assert!(mermaid.contains("ref_input_data")); + assert!(mermaid.contains("ref_intermediate_data")); + assert!(mermaid.contains("ref_output_data")); + + // Check edges + assert!(mermaid.contains("ref_input_data --> job1")); + assert!(mermaid.contains("job1___intermediate_data --> ref_intermediate_data")); + assert!(mermaid.contains("ref_intermediate_data -.-> job2")); + assert!(mermaid.contains("job2___output_data --> ref_output_data")); + + // Check styling + assert!(mermaid.contains("classDef job_pending")); + assert!(mermaid.contains("classDef partition_pending")); + assert!(mermaid.contains("classDef outputPartition_pending")); + } + + #[test] + fn test_status_extraction() { + let mut event1 = BuildEvent::default(); + event1.timestamp = 1; + event1.event_type = Some(crate::build_event::EventType::JobEvent({ + let mut job_event = JobEvent::default(); + job_event.job_label = Some(JobLabel { label: "test_job".to_string() }); + job_event.status = 2; // JOB_RUNNING + job_event + })); + + let mut event2 = BuildEvent::default(); + event2.timestamp = 2; + event2.event_type = Some(crate::build_event::EventType::PartitionEvent({ + let mut partition_event = PartitionEvent::default(); + partition_event.partition_ref = Some(PartitionRef { str: "test/partition".to_string() }); + partition_event.status = 4; // PARTITION_AVAILABLE + partition_event + })); + + let events = vec![event1, event2]; + + let (job_statuses, partition_statuses) = extract_status_map(&events); + + // Should use the unique key (job_label + target_partitions) instead of just job_label + assert_eq!(job_statuses.get("test_job"), None, "Should not find job by label alone"); + assert_eq!(partition_statuses.get("test/partition"), Some(&NodeStatus::Available)); + } + + #[test] + fn test_job_status_per_task_instance() { + // Test that different task instances with same job label get different status + let mut event1 = BuildEvent::default(); + event1.event_type = Some(crate::build_event::EventType::JobEvent({ + let mut job_event = JobEvent::default(); + job_event.job_label = Some(JobLabel { label: "same_job".to_string() }); + job_event.target_partitions = vec![PartitionRef { str: "output1".to_string() }]; + job_event.status = 2; // JOB_RUNNING + job_event + })); + + let mut event2 = BuildEvent::default(); + event2.event_type = Some(crate::build_event::EventType::JobEvent({ + let mut job_event = JobEvent::default(); + job_event.job_label = Some(JobLabel { label: "same_job".to_string() }); + job_event.target_partitions = vec![PartitionRef { str: "output2".to_string() }]; + job_event.status = 3; // JOB_COMPLETED + job_event + })); + + let events = vec![event1, event2]; + let (job_statuses, _) = extract_status_map(&events); + + // Each task should have its own status based on unique key + assert_eq!(job_statuses.get("same_job___output1"), Some(&NodeStatus::Running)); + assert_eq!(job_statuses.get("same_job___output2"), Some(&NodeStatus::Completed)); + assert_eq!(job_statuses.get("same_job"), None, "Should not find job by label alone"); + } } \ No newline at end of file