use super::*; use async_trait::async_trait; use rusqlite::{params, Connection, Row}; use serde_json; use std::sync::{Arc, Mutex}; // Helper functions to convert integer values back to enum values fn int_to_build_request_status(i: i32) -> BuildRequestStatus { match i { 0 => BuildRequestStatus::BuildRequestUnknown, 1 => BuildRequestStatus::BuildRequestReceived, 2 => BuildRequestStatus::BuildRequestPlanning, 3 => BuildRequestStatus::BuildRequestExecuting, 4 => BuildRequestStatus::BuildRequestCompleted, 5 => BuildRequestStatus::BuildRequestFailed, 6 => BuildRequestStatus::BuildRequestCancelled, _ => BuildRequestStatus::BuildRequestUnknown, } } fn int_to_partition_status(i: i32) -> PartitionStatus { match i { 0 => PartitionStatus::PartitionUnknown, 1 => PartitionStatus::PartitionRequested, 2 => PartitionStatus::PartitionScheduled, 3 => PartitionStatus::PartitionBuilding, 4 => PartitionStatus::PartitionAvailable, 5 => PartitionStatus::PartitionFailed, 6 => PartitionStatus::PartitionDelegated, _ => PartitionStatus::PartitionUnknown, } } pub struct SqliteBuildEventLog { connection: Arc>, } impl SqliteBuildEventLog { pub async fn new(path: &str) -> Result { let conn = Connection::open(path) .map_err(|e| BuildEventLogError::ConnectionError(e.to_string()))?; Ok(Self { connection: Arc::new(Mutex::new(conn)), }) } fn row_to_build_event(row: &Row) -> rusqlite::Result { let event_id: String = row.get(0)?; let timestamp: i64 = row.get(1)?; let build_request_id: String = row.get(2)?; let event_type_name: String = row.get(3)?; // Determine event type and fetch additional data let event_type = match event_type_name.as_str() { "build_request" => { // For now, create a basic event - in a real implementation we'd join with other tables Some(crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent { status: 0, // BUILD_REQUEST_UNKNOWN requested_partitions: vec![], message: String::new(), })) } "partition" => { Some(crate::build_event::EventType::PartitionEvent(PartitionEvent { partition_ref: Some(PartitionRef { str: String::new() }), status: 0, // PARTITION_UNKNOWN message: String::new(), job_run_id: String::new(), })) } "job" => { Some(crate::build_event::EventType::JobEvent(JobEvent { job_run_id: String::new(), job_label: Some(JobLabel { label: String::new() }), target_partitions: vec![], status: 0, // JOB_UNKNOWN message: String::new(), config: None, manifests: vec![], })) } "delegation" => { Some(crate::build_event::EventType::DelegationEvent(DelegationEvent { partition_ref: Some(PartitionRef { str: String::new() }), delegated_to_build_request_id: String::new(), message: String::new(), })) } _ => None, }; Ok(BuildEvent { event_id, timestamp, build_request_id, event_type, }) } } #[async_trait] impl BuildEventLog for SqliteBuildEventLog { async fn append_event(&self, event: BuildEvent) -> Result<()> { let conn = self.connection.lock().unwrap(); // First insert into build_events table conn.execute( "INSERT INTO build_events (event_id, timestamp, build_request_id, event_type) VALUES (?1, ?2, ?3, ?4)", params![ event.event_id, event.timestamp, event.build_request_id, match &event.event_type { Some(crate::build_event::EventType::BuildRequestEvent(_)) => "build_request", Some(crate::build_event::EventType::PartitionEvent(_)) => "partition", Some(crate::build_event::EventType::JobEvent(_)) => "job", Some(crate::build_event::EventType::DelegationEvent(_)) => "delegation", None => "unknown", } ], ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; // Insert into specific event type table match &event.event_type { Some(crate::build_event::EventType::BuildRequestEvent(br_event)) => { let partitions_json = serde_json::to_string(&br_event.requested_partitions) .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?; conn.execute( "INSERT INTO build_request_events (event_id, status, requested_partitions, message) VALUES (?1, ?2, ?3, ?4)", params![ event.event_id, br_event.status.to_string(), partitions_json, br_event.message ], ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; } Some(crate::build_event::EventType::PartitionEvent(p_event)) => { conn.execute( "INSERT INTO partition_events (event_id, partition_ref, status, message, job_run_id) VALUES (?1, ?2, ?3, ?4, ?5)", params![ event.event_id, p_event.partition_ref.as_ref().map(|r| &r.str).unwrap_or(&String::new()), p_event.status.to_string(), p_event.message, if p_event.job_run_id.is_empty() { None } else { Some(&p_event.job_run_id) } ], ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; } Some(crate::build_event::EventType::JobEvent(j_event)) => { let partitions_json = serde_json::to_string(&j_event.target_partitions) .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?; let config_json = j_event.config.as_ref() .map(|c| serde_json::to_string(c)) .transpose() .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?; let manifests_json = serde_json::to_string(&j_event.manifests) .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?; conn.execute( "INSERT INTO job_events (event_id, job_run_id, job_label, target_partitions, status, message, config_json, manifests_json) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", params![ event.event_id, j_event.job_run_id, j_event.job_label.as_ref().map(|l| &l.label).unwrap_or(&String::new()), partitions_json, j_event.status.to_string(), j_event.message, config_json, manifests_json ], ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; } Some(crate::build_event::EventType::DelegationEvent(d_event)) => { conn.execute( "INSERT INTO delegation_events (event_id, partition_ref, delegated_to_build_request_id, message) VALUES (?1, ?2, ?3, ?4)", params![ event.event_id, d_event.partition_ref.as_ref().map(|r| &r.str).unwrap_or(&String::new()), d_event.delegated_to_build_request_id, d_event.message ], ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; } None => {} } Ok(()) } async fn get_build_request_events( &self, build_request_id: &str, since: Option ) -> Result> { let conn = self.connection.lock().unwrap(); let mut stmt = if let Some(since_timestamp) = since { let mut stmt = conn.prepare("SELECT event_id, timestamp, build_request_id, event_type FROM build_events WHERE build_request_id = ?1 AND timestamp > ?2 ORDER BY timestamp") .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let rows = stmt.query_map(params![build_request_id, since_timestamp], Self::row_to_build_event) .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let mut events = Vec::new(); for row in rows { events.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?); } return Ok(events); } else { conn.prepare("SELECT event_id, timestamp, build_request_id, event_type FROM build_events WHERE build_request_id = ?1 ORDER BY timestamp") .map_err(|e| BuildEventLogError::QueryError(e.to_string()))? }; let rows = stmt.query_map([build_request_id], Self::row_to_build_event) .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let mut events = Vec::new(); for row in rows { events.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?); } Ok(events) } async fn get_partition_events( &self, partition_ref: &str, since: Option ) -> Result> { let conn = self.connection.lock().unwrap(); let mut stmt = if let Some(since_timestamp) = since { let mut stmt = conn.prepare("SELECT be.event_id, be.timestamp, be.build_request_id, be.event_type FROM build_events be JOIN partition_events pe ON be.event_id = pe.event_id WHERE pe.partition_ref = ?1 AND be.timestamp > ?2 ORDER BY be.timestamp") .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let rows = stmt.query_map(params![partition_ref, since_timestamp], Self::row_to_build_event) .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let mut events = Vec::new(); for row in rows { events.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?); } return Ok(events); } else { conn.prepare("SELECT be.event_id, be.timestamp, be.build_request_id, be.event_type FROM build_events be JOIN partition_events pe ON be.event_id = pe.event_id WHERE pe.partition_ref = ?1 ORDER BY be.timestamp") .map_err(|e| BuildEventLogError::QueryError(e.to_string()))? }; let rows = stmt.query_map([partition_ref], Self::row_to_build_event) .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let mut events = Vec::new(); for row in rows { events.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?); } Ok(events) } async fn get_job_run_events( &self, job_run_id: &str ) -> Result> { let conn = self.connection.lock().unwrap(); let query = "SELECT be.event_id, be.timestamp, be.build_request_id, be.event_type FROM build_events be JOIN job_events je ON be.event_id = je.event_id WHERE je.job_run_id = ?1 ORDER BY be.timestamp"; let mut stmt = conn.prepare(query) .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let rows = stmt.query_map([job_run_id], Self::row_to_build_event) .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let mut events = Vec::new(); for row in rows { events.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?); } Ok(events) } async fn get_events_in_range( &self, start_time: i64, end_time: i64 ) -> Result> { let conn = self.connection.lock().unwrap(); let query = "SELECT event_id, timestamp, build_request_id, event_type FROM build_events WHERE timestamp >= ?1 AND timestamp <= ?2 ORDER BY timestamp"; let mut stmt = conn.prepare(query) .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let rows = stmt.query_map([start_time, end_time], Self::row_to_build_event) .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let mut events = Vec::new(); for row in rows { events.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?); } Ok(events) } async fn execute_query(&self, query: &str) -> Result { let conn = self.connection.lock().unwrap(); let mut stmt = conn.prepare(query) .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let column_count = stmt.column_count(); let columns: Vec = (0..column_count) .map(|i| stmt.column_name(i).unwrap_or("unknown").to_string()) .collect(); let rows = stmt.query_map([], |row| { let mut row_data = Vec::new(); for i in 0..column_count { let value: String = row.get(i).unwrap_or_default(); row_data.push(value); } Ok(row_data) }).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let mut result_rows = Vec::new(); for row in rows { result_rows.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?); } Ok(QueryResult { columns, rows: result_rows, }) } async fn get_latest_partition_status( &self, partition_ref: &str ) -> Result> { let conn = self.connection.lock().unwrap(); let query = "SELECT pe.status, be.timestamp FROM partition_events pe JOIN build_events be ON pe.event_id = be.event_id WHERE pe.partition_ref = ?1 ORDER BY be.timestamp DESC LIMIT 1"; let mut stmt = conn.prepare(query) .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let result = stmt.query_row([partition_ref], |row| { let status_str: String = row.get(0)?; let timestamp: i64 = row.get(1)?; let status = status_str.parse::().unwrap_or(0); Ok((status, timestamp)) }); match result { Ok((status, timestamp)) => { let partition_status = PartitionStatus::try_from(status).unwrap_or(PartitionStatus::PartitionUnknown); Ok(Some((partition_status, timestamp))) } Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), Err(e) => Err(BuildEventLogError::QueryError(e.to_string())), } } async fn get_active_builds_for_partition( &self, partition_ref: &str ) -> Result> { let conn = self.connection.lock().unwrap(); // Look for build requests that are actively building this partition // A build is considered active if: // 1. It has scheduled/building events for this partition, AND // 2. The build request itself has not completed (status 4=COMPLETED or 5=FAILED) let query = "SELECT DISTINCT be.build_request_id FROM partition_events pe JOIN build_events be ON pe.event_id = be.event_id WHERE pe.partition_ref = ?1 AND pe.status IN ('2', '3') -- PARTITION_SCHEDULED or PARTITION_BUILDING AND be.build_request_id NOT IN ( SELECT DISTINCT be3.build_request_id FROM build_request_events bre JOIN build_events be3 ON bre.event_id = be3.event_id WHERE bre.status IN ('4', '5') -- BUILD_REQUEST_COMPLETED or BUILD_REQUEST_FAILED )"; let mut stmt = conn.prepare(query) .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let rows = stmt.query_map([partition_ref], |row| { let build_request_id: String = row.get(0)?; Ok(build_request_id) }).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let mut build_request_ids = Vec::new(); for row in rows { build_request_ids.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?); } Ok(build_request_ids) } async fn list_build_requests( &self, limit: u32, offset: u32, status_filter: Option, ) -> Result<(Vec, u32)> { let conn = self.connection.lock().unwrap(); // Build query based on status filter let (where_clause, count_where_clause) = match status_filter { Some(_) => (" WHERE bre.status = ?1", " WHERE bre.status = ?1"), None => ("", ""), }; let query = format!( "SELECT DISTINCT be.build_request_id, bre.status, bre.requested_partitions, MIN(be.timestamp) as created_at, MAX(be.timestamp) as updated_at FROM build_events be JOIN build_request_events bre ON be.event_id = bre.event_id{} GROUP BY be.build_request_id ORDER BY created_at DESC LIMIT {} OFFSET {}", where_clause, limit, offset ); let count_query = format!( "SELECT COUNT(DISTINCT be.build_request_id) FROM build_events be JOIN build_request_events bre ON be.event_id = bre.event_id{}", count_where_clause ); // Execute count query first let total_count: u32 = if let Some(status) = status_filter { let status_str = format!("{:?}", status); conn.query_row(&count_query, params![status_str], |row| row.get(0)) .map_err(|e| BuildEventLogError::QueryError(e.to_string()))? } else { conn.query_row(&count_query, [], |row| row.get(0)) .map_err(|e| BuildEventLogError::QueryError(e.to_string()))? }; // Execute main query let mut stmt = conn.prepare(&query) .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let build_row_mapper = |row: &Row| -> rusqlite::Result { let status_str: String = row.get(1)?; let status = status_str.parse::() .map(int_to_build_request_status) .unwrap_or(BuildRequestStatus::BuildRequestUnknown); Ok(BuildRequestSummary { build_request_id: row.get(0)?, status, requested_partitions: serde_json::from_str(&row.get::<_, String>(2)?).unwrap_or_default(), created_at: row.get(3)?, updated_at: row.get(4)?, }) }; let rows = if let Some(status) = status_filter { let status_str = format!("{:?}", status); stmt.query_map(params![status_str], build_row_mapper) } else { stmt.query_map([], build_row_mapper) }.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let mut summaries = Vec::new(); for row in rows { summaries.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?); } Ok((summaries, total_count)) } async fn list_recent_partitions( &self, limit: u32, offset: u32, status_filter: Option, ) -> Result<(Vec, u32)> { let conn = self.connection.lock().unwrap(); // Build query based on status filter let (where_clause, count_where_clause) = match status_filter { Some(_) => (" WHERE pe.status = ?1", " WHERE pe.status = ?1"), None => ("", ""), }; let query = format!( "SELECT pe.partition_ref, pe.status, MAX(be.timestamp) as updated_at, be.build_request_id FROM build_events be JOIN partition_events pe ON be.event_id = pe.event_id{} GROUP BY pe.partition_ref ORDER BY updated_at DESC LIMIT {} OFFSET {}", where_clause, limit, offset ); let count_query = format!( "SELECT COUNT(DISTINCT pe.partition_ref) FROM build_events be JOIN partition_events pe ON be.event_id = pe.event_id{}", count_where_clause ); // Execute count query first let total_count: u32 = if let Some(status) = status_filter { let status_str = format!("{:?}", status); conn.query_row(&count_query, params![status_str], |row| row.get(0)) .map_err(|e| BuildEventLogError::QueryError(e.to_string()))? } else { conn.query_row(&count_query, [], |row| row.get(0)) .map_err(|e| BuildEventLogError::QueryError(e.to_string()))? }; // Execute main query let mut stmt = conn.prepare(&query) .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let row_mapper = |row: &Row| -> rusqlite::Result { let status_str: String = row.get(1)?; let status = status_str.parse::() .map(int_to_partition_status) .unwrap_or(PartitionStatus::PartitionUnknown); Ok(PartitionSummary { partition_ref: row.get(0)?, status, updated_at: row.get(2)?, build_request_id: Some(row.get(3)?), }) }; let rows = if let Some(status) = status_filter { let status_str = format!("{:?}", status); stmt.query_map(params![status_str], row_mapper) } else { stmt.query_map([], row_mapper) }.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; let mut summaries = Vec::new(); for row in rows { summaries.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?); } Ok((summaries, total_count)) } async fn get_activity_summary(&self) -> Result { // First get the simple counts without holding the lock across awaits let (active_builds_count, total_partitions_count) = { let conn = self.connection.lock().unwrap(); // Get active builds count (builds that are not completed, failed, or cancelled) let active_builds_count: u32 = conn.query_row( "SELECT COUNT(DISTINCT be.build_request_id) FROM build_events be JOIN build_request_events bre ON be.event_id = bre.event_id WHERE bre.status IN ('BuildRequestReceived', 'BuildRequestPlanning', 'BuildRequestExecuting')", [], |row| row.get(0) ).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; // Get total partitions count let total_partitions_count: u32 = conn.query_row( "SELECT COUNT(DISTINCT pe.partition_ref) FROM partition_events pe JOIN build_events be ON pe.event_id = be.event_id", [], |row| row.get(0) ).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; (active_builds_count, total_partitions_count) }; // Get recent builds (limit to 5 for summary) let (recent_builds, _) = self.list_build_requests(5, 0, None).await?; // Get recent partitions (limit to 5 for summary) let (recent_partitions, _) = self.list_recent_partitions(5, 0, None).await?; Ok(ActivitySummary { active_builds_count, recent_builds, recent_partitions, total_partitions_count, }) } async fn initialize(&self) -> Result<()> { let conn = self.connection.lock().unwrap(); // Create tables conn.execute( "CREATE TABLE IF NOT EXISTS build_events ( event_id TEXT PRIMARY KEY, timestamp INTEGER NOT NULL, build_request_id TEXT NOT NULL, event_type TEXT NOT NULL )", [], ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; conn.execute( "CREATE TABLE IF NOT EXISTS build_request_events ( event_id TEXT PRIMARY KEY REFERENCES build_events(event_id), status TEXT NOT NULL, requested_partitions TEXT NOT NULL, message TEXT )", [], ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; conn.execute( "CREATE TABLE IF NOT EXISTS partition_events ( event_id TEXT PRIMARY KEY REFERENCES build_events(event_id), partition_ref TEXT NOT NULL, status TEXT NOT NULL, message TEXT, job_run_id TEXT )", [], ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; conn.execute( "CREATE TABLE IF NOT EXISTS job_events ( event_id TEXT PRIMARY KEY REFERENCES build_events(event_id), job_run_id TEXT NOT NULL, job_label TEXT NOT NULL, target_partitions TEXT NOT NULL, status TEXT NOT NULL, message TEXT, config_json TEXT, manifests_json TEXT )", [], ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; conn.execute( "CREATE TABLE IF NOT EXISTS delegation_events ( event_id TEXT PRIMARY KEY REFERENCES build_events(event_id), partition_ref TEXT NOT NULL, delegated_to_build_request_id TEXT NOT NULL, message TEXT )", [], ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; // Create indexes conn.execute( "CREATE INDEX IF NOT EXISTS idx_build_events_build_request ON build_events(build_request_id, timestamp)", [], ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; conn.execute( "CREATE INDEX IF NOT EXISTS idx_build_events_timestamp ON build_events(timestamp)", [], ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; conn.execute( "CREATE INDEX IF NOT EXISTS idx_partition_events_partition ON partition_events(partition_ref)", [], ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; conn.execute( "CREATE INDEX IF NOT EXISTS idx_job_events_job_run ON job_events(job_run_id)", [], ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; Ok(()) } }