Add build graph recording and mermaid generation
Some checks are pending
/ setup (push) Waiting to run

This commit is contained in:
soaxelbrooke 2025-07-16 19:16:16 -07:00
parent 7075f901da
commit ad15ffc3c8
7 changed files with 341 additions and 4 deletions

View file

@ -44,6 +44,7 @@ rust_library(
"orchestration/events.rs",
"service/handlers.rs",
"service/mod.rs",
"service/mermaid_utils.rs",
":generate_databuild_rust",
],
edition = "2021",

View file

@ -226,6 +226,12 @@ message DelegationEvent {
string message = 3; // Optional message
}
// Job graph analysis result event (stores the analyzed job graph)
message JobGraphEvent {
JobGraph job_graph = 1; // The analyzed job graph
string message = 2; // Optional message
}
// Individual build event
message BuildEvent {
// Event metadata
@ -239,6 +245,7 @@ message BuildEvent {
PartitionEvent partition_event = 11;
JobEvent job_event = 12;
DelegationEvent delegation_event = 13;
JobGraphEvent job_graph_event = 14;
}
}

View file

@ -126,6 +126,18 @@ impl SqliteBuildEventLog {
message,
}))
}
"job_graph" => {
// Read from job_graph_events columns (indices 4, 5)
let job_graph_json: String = row.get(4)?;
let message: String = row.get(5)?;
let job_graph: Option<JobGraph> = serde_json::from_str(&job_graph_json).ok();
Some(crate::build_event::EventType::JobGraphEvent(JobGraphEvent {
job_graph,
message,
}))
}
_ => None,
};
@ -155,6 +167,7 @@ impl BuildEventLog for SqliteBuildEventLog {
Some(crate::build_event::EventType::PartitionEvent(_)) => "partition",
Some(crate::build_event::EventType::JobEvent(_)) => "job",
Some(crate::build_event::EventType::DelegationEvent(_)) => "delegation",
Some(crate::build_event::EventType::JobGraphEvent(_)) => "job_graph",
None => "unknown",
}
],
@ -223,6 +236,22 @@ impl BuildEventLog for SqliteBuildEventLog {
],
).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
}
Some(crate::build_event::EventType::JobGraphEvent(jg_event)) => {
let job_graph_json = match serde_json::to_string(&jg_event.job_graph) {
Ok(json) => json,
Err(e) => {
return Err(BuildEventLogError::DatabaseError(format!("Failed to serialize job graph: {}", e)));
}
};
conn.execute(
"INSERT INTO job_graph_events (event_id, job_graph_json, message) VALUES (?1, ?2, ?3)",
params![
event.event_id,
job_graph_json,
jg_event.message
],
).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
}
None => {}
}
@ -261,6 +290,12 @@ impl BuildEventLog for SqliteBuildEventLog {
FROM build_events be
LEFT JOIN delegation_events de ON be.event_id = de.event_id
WHERE be.build_request_id = ? AND be.event_type = 'delegation'
UNION ALL
SELECT be.event_id, be.timestamp, be.build_request_id, be.event_type,
jge.job_graph_json, jge.message, NULL, NULL, NULL, NULL, NULL
FROM build_events be
LEFT JOIN job_graph_events jge ON be.event_id = jge.event_id
WHERE be.build_request_id = ? AND be.event_type = 'job_graph'
";
let query = if since.is_some() {
@ -273,11 +308,11 @@ impl BuildEventLog for SqliteBuildEventLog {
.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
let rows = if let Some(since_timestamp) = since {
// We need 5 parameters: build_request_id for each UNION + since_timestamp
stmt.query_map(params![build_request_id, build_request_id, build_request_id, build_request_id, since_timestamp], Self::row_to_build_event_from_join)
// We need 6 parameters: build_request_id for each UNION + since_timestamp
stmt.query_map(params![build_request_id, build_request_id, build_request_id, build_request_id, build_request_id, since_timestamp], Self::row_to_build_event_from_join)
} else {
// We need 4 parameters: build_request_id for each UNION
stmt.query_map(params![build_request_id, build_request_id, build_request_id, build_request_id], Self::row_to_build_event_from_join)
// We need 5 parameters: build_request_id for each UNION
stmt.query_map(params![build_request_id, build_request_id, build_request_id, build_request_id, build_request_id], Self::row_to_build_event_from_join)
}.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
let mut events = Vec::new();
@ -696,6 +731,15 @@ impl BuildEventLog for SqliteBuildEventLog {
[],
).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
conn.execute(
"CREATE TABLE IF NOT EXISTS job_graph_events (
event_id TEXT PRIMARY KEY REFERENCES build_events(event_id),
job_graph_json TEXT NOT NULL,
message TEXT
)",
[],
).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
// Create indexes
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_build_events_build_request ON build_events(build_request_id, timestamp)",

View file

@ -336,6 +336,24 @@ async fn plan(
if let Err(e) = event_log.append_event(event).await {
error!("Failed to log analysis completion event: {}", e);
}
// Store the job graph as an event in the build event log
let job_graph = JobGraph {
label: Some(GraphLabel { label: "analyzed_graph".to_string() }),
outputs: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(),
nodes: nodes.clone(),
};
let job_graph_event = create_build_event(
build_request_id.to_string(),
crate::build_event::EventType::JobGraphEvent(JobGraphEvent {
job_graph: Some(job_graph),
message: format!("Job graph analysis completed with {} tasks", nodes.len()),
}),
);
if let Err(e) = event_log.append_event(job_graph_event).await {
error!("Failed to log job graph event: {}", e);
}
}
Ok(JobGraph {

View file

@ -1,6 +1,7 @@
use super::*;
use crate::event_log::{current_timestamp_nanos, create_build_event};
use crate::orchestration::{BuildOrchestrator, BuildResult};
use crate::service::mermaid_utils;
use axum::{
extract::{Path, State},
http::StatusCode,
@ -214,6 +215,9 @@ pub async fn get_build_status(
)
})?;
// Clone events for later use in mermaid generation
let events_for_mermaid = events.clone();
// Convert events to summary format for response
let event_summaries: Vec<BuildEventSummary> = events.into_iter().map(|e| {
let (job_label, partition_ref, delegated_build_id) = extract_navigation_data(&e.event_type);
@ -232,6 +236,38 @@ pub async fn get_build_status(
let final_status_string = BuildGraphService::status_to_string(final_status);
info!("Build request {}: Final status={}, partitions={:?}", build_request_id, final_status_string, requested_partitions);
// Extract the job graph from events (find the most recent JobGraphEvent)
let (job_graph_json, mermaid_diagram) = {
let mut job_graph: Option<JobGraph> = None;
// Find the most recent JobGraphEvent in the events
for event in &events_for_mermaid {
if let Some(crate::build_event::EventType::JobGraphEvent(graph_event)) = &event.event_type {
if let Some(ref graph) = graph_event.job_graph {
job_graph = Some(graph.clone());
}
}
}
if let Some(ref graph) = job_graph {
// Convert job graph to JSON
let graph_json = match serde_json::to_value(graph) {
Ok(json) => Some(json),
Err(e) => {
error!("Failed to serialize job graph: {}", e);
None
}
};
// Generate mermaid diagram with current status
let mermaid = mermaid_utils::generate_mermaid_with_status(graph, &events_for_mermaid);
(graph_json, Some(mermaid))
} else {
(None, None)
}
};
Ok(Json(BuildStatusResponse {
build_request_id,
status: final_status_string,
@ -239,6 +275,8 @@ pub async fn get_build_status(
created_at,
updated_at,
events: event_summaries,
job_graph: job_graph_json,
mermaid_diagram,
}))
}
@ -466,6 +504,7 @@ async fn execute_build_request(
}
};
// Update status to executing
update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestExecuting).await;
@ -593,6 +632,7 @@ fn event_type_to_string(event_type: &Option<crate::build_event::EventType>) -> S
Some(crate::build_event::EventType::PartitionEvent(_)) => "partition".to_string(),
Some(crate::build_event::EventType::JobEvent(_)) => "job".to_string(),
Some(crate::build_event::EventType::DelegationEvent(_)) => "delegation".to_string(),
Some(crate::build_event::EventType::JobGraphEvent(_)) => "job_graph".to_string(),
None => "INVALID_EVENT_TYPE".to_string(), // Make this obvious rather than hiding it
}
}
@ -603,6 +643,7 @@ fn event_to_message(event_type: &Option<crate::build_event::EventType>) -> Strin
Some(crate::build_event::EventType::PartitionEvent(event)) => event.message.clone(),
Some(crate::build_event::EventType::JobEvent(event)) => event.message.clone(),
Some(crate::build_event::EventType::DelegationEvent(event)) => event.message.clone(),
Some(crate::build_event::EventType::JobGraphEvent(event)) => event.message.clone(),
None => "INVALID_EVENT_NO_MESSAGE".to_string(), // Make this obvious
}
}
@ -625,6 +666,10 @@ fn extract_navigation_data(event_type: &Option<crate::build_event::EventType>) -
// Build request events don't need navigation links (self-referential)
(None, None, None)
},
Some(crate::build_event::EventType::JobGraphEvent(_)) => {
// Job graph events don't need navigation links
(None, None, None)
},
None => (None, None, None),
}
}

View file

@ -0,0 +1,219 @@
use crate::*;
use std::collections::{HashMap, HashSet};
/// Represents the status of a job or partition for visualization
#[derive(Debug, Clone, PartialEq)]
pub enum NodeStatus {
Pending,
Running,
Completed,
Failed,
Cancelled,
Skipped,
Available,
Delegated,
}
impl NodeStatus {
/// Get the CSS class name for this status
fn css_class(&self) -> &'static str {
match self {
NodeStatus::Pending => "pending",
NodeStatus::Running => "running",
NodeStatus::Completed => "completed",
NodeStatus::Failed => "failed",
NodeStatus::Cancelled => "cancelled",
NodeStatus::Skipped => "skipped",
NodeStatus::Available => "available",
NodeStatus::Delegated => "delegated",
}
}
}
/// Extract current status information from build events
pub fn extract_status_map(events: &[BuildEvent]) -> (HashMap<String, NodeStatus>, HashMap<String, NodeStatus>) {
let mut job_statuses: HashMap<String, NodeStatus> = HashMap::new();
let mut partition_statuses: HashMap<String, NodeStatus> = HashMap::new();
// Process events in chronological order to get latest status
let mut sorted_events = events.to_vec();
sorted_events.sort_by_key(|e| e.timestamp);
for event in sorted_events {
match &event.event_type {
Some(crate::build_event::EventType::JobEvent(job_event)) => {
if let Some(job_label) = &job_event.job_label {
let status = match job_event.status {
1 => NodeStatus::Running, // JOB_SCHEDULED
2 => NodeStatus::Running, // JOB_RUNNING
3 => NodeStatus::Completed, // JOB_COMPLETED
4 => NodeStatus::Failed, // JOB_FAILED
5 => NodeStatus::Cancelled, // JOB_CANCELLED
6 => NodeStatus::Skipped, // JOB_SKIPPED
_ => NodeStatus::Pending,
};
job_statuses.insert(job_label.label.clone(), status);
}
}
Some(crate::build_event::EventType::PartitionEvent(partition_event)) => {
if let Some(partition_ref) = &partition_event.partition_ref {
let status = match partition_event.status {
1 => NodeStatus::Pending, // PARTITION_REQUESTED
2 => NodeStatus::Pending, // PARTITION_ANALYZED
3 => NodeStatus::Running, // PARTITION_BUILDING
4 => NodeStatus::Available, // PARTITION_AVAILABLE
5 => NodeStatus::Failed, // PARTITION_FAILED
6 => NodeStatus::Delegated, // PARTITION_DELEGATED
_ => NodeStatus::Pending,
};
partition_statuses.insert(partition_ref.str.clone(), status);
}
}
_ => {}
}
}
(job_statuses, partition_statuses)
}
/// Generate a mermaid diagram for a job graph with current status annotations
pub fn generate_mermaid_with_status(
graph: &JobGraph,
events: &[BuildEvent],
) -> String {
let (job_statuses, partition_statuses) = extract_status_map(events);
// Start the mermaid flowchart
let mut mermaid = String::from("flowchart TD\n");
// Track nodes we've already added to avoid duplicates
let mut added_nodes = HashSet::new();
let mut added_refs = HashSet::new();
// Map to track which refs are outputs (to highlight them)
let mut is_output_ref = HashSet::new();
for ref_str in &graph.outputs {
is_output_ref.insert(ref_str.str.clone());
}
// Add all task nodes and their relationships
for task in &graph.nodes {
let job_label = match &task.job {
Some(label) => &label.label,
None => continue,
};
let job_node_id = format!("job_{}", job_label.replace("/", "_").replace(":", "_"));
// Only add the job node once
if !added_nodes.contains(&job_node_id) {
let outputs_label = match &task.config {
Some(config) => config.outputs.iter()
.map(|o| o.str.clone())
.collect::<Vec<_>>()
.join(", "),
None => String::new(),
};
// Get the job status
let status = job_statuses.get(job_label).unwrap_or(&NodeStatus::Pending);
mermaid.push_str(&format!(
" {}[\"`**{}** {}`\"]:::job_{}\n",
job_node_id,
job_label,
outputs_label,
status.css_class()
));
added_nodes.insert(job_node_id.clone());
}
// Process inputs (dependencies)
if let Some(config) = &task.config {
for input in &config.inputs {
if let Some(partition_ref) = &input.partition_ref {
let ref_node_id = format!("ref_{}", partition_ref.str.replace("/", "_").replace("=", "_"));
// Add the partition ref node if not already added
if !added_refs.contains(&ref_node_id) {
let status = partition_statuses.get(&partition_ref.str).unwrap_or(&NodeStatus::Pending);
let node_class = if is_output_ref.contains(&partition_ref.str) {
format!("outputPartition_{}", status.css_class())
} else {
format!("partition_{}", status.css_class())
};
mermaid.push_str(&format!(
" {}[(\"{}\")]:::{}\n",
ref_node_id,
partition_ref.str.replace("/", "_").replace("=", "_"),
node_class
));
added_refs.insert(ref_node_id.clone());
}
// Add the edge from input to job
if input.dep_type == 1 { // MATERIALIZE = 1
mermaid.push_str(&format!(" {} --> {}\n", ref_node_id, job_node_id));
} else {
// Dashed line for query dependencies
mermaid.push_str(&format!(" {} -.-> {}\n", ref_node_id, job_node_id));
}
}
}
// Process outputs
for output in &config.outputs {
let ref_node_id = format!("ref_{}", output.str.replace("/", "_").replace("=", "_"));
// Add the partition ref node if not already added
if !added_refs.contains(&ref_node_id) {
let status = partition_statuses.get(&output.str).unwrap_or(&NodeStatus::Pending);
let node_class = if is_output_ref.contains(&output.str) {
format!("outputPartition_{}", status.css_class())
} else {
format!("partition_{}", status.css_class())
};
mermaid.push_str(&format!(
" {}[(\"Partition: {}\")]:::{}\n",
ref_node_id,
output.str,
node_class
));
added_refs.insert(ref_node_id.clone());
}
// Add the edge from job to output
mermaid.push_str(&format!(" {} --> {}\n", job_node_id, ref_node_id));
}
}
}
// Add styling for all status types
mermaid.push_str("\n %% Styling\n");
// Job status styles
mermaid.push_str(" classDef job_pending fill:#e0e0e0,stroke:#333,stroke-width:1px;\n");
mermaid.push_str(" classDef job_running fill:#ffeb3b,stroke:#333,stroke-width:2px;\n");
mermaid.push_str(" classDef job_completed fill:#4caf50,stroke:#333,stroke-width:2px;\n");
mermaid.push_str(" classDef job_failed fill:#f44336,stroke:#333,stroke-width:2px;\n");
mermaid.push_str(" classDef job_cancelled fill:#ff9800,stroke:#333,stroke-width:2px;\n");
mermaid.push_str(" classDef job_skipped fill:#9e9e9e,stroke:#333,stroke-width:1px;\n");
// Partition status styles
mermaid.push_str(" classDef partition_pending fill:#e3f2fd,stroke:#333,stroke-width:1px;\n");
mermaid.push_str(" classDef partition_running fill:#fff9c4,stroke:#333,stroke-width:2px;\n");
mermaid.push_str(" classDef partition_available fill:#c8e6c9,stroke:#333,stroke-width:2px;\n");
mermaid.push_str(" classDef partition_failed fill:#ffcdd2,stroke:#333,stroke-width:2px;\n");
mermaid.push_str(" classDef partition_delegated fill:#d1c4e9,stroke:#333,stroke-width:2px;\n");
// Output partition status styles (highlighted versions)
mermaid.push_str(" classDef outputPartition_pending fill:#bbdefb,stroke:#333,stroke-width:3px;\n");
mermaid.push_str(" classDef outputPartition_running fill:#fff59d,stroke:#333,stroke-width:3px;\n");
mermaid.push_str(" classDef outputPartition_available fill:#a5d6a7,stroke:#333,stroke-width:3px;\n");
mermaid.push_str(" classDef outputPartition_failed fill:#ef9a9a,stroke:#333,stroke-width:3px;\n");
mermaid.push_str(" classDef outputPartition_delegated fill:#b39ddb,stroke:#333,stroke-width:3px;\n");
mermaid
}

View file

@ -17,6 +17,7 @@ use tokio::sync::RwLock;
use uuid::Uuid;
pub mod handlers;
pub mod mermaid_utils;
#[derive(Clone)]
pub struct BuildGraphService {
@ -55,6 +56,8 @@ pub struct BuildStatusResponse {
pub created_at: i64,
pub updated_at: i64,
pub events: Vec<BuildEventSummary>,
pub job_graph: Option<serde_json::Value>,
pub mermaid_diagram: Option<String>,
}
#[derive(Debug, Serialize, Deserialize, JsonSchema)]