This commit is contained in:
parent
d5ece9ac56
commit
ec6494ee59
1 changed files with 217 additions and 7 deletions
|
|
@ -84,6 +84,20 @@ pub fn extract_status_map(events: &[BuildEvent]) -> (HashMap<String, NodeStatus>
|
||||||
(job_statuses, partition_statuses)
|
(job_statuses, partition_statuses)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Convert NodeStatus to EdgeStatus for edge coloring
|
||||||
|
fn map_node_status_to_edge_status(node_status: &NodeStatus) -> EdgeStatus {
|
||||||
|
match node_status {
|
||||||
|
NodeStatus::Failed => EdgeStatus::Failed,
|
||||||
|
NodeStatus::Running => EdgeStatus::Running,
|
||||||
|
NodeStatus::Completed => EdgeStatus::Completed,
|
||||||
|
NodeStatus::Available => EdgeStatus::Available,
|
||||||
|
NodeStatus::Pending => EdgeStatus::Pending,
|
||||||
|
NodeStatus::Cancelled => EdgeStatus::Failed, // Treat cancelled as failed
|
||||||
|
NodeStatus::Skipped => EdgeStatus::Pending, // Treat skipped as pending
|
||||||
|
NodeStatus::Delegated => EdgeStatus::Available, // Treat delegated as available
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Encodes ID for safe usage in mermaid graph
|
/// Encodes ID for safe usage in mermaid graph
|
||||||
fn encode_id(id: &str) -> String {
|
fn encode_id(id: &str) -> String {
|
||||||
id.replace("/", "_").replace("=", "_").replace(":", "_")
|
id.replace("/", "_").replace("=", "_").replace(":", "_")
|
||||||
|
|
@ -197,6 +211,16 @@ enum EdgeType {
|
||||||
Dotted, // Weak dependency
|
Dotted, // Weak dependency
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Status of an edge for coloring purposes
|
||||||
|
#[derive(Debug, Clone, PartialEq)]
|
||||||
|
enum EdgeStatus {
|
||||||
|
Failed, // Red - critical path issues
|
||||||
|
Running, // Yellow - actively processing
|
||||||
|
Completed, // Green - successfully processed
|
||||||
|
Available, // Light green - data ready
|
||||||
|
Pending, // Gray - waiting/not started
|
||||||
|
}
|
||||||
|
|
||||||
/// Represents an edge between two nodes
|
/// Represents an edge between two nodes
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
struct MermaidEdge {
|
struct MermaidEdge {
|
||||||
|
|
@ -380,6 +404,16 @@ impl MermaidStyleSheet {
|
||||||
}
|
}
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_edge_color(&self, status: &EdgeStatus) -> &'static str {
|
||||||
|
match status {
|
||||||
|
EdgeStatus::Failed => "#ff4444", // Red
|
||||||
|
EdgeStatus::Running => "#ffaa00", // Orange
|
||||||
|
EdgeStatus::Completed => "#44aa44", // Green
|
||||||
|
EdgeStatus::Available => "#88cc88", // Light green
|
||||||
|
EdgeStatus::Pending => "#888888", // Gray
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Builder for constructing Mermaid diagrams
|
/// Builder for constructing Mermaid diagrams
|
||||||
|
|
@ -388,6 +422,7 @@ struct MermaidDiagramBuilder {
|
||||||
partition_nodes: HashMap<String, MermaidPartitionNode>,
|
partition_nodes: HashMap<String, MermaidPartitionNode>,
|
||||||
edges: EdgeCollection,
|
edges: EdgeCollection,
|
||||||
output_refs: HashSet<String>,
|
output_refs: HashSet<String>,
|
||||||
|
edge_count: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MermaidDiagramBuilder {
|
impl MermaidDiagramBuilder {
|
||||||
|
|
@ -397,6 +432,7 @@ impl MermaidDiagramBuilder {
|
||||||
partition_nodes: HashMap::new(),
|
partition_nodes: HashMap::new(),
|
||||||
edges: EdgeCollection::new(),
|
edges: EdgeCollection::new(),
|
||||||
output_refs: HashSet::new(),
|
output_refs: HashSet::new(),
|
||||||
|
edge_count: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -423,7 +459,34 @@ impl MermaidDiagramBuilder {
|
||||||
self.edges.add(MermaidEdge::new(from_id, to_id, edge_type));
|
self.edges.add(MermaidEdge::new(from_id, to_id, edge_type));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build(self, statuses: &(HashMap<String, NodeStatus>, HashMap<String, NodeStatus>), stylesheet: MermaidStyleSheet) -> String {
|
fn add_edge_with_status(&mut self, from_id: String, to_id: String, edge_type: EdgeType,
|
||||||
|
edge_status: EdgeStatus, result: &mut String, stylesheet: &MermaidStyleSheet) {
|
||||||
|
// Create the edge
|
||||||
|
let edge = MermaidEdge::new(from_id, to_id, edge_type);
|
||||||
|
|
||||||
|
// Check if this edge already exists (for deduplication)
|
||||||
|
if self.edges.edges.contains(&edge) {
|
||||||
|
return; // Skip duplicate edge
|
||||||
|
}
|
||||||
|
|
||||||
|
// Render the edge
|
||||||
|
result.push_str(&edge.render());
|
||||||
|
|
||||||
|
// Add edge to collection for deduplication tracking
|
||||||
|
self.edges.add(edge);
|
||||||
|
|
||||||
|
// Immediately render the linkStyle if status is not pending
|
||||||
|
if edge_status != EdgeStatus::Pending {
|
||||||
|
let color = stylesheet.get_edge_color(&edge_status);
|
||||||
|
result.push_str(&format!(" linkStyle {} stroke:{},stroke-width:2px\n",
|
||||||
|
self.edge_count, color));
|
||||||
|
}
|
||||||
|
|
||||||
|
self.edge_count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_with_edges(self, statuses: &(HashMap<String, NodeStatus>, HashMap<String, NodeStatus>),
|
||||||
|
stylesheet: MermaidStyleSheet, edges_content: String) -> String {
|
||||||
let (job_statuses, partition_statuses) = statuses;
|
let (job_statuses, partition_statuses) = statuses;
|
||||||
let mut result = String::from("flowchart TD\n");
|
let mut result = String::from("flowchart TD\n");
|
||||||
|
|
||||||
|
|
@ -438,8 +501,8 @@ impl MermaidDiagramBuilder {
|
||||||
result.push_str(&node.render(status));
|
result.push_str(&node.render(status));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Render all edges
|
// Add the edges content (which includes linkStyle statements)
|
||||||
result.push_str(&self.edges.render_all());
|
result.push_str(&edges_content);
|
||||||
|
|
||||||
// Apply styles
|
// Apply styles
|
||||||
result.push_str(&stylesheet.render());
|
result.push_str(&stylesheet.render());
|
||||||
|
|
@ -459,11 +522,16 @@ pub fn generate_mermaid_with_status(
|
||||||
events: &[BuildEvent],
|
events: &[BuildEvent],
|
||||||
) -> String {
|
) -> String {
|
||||||
let statuses = extract_status_map(events);
|
let statuses = extract_status_map(events);
|
||||||
|
let (job_statuses, partition_statuses) = &statuses;
|
||||||
let mut builder = MermaidDiagramBuilder::new();
|
let mut builder = MermaidDiagramBuilder::new();
|
||||||
|
let stylesheet = MermaidStyleSheet::default();
|
||||||
|
|
||||||
// Set output refs for highlighting
|
// Set output refs for highlighting
|
||||||
builder.set_output_refs(&graph.outputs);
|
builder.set_output_refs(&graph.outputs);
|
||||||
|
|
||||||
|
// String to accumulate edges with their styles
|
||||||
|
let mut edges_content = String::new();
|
||||||
|
|
||||||
// Process all task nodes
|
// Process all task nodes
|
||||||
for task in &graph.nodes {
|
for task in &graph.nodes {
|
||||||
if let Some(job_node) = MermaidJobNode::from(task) {
|
if let Some(job_node) = MermaidJobNode::from(task) {
|
||||||
|
|
@ -480,21 +548,35 @@ pub fn generate_mermaid_with_status(
|
||||||
} else {
|
} else {
|
||||||
EdgeType::Dotted
|
EdgeType::Dotted
|
||||||
};
|
};
|
||||||
builder.add_edge(ref_id, job_id.clone(), edge_type);
|
|
||||||
|
// Get partition status for edge coloring
|
||||||
|
let partition_status = partition_statuses.get(&partition_ref.str)
|
||||||
|
.unwrap_or(&NodeStatus::Pending);
|
||||||
|
let edge_status = map_node_status_to_edge_status(partition_status);
|
||||||
|
|
||||||
|
builder.add_edge_with_status(ref_id, job_id.clone(), edge_type,
|
||||||
|
edge_status, &mut edges_content, &stylesheet);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process outputs
|
// Process outputs
|
||||||
for output in &config.outputs {
|
for output in &config.outputs {
|
||||||
let ref_id = builder.add_partition_node(&output.str);
|
let ref_id = builder.add_partition_node(&output.str);
|
||||||
builder.add_edge(job_id.clone(), ref_id, EdgeType::Solid);
|
|
||||||
|
// Get job status for edge coloring
|
||||||
|
let job_status = job_statuses.get(&job_id)
|
||||||
|
.unwrap_or(&NodeStatus::Pending);
|
||||||
|
let edge_status = map_node_status_to_edge_status(job_status);
|
||||||
|
|
||||||
|
builder.add_edge_with_status(job_id.clone(), ref_id, EdgeType::Solid,
|
||||||
|
edge_status, &mut edges_content, &stylesheet);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build the diagram with default styling
|
// Build the diagram with edges content
|
||||||
builder.build(&statuses, MermaidStyleSheet::default())
|
builder.build_with_edges(&statuses, stylesheet, edges_content)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
@ -697,4 +779,132 @@ mod tests {
|
||||||
assert_eq!(job_statuses.get("same_job___output2"), Some(&NodeStatus::Completed));
|
assert_eq!(job_statuses.get("same_job___output2"), Some(&NodeStatus::Completed));
|
||||||
assert_eq!(job_statuses.get("same_job"), None, "Should not find job by label alone");
|
assert_eq!(job_statuses.get("same_job"), None, "Should not find job by label alone");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_edge_coloring_with_status() {
|
||||||
|
// Create a simple graph with status
|
||||||
|
let mut task1 = Task::default();
|
||||||
|
task1.job = Some(JobLabel { label: "job1".to_string() });
|
||||||
|
task1.config = Some(JobConfig {
|
||||||
|
inputs: vec![{
|
||||||
|
let mut input = DataDep::default();
|
||||||
|
input.partition_ref = Some(PartitionRef { str: "input/data".to_string() });
|
||||||
|
input.dep_type = 1; // Solid dependency
|
||||||
|
input
|
||||||
|
}],
|
||||||
|
outputs: vec![
|
||||||
|
PartitionRef { str: "intermediate/data".to_string() },
|
||||||
|
],
|
||||||
|
args: vec![],
|
||||||
|
env: HashMap::new(),
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut graph = JobGraph::default();
|
||||||
|
graph.nodes = vec![task1];
|
||||||
|
graph.outputs = vec![
|
||||||
|
PartitionRef { str: "intermediate/data".to_string() },
|
||||||
|
];
|
||||||
|
|
||||||
|
// Create events to set status
|
||||||
|
let mut partition_event = BuildEvent::default();
|
||||||
|
partition_event.event_type = Some(crate::build_event::EventType::PartitionEvent({
|
||||||
|
let mut pe = PartitionEvent::default();
|
||||||
|
pe.partition_ref = Some(PartitionRef { str: "input/data".to_string() });
|
||||||
|
pe.status = 4; // PARTITION_AVAILABLE
|
||||||
|
pe
|
||||||
|
}));
|
||||||
|
|
||||||
|
let mut job_event = BuildEvent::default();
|
||||||
|
job_event.event_type = Some(crate::build_event::EventType::JobEvent({
|
||||||
|
let mut je = JobEvent::default();
|
||||||
|
je.job_label = Some(JobLabel { label: "job1".to_string() });
|
||||||
|
je.target_partitions = vec![PartitionRef { str: "intermediate/data".to_string() }];
|
||||||
|
je.status = 2; // JOB_RUNNING
|
||||||
|
je
|
||||||
|
}));
|
||||||
|
|
||||||
|
let events = vec![partition_event, job_event];
|
||||||
|
let mermaid = generate_mermaid_with_status(&graph, &events);
|
||||||
|
|
||||||
|
// Check that linkStyle statements are present
|
||||||
|
assert!(mermaid.contains("linkStyle"), "Should contain linkStyle statements");
|
||||||
|
assert!(mermaid.contains("#88cc88"), "Should contain available edge color (light green)");
|
||||||
|
assert!(mermaid.contains("#ffaa00"), "Should contain running edge color (orange)");
|
||||||
|
|
||||||
|
// Check basic structure is still intact
|
||||||
|
assert!(mermaid.contains("flowchart TD"));
|
||||||
|
assert!(mermaid.contains("job1___intermediate_data"));
|
||||||
|
assert!(mermaid.contains("ref_input_data"));
|
||||||
|
assert!(mermaid.contains("ref_intermediate_data"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_edge_status_mapping() {
|
||||||
|
assert_eq!(map_node_status_to_edge_status(&NodeStatus::Failed), EdgeStatus::Failed);
|
||||||
|
assert_eq!(map_node_status_to_edge_status(&NodeStatus::Running), EdgeStatus::Running);
|
||||||
|
assert_eq!(map_node_status_to_edge_status(&NodeStatus::Completed), EdgeStatus::Completed);
|
||||||
|
assert_eq!(map_node_status_to_edge_status(&NodeStatus::Available), EdgeStatus::Available);
|
||||||
|
assert_eq!(map_node_status_to_edge_status(&NodeStatus::Pending), EdgeStatus::Pending);
|
||||||
|
assert_eq!(map_node_status_to_edge_status(&NodeStatus::Cancelled), EdgeStatus::Failed);
|
||||||
|
assert_eq!(map_node_status_to_edge_status(&NodeStatus::Skipped), EdgeStatus::Pending);
|
||||||
|
assert_eq!(map_node_status_to_edge_status(&NodeStatus::Delegated), EdgeStatus::Available);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_edge_deduplication() {
|
||||||
|
// Create a graph that could potentially have duplicate edges
|
||||||
|
let mut task1 = Task::default();
|
||||||
|
task1.job = Some(JobLabel { label: "job1".to_string() });
|
||||||
|
task1.config = Some(JobConfig {
|
||||||
|
inputs: vec![{
|
||||||
|
let mut input = DataDep::default();
|
||||||
|
input.partition_ref = Some(PartitionRef { str: "shared_input".to_string() });
|
||||||
|
input.dep_type = 1;
|
||||||
|
input
|
||||||
|
}],
|
||||||
|
outputs: vec![
|
||||||
|
PartitionRef { str: "output1".to_string() },
|
||||||
|
],
|
||||||
|
args: vec![],
|
||||||
|
env: HashMap::new(),
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut task2 = Task::default();
|
||||||
|
task2.job = Some(JobLabel { label: "job2".to_string() });
|
||||||
|
task2.config = Some(JobConfig {
|
||||||
|
inputs: vec![{
|
||||||
|
let mut input = DataDep::default();
|
||||||
|
input.partition_ref = Some(PartitionRef { str: "shared_input".to_string() });
|
||||||
|
input.dep_type = 1;
|
||||||
|
input
|
||||||
|
}],
|
||||||
|
outputs: vec![
|
||||||
|
PartitionRef { str: "output2".to_string() },
|
||||||
|
],
|
||||||
|
args: vec![],
|
||||||
|
env: HashMap::new(),
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut graph = JobGraph::default();
|
||||||
|
graph.nodes = vec![task1, task2];
|
||||||
|
graph.outputs = vec![
|
||||||
|
PartitionRef { str: "output1".to_string() },
|
||||||
|
PartitionRef { str: "output2".to_string() },
|
||||||
|
];
|
||||||
|
|
||||||
|
let mermaid = generate_mermaid_diagram(&graph);
|
||||||
|
|
||||||
|
// Count how many times the shared edge appears
|
||||||
|
let shared_edge_count = mermaid.matches("ref_shared_input --> job").count();
|
||||||
|
|
||||||
|
// Should only appear once per job (2 total), not duplicated
|
||||||
|
assert_eq!(shared_edge_count, 2, "Should have exactly 2 edges from shared_input (one to each job)");
|
||||||
|
|
||||||
|
// Verify no duplicate edges in the output
|
||||||
|
let lines: Vec<&str> = mermaid.lines().collect();
|
||||||
|
let edge_lines: Vec<&str> = lines.iter().filter(|line| line.contains("-->") || line.contains("-.->")).cloned().collect();
|
||||||
|
let unique_edges: std::collections::HashSet<&str> = edge_lines.iter().cloned().collect();
|
||||||
|
|
||||||
|
assert_eq!(edge_lines.len(), unique_edges.len(), "Should have no duplicate edges in output");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Loading…
Reference in a new issue