diff --git a/databuild/BUILD.bazel b/databuild/BUILD.bazel index 7380d91..a808e73 100644 --- a/databuild/BUILD.bazel +++ b/databuild/BUILD.bazel @@ -44,6 +44,7 @@ rust_library( "orchestration/events.rs", "service/handlers.rs", "service/mod.rs", + "service/mermaid_utils.rs", ":generate_databuild_rust", ], edition = "2021", diff --git a/databuild/databuild.proto b/databuild/databuild.proto index 0a3b368..4af7a41 100644 --- a/databuild/databuild.proto +++ b/databuild/databuild.proto @@ -226,6 +226,12 @@ message DelegationEvent { string message = 3; // Optional message } +// Job graph analysis result event (stores the analyzed job graph) +message JobGraphEvent { + JobGraph job_graph = 1; // The analyzed job graph + string message = 2; // Optional message +} + // Individual build event message BuildEvent { // Event metadata @@ -239,6 +245,7 @@ message BuildEvent { PartitionEvent partition_event = 11; JobEvent job_event = 12; DelegationEvent delegation_event = 13; + JobGraphEvent job_graph_event = 14; } } diff --git a/databuild/event_log/sqlite.rs b/databuild/event_log/sqlite.rs index 303e4e0..7cc2255 100644 --- a/databuild/event_log/sqlite.rs +++ b/databuild/event_log/sqlite.rs @@ -126,6 +126,18 @@ impl SqliteBuildEventLog { message, })) } + "job_graph" => { + // Read from job_graph_events columns (indices 4, 5) + let job_graph_json: String = row.get(4)?; + let message: String = row.get(5)?; + + let job_graph: Option = serde_json::from_str(&job_graph_json).ok(); + + Some(crate::build_event::EventType::JobGraphEvent(JobGraphEvent { + job_graph, + message, + })) + } _ => None, }; @@ -155,6 +167,7 @@ impl BuildEventLog for SqliteBuildEventLog { Some(crate::build_event::EventType::PartitionEvent(_)) => "partition", Some(crate::build_event::EventType::JobEvent(_)) => "job", Some(crate::build_event::EventType::DelegationEvent(_)) => "delegation", + Some(crate::build_event::EventType::JobGraphEvent(_)) => "job_graph", None => "unknown", } ], @@ -223,6 +236,22 @@ impl BuildEventLog for SqliteBuildEventLog { ], ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; } + Some(crate::build_event::EventType::JobGraphEvent(jg_event)) => { + let job_graph_json = match serde_json::to_string(&jg_event.job_graph) { + Ok(json) => json, + Err(e) => { + return Err(BuildEventLogError::DatabaseError(format!("Failed to serialize job graph: {}", e))); + } + }; + conn.execute( + "INSERT INTO job_graph_events (event_id, job_graph_json, message) VALUES (?1, ?2, ?3)", + params![ + event.event_id, + job_graph_json, + jg_event.message + ], + ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; + } None => {} } @@ -261,6 +290,12 @@ impl BuildEventLog for SqliteBuildEventLog { FROM build_events be LEFT JOIN delegation_events de ON be.event_id = de.event_id WHERE be.build_request_id = ? AND be.event_type = 'delegation' + UNION ALL + SELECT be.event_id, be.timestamp, be.build_request_id, be.event_type, + jge.job_graph_json, jge.message, NULL, NULL, NULL, NULL, NULL + FROM build_events be + LEFT JOIN job_graph_events jge ON be.event_id = jge.event_id + WHERE be.build_request_id = ? AND be.event_type = 'job_graph' "; let query = if since.is_some() { @@ -273,11 +308,11 @@ impl BuildEventLog for SqliteBuildEventLog { .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let rows = if let Some(since_timestamp) = since { - // We need 5 parameters: build_request_id for each UNION + since_timestamp - stmt.query_map(params![build_request_id, build_request_id, build_request_id, build_request_id, since_timestamp], Self::row_to_build_event_from_join) + // We need 6 parameters: build_request_id for each UNION + since_timestamp + stmt.query_map(params![build_request_id, build_request_id, build_request_id, build_request_id, build_request_id, since_timestamp], Self::row_to_build_event_from_join) } else { - // We need 4 parameters: build_request_id for each UNION - stmt.query_map(params![build_request_id, build_request_id, build_request_id, build_request_id], Self::row_to_build_event_from_join) + // We need 5 parameters: build_request_id for each UNION + stmt.query_map(params![build_request_id, build_request_id, build_request_id, build_request_id, build_request_id], Self::row_to_build_event_from_join) }.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let mut events = Vec::new(); @@ -696,6 +731,15 @@ impl BuildEventLog for SqliteBuildEventLog { [], ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; + conn.execute( + "CREATE TABLE IF NOT EXISTS job_graph_events ( + event_id TEXT PRIMARY KEY REFERENCES build_events(event_id), + job_graph_json TEXT NOT NULL, + message TEXT + )", + [], + ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; + // Create indexes conn.execute( "CREATE INDEX IF NOT EXISTS idx_build_events_build_request ON build_events(build_request_id, timestamp)", diff --git a/databuild/graph/analyze.rs b/databuild/graph/analyze.rs index 3410677..545d7cc 100644 --- a/databuild/graph/analyze.rs +++ b/databuild/graph/analyze.rs @@ -336,6 +336,24 @@ async fn plan( if let Err(e) = event_log.append_event(event).await { error!("Failed to log analysis completion event: {}", e); } + + // Store the job graph as an event in the build event log + let job_graph = JobGraph { + label: Some(GraphLabel { label: "analyzed_graph".to_string() }), + outputs: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(), + nodes: nodes.clone(), + }; + + let job_graph_event = create_build_event( + build_request_id.to_string(), + crate::build_event::EventType::JobGraphEvent(JobGraphEvent { + job_graph: Some(job_graph), + message: format!("Job graph analysis completed with {} tasks", nodes.len()), + }), + ); + if let Err(e) = event_log.append_event(job_graph_event).await { + error!("Failed to log job graph event: {}", e); + } } Ok(JobGraph { diff --git a/databuild/service/handlers.rs b/databuild/service/handlers.rs index ec6bcd6..d9676e7 100644 --- a/databuild/service/handlers.rs +++ b/databuild/service/handlers.rs @@ -1,6 +1,7 @@ use super::*; use crate::event_log::{current_timestamp_nanos, create_build_event}; use crate::orchestration::{BuildOrchestrator, BuildResult}; +use crate::service::mermaid_utils; use axum::{ extract::{Path, State}, http::StatusCode, @@ -214,6 +215,9 @@ pub async fn get_build_status( ) })?; + // Clone events for later use in mermaid generation + let events_for_mermaid = events.clone(); + // Convert events to summary format for response let event_summaries: Vec = events.into_iter().map(|e| { let (job_label, partition_ref, delegated_build_id) = extract_navigation_data(&e.event_type); @@ -232,6 +236,38 @@ pub async fn get_build_status( let final_status_string = BuildGraphService::status_to_string(final_status); info!("Build request {}: Final status={}, partitions={:?}", build_request_id, final_status_string, requested_partitions); + // Extract the job graph from events (find the most recent JobGraphEvent) + let (job_graph_json, mermaid_diagram) = { + let mut job_graph: Option = None; + + // Find the most recent JobGraphEvent in the events + for event in &events_for_mermaid { + if let Some(crate::build_event::EventType::JobGraphEvent(graph_event)) = &event.event_type { + if let Some(ref graph) = graph_event.job_graph { + job_graph = Some(graph.clone()); + } + } + } + + if let Some(ref graph) = job_graph { + // Convert job graph to JSON + let graph_json = match serde_json::to_value(graph) { + Ok(json) => Some(json), + Err(e) => { + error!("Failed to serialize job graph: {}", e); + None + } + }; + + // Generate mermaid diagram with current status + let mermaid = mermaid_utils::generate_mermaid_with_status(graph, &events_for_mermaid); + + (graph_json, Some(mermaid)) + } else { + (None, None) + } + }; + Ok(Json(BuildStatusResponse { build_request_id, status: final_status_string, @@ -239,6 +275,8 @@ pub async fn get_build_status( created_at, updated_at, events: event_summaries, + job_graph: job_graph_json, + mermaid_diagram, })) } @@ -466,6 +504,7 @@ async fn execute_build_request( } }; + // Update status to executing update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestExecuting).await; @@ -593,6 +632,7 @@ fn event_type_to_string(event_type: &Option) -> S Some(crate::build_event::EventType::PartitionEvent(_)) => "partition".to_string(), Some(crate::build_event::EventType::JobEvent(_)) => "job".to_string(), Some(crate::build_event::EventType::DelegationEvent(_)) => "delegation".to_string(), + Some(crate::build_event::EventType::JobGraphEvent(_)) => "job_graph".to_string(), None => "INVALID_EVENT_TYPE".to_string(), // Make this obvious rather than hiding it } } @@ -603,6 +643,7 @@ fn event_to_message(event_type: &Option) -> Strin Some(crate::build_event::EventType::PartitionEvent(event)) => event.message.clone(), Some(crate::build_event::EventType::JobEvent(event)) => event.message.clone(), Some(crate::build_event::EventType::DelegationEvent(event)) => event.message.clone(), + Some(crate::build_event::EventType::JobGraphEvent(event)) => event.message.clone(), None => "INVALID_EVENT_NO_MESSAGE".to_string(), // Make this obvious } } @@ -625,6 +666,10 @@ fn extract_navigation_data(event_type: &Option) - // Build request events don't need navigation links (self-referential) (None, None, None) }, + Some(crate::build_event::EventType::JobGraphEvent(_)) => { + // Job graph events don't need navigation links + (None, None, None) + }, None => (None, None, None), } } diff --git a/databuild/service/mermaid_utils.rs b/databuild/service/mermaid_utils.rs new file mode 100644 index 0000000..df54ba9 --- /dev/null +++ b/databuild/service/mermaid_utils.rs @@ -0,0 +1,219 @@ +use crate::*; +use std::collections::{HashMap, HashSet}; + +/// Represents the status of a job or partition for visualization +#[derive(Debug, Clone, PartialEq)] +pub enum NodeStatus { + Pending, + Running, + Completed, + Failed, + Cancelled, + Skipped, + Available, + Delegated, +} + +impl NodeStatus { + /// Get the CSS class name for this status + fn css_class(&self) -> &'static str { + match self { + NodeStatus::Pending => "pending", + NodeStatus::Running => "running", + NodeStatus::Completed => "completed", + NodeStatus::Failed => "failed", + NodeStatus::Cancelled => "cancelled", + NodeStatus::Skipped => "skipped", + NodeStatus::Available => "available", + NodeStatus::Delegated => "delegated", + } + } +} + +/// Extract current status information from build events +pub fn extract_status_map(events: &[BuildEvent]) -> (HashMap, HashMap) { + let mut job_statuses: HashMap = HashMap::new(); + let mut partition_statuses: HashMap = HashMap::new(); + + // Process events in chronological order to get latest status + let mut sorted_events = events.to_vec(); + sorted_events.sort_by_key(|e| e.timestamp); + + for event in sorted_events { + match &event.event_type { + Some(crate::build_event::EventType::JobEvent(job_event)) => { + if let Some(job_label) = &job_event.job_label { + let status = match job_event.status { + 1 => NodeStatus::Running, // JOB_SCHEDULED + 2 => NodeStatus::Running, // JOB_RUNNING + 3 => NodeStatus::Completed, // JOB_COMPLETED + 4 => NodeStatus::Failed, // JOB_FAILED + 5 => NodeStatus::Cancelled, // JOB_CANCELLED + 6 => NodeStatus::Skipped, // JOB_SKIPPED + _ => NodeStatus::Pending, + }; + job_statuses.insert(job_label.label.clone(), status); + } + } + Some(crate::build_event::EventType::PartitionEvent(partition_event)) => { + if let Some(partition_ref) = &partition_event.partition_ref { + let status = match partition_event.status { + 1 => NodeStatus::Pending, // PARTITION_REQUESTED + 2 => NodeStatus::Pending, // PARTITION_ANALYZED + 3 => NodeStatus::Running, // PARTITION_BUILDING + 4 => NodeStatus::Available, // PARTITION_AVAILABLE + 5 => NodeStatus::Failed, // PARTITION_FAILED + 6 => NodeStatus::Delegated, // PARTITION_DELEGATED + _ => NodeStatus::Pending, + }; + partition_statuses.insert(partition_ref.str.clone(), status); + } + } + _ => {} + } + } + + (job_statuses, partition_statuses) +} + +/// Generate a mermaid diagram for a job graph with current status annotations +pub fn generate_mermaid_with_status( + graph: &JobGraph, + events: &[BuildEvent], +) -> String { + let (job_statuses, partition_statuses) = extract_status_map(events); + + // Start the mermaid flowchart + let mut mermaid = String::from("flowchart TD\n"); + + // Track nodes we've already added to avoid duplicates + let mut added_nodes = HashSet::new(); + let mut added_refs = HashSet::new(); + + // Map to track which refs are outputs (to highlight them) + let mut is_output_ref = HashSet::new(); + for ref_str in &graph.outputs { + is_output_ref.insert(ref_str.str.clone()); + } + + // Add all task nodes and their relationships + for task in &graph.nodes { + let job_label = match &task.job { + Some(label) => &label.label, + None => continue, + }; + + let job_node_id = format!("job_{}", job_label.replace("/", "_").replace(":", "_")); + + // Only add the job node once + if !added_nodes.contains(&job_node_id) { + let outputs_label = match &task.config { + Some(config) => config.outputs.iter() + .map(|o| o.str.clone()) + .collect::>() + .join(", "), + None => String::new(), + }; + + // Get the job status + let status = job_statuses.get(job_label).unwrap_or(&NodeStatus::Pending); + + mermaid.push_str(&format!( + " {}[\"`**{}** {}`\"]:::job_{}\n", + job_node_id, + job_label, + outputs_label, + status.css_class() + )); + added_nodes.insert(job_node_id.clone()); + } + + // Process inputs (dependencies) + if let Some(config) = &task.config { + for input in &config.inputs { + if let Some(partition_ref) = &input.partition_ref { + let ref_node_id = format!("ref_{}", partition_ref.str.replace("/", "_").replace("=", "_")); + + // Add the partition ref node if not already added + if !added_refs.contains(&ref_node_id) { + let status = partition_statuses.get(&partition_ref.str).unwrap_or(&NodeStatus::Pending); + let node_class = if is_output_ref.contains(&partition_ref.str) { + format!("outputPartition_{}", status.css_class()) + } else { + format!("partition_{}", status.css_class()) + }; + + mermaid.push_str(&format!( + " {}[(\"{}\")]:::{}\n", + ref_node_id, + partition_ref.str.replace("/", "_").replace("=", "_"), + node_class + )); + added_refs.insert(ref_node_id.clone()); + } + + // Add the edge from input to job + if input.dep_type == 1 { // MATERIALIZE = 1 + mermaid.push_str(&format!(" {} --> {}\n", ref_node_id, job_node_id)); + } else { + // Dashed line for query dependencies + mermaid.push_str(&format!(" {} -.-> {}\n", ref_node_id, job_node_id)); + } + } + } + + // Process outputs + for output in &config.outputs { + let ref_node_id = format!("ref_{}", output.str.replace("/", "_").replace("=", "_")); + + // Add the partition ref node if not already added + if !added_refs.contains(&ref_node_id) { + let status = partition_statuses.get(&output.str).unwrap_or(&NodeStatus::Pending); + let node_class = if is_output_ref.contains(&output.str) { + format!("outputPartition_{}", status.css_class()) + } else { + format!("partition_{}", status.css_class()) + }; + + mermaid.push_str(&format!( + " {}[(\"Partition: {}\")]:::{}\n", + ref_node_id, + output.str, + node_class + )); + added_refs.insert(ref_node_id.clone()); + } + + // Add the edge from job to output + mermaid.push_str(&format!(" {} --> {}\n", job_node_id, ref_node_id)); + } + } + } + + // Add styling for all status types + mermaid.push_str("\n %% Styling\n"); + + // Job status styles + mermaid.push_str(" classDef job_pending fill:#e0e0e0,stroke:#333,stroke-width:1px;\n"); + mermaid.push_str(" classDef job_running fill:#ffeb3b,stroke:#333,stroke-width:2px;\n"); + mermaid.push_str(" classDef job_completed fill:#4caf50,stroke:#333,stroke-width:2px;\n"); + mermaid.push_str(" classDef job_failed fill:#f44336,stroke:#333,stroke-width:2px;\n"); + mermaid.push_str(" classDef job_cancelled fill:#ff9800,stroke:#333,stroke-width:2px;\n"); + mermaid.push_str(" classDef job_skipped fill:#9e9e9e,stroke:#333,stroke-width:1px;\n"); + + // Partition status styles + mermaid.push_str(" classDef partition_pending fill:#e3f2fd,stroke:#333,stroke-width:1px;\n"); + mermaid.push_str(" classDef partition_running fill:#fff9c4,stroke:#333,stroke-width:2px;\n"); + mermaid.push_str(" classDef partition_available fill:#c8e6c9,stroke:#333,stroke-width:2px;\n"); + mermaid.push_str(" classDef partition_failed fill:#ffcdd2,stroke:#333,stroke-width:2px;\n"); + mermaid.push_str(" classDef partition_delegated fill:#d1c4e9,stroke:#333,stroke-width:2px;\n"); + + // Output partition status styles (highlighted versions) + mermaid.push_str(" classDef outputPartition_pending fill:#bbdefb,stroke:#333,stroke-width:3px;\n"); + mermaid.push_str(" classDef outputPartition_running fill:#fff59d,stroke:#333,stroke-width:3px;\n"); + mermaid.push_str(" classDef outputPartition_available fill:#a5d6a7,stroke:#333,stroke-width:3px;\n"); + mermaid.push_str(" classDef outputPartition_failed fill:#ef9a9a,stroke:#333,stroke-width:3px;\n"); + mermaid.push_str(" classDef outputPartition_delegated fill:#b39ddb,stroke:#333,stroke-width:3px;\n"); + + mermaid +} \ No newline at end of file diff --git a/databuild/service/mod.rs b/databuild/service/mod.rs index 595cead..6954a25 100644 --- a/databuild/service/mod.rs +++ b/databuild/service/mod.rs @@ -17,6 +17,7 @@ use tokio::sync::RwLock; use uuid::Uuid; pub mod handlers; +pub mod mermaid_utils; #[derive(Clone)] pub struct BuildGraphService { @@ -55,6 +56,8 @@ pub struct BuildStatusResponse { pub created_at: i64, pub updated_at: i64, pub events: Vec, + pub job_graph: Option, + pub mermaid_diagram: Option, } #[derive(Debug, Serialize, Deserialize, JsonSchema)]