databuild/databuild/event_log/sqlite_storage.rs

use super::*;
use super::storage::BELStorage;
use async_trait::async_trait;
use rusqlite::{params, Connection};
use std::path::Path;
use std::sync::{Arc, Mutex};

/// SQLite-backed storage for the build event log. Events are stored as JSON
/// blobs in a single `build_events` table, keyed by rowid.
pub struct SqliteBELStorage {
    connection: Arc<Mutex<Connection>>,
}

impl SqliteBELStorage {
    pub fn new(path: &str) -> Result<Self> {
        // Create the parent directory if it doesn't exist yet.
        if let Some(parent) = Path::new(path).parent() {
            std::fs::create_dir_all(parent).map_err(|e| {
                BuildEventLogError::ConnectionError(format!(
                    "Failed to create directory {}: {}",
                    parent.display(),
                    e
                ))
            })?;
        }

        let conn = Connection::open(path)
            .map_err(|e| BuildEventLogError::ConnectionError(e.to_string()))?;

        Ok(Self {
            connection: Arc::new(Mutex::new(conn)),
        })
    }
}

#[async_trait]
impl BELStorage for SqliteBELStorage {
    async fn append_event(&self, event: BuildEvent) -> Result<i64> {
        let serialized = serde_json::to_string(&event)
            .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;

        let conn = self.connection.lock().unwrap();
        // `execute` returns the number of affected rows, which we don't need;
        // the new event's index is the rowid SQLite assigned to the insert.
        conn.execute(
            "INSERT INTO build_events (event_data) VALUES (?)",
            params![serialized],
        )
        .map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;

        Ok(conn.last_insert_rowid())
    }

    async fn list_events(&self, since_idx: i64, filter: EventFilter) -> Result<EventPage> {
        // Page size for a single query; also used below to decide whether more
        // rows may remain after this page.
        const PAGE_LIMIT: usize = 1000;

        let conn = self.connection.lock().unwrap();

        // For simplicity in the initial implementation, only the build request
        // ID filter is pushed down into SQL; the partition and job run filters
        // are applied in memory below. More sophisticated JSON path filtering
        // can be added later if needed.
        let mut query = "SELECT rowid, event_data FROM build_events WHERE rowid > ?".to_string();
        let mut params_vec = vec![since_idx.to_string()];

        if !filter.build_request_ids.is_empty() {
            query.push_str(" AND (");
            for (i, build_id) in filter.build_request_ids.iter().enumerate() {
                if i > 0 {
                    query.push_str(" OR ");
                }
                query.push_str("JSON_EXTRACT(event_data, '$.build_request_id') = ?");
                params_vec.push(build_id.clone());
            }
            query.push(')');
        }

        // Add ordering and pagination.
        query.push_str(&format!(" ORDER BY rowid ASC LIMIT {}", PAGE_LIMIT));

        let mut stmt = conn
            .prepare(&query)
            .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        // Convert the owned parameter strings into rusqlite params.
        let param_refs: Vec<&dyn rusqlite::ToSql> = params_vec
            .iter()
            .map(|p| p as &dyn rusqlite::ToSql)
            .collect();

        let rows = stmt
            .query_map(&param_refs[..], |row| {
                let rowid: i64 = row.get(0)?;
                let event_data: String = row.get(1)?;
                Ok((rowid, event_data))
            })
            .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        let mut events = Vec::new();
        let mut max_idx = since_idx;
        let mut rows_scanned = 0usize;

        for row in rows {
            let (rowid, event_data) =
                row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
            rows_scanned += 1;
            // Advance the cursor past every scanned row, including ones the
            // in-memory filters reject, so the next page doesn't re-read them.
            max_idx = rowid;

            let event: BuildEvent = serde_json::from_str(&event_data)
                .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;

            // Apply the remaining filters in memory for now.
            let mut include_event = true;

            if !filter.partition_refs.is_empty() {
                include_event = false;
                if let Some(crate::build_event::EventType::PartitionEvent(pe)) = &event.event_type {
                    if let Some(partition_ref) = &pe.partition_ref {
                        if filter.partition_refs.contains(&partition_ref.str) {
                            include_event = true;
                        }
                    }
                }
            }

            if include_event && !filter.job_run_ids.is_empty() {
                include_event = false;
                if let Some(crate::build_event::EventType::JobEvent(je)) = &event.event_type {
                    if filter.job_run_ids.contains(&je.job_run_id) {
                        include_event = true;
                    }
                }
            }

            if include_event {
                events.push(event);
            }
        }

        // A full page of scanned rows means there may be more events, even if
        // the in-memory filters kept fewer than PAGE_LIMIT of them.
        let has_more = rows_scanned >= PAGE_LIMIT;

        Ok(EventPage {
            events,
            next_idx: max_idx,
            has_more,
        })
    }

    async fn initialize(&self) -> Result<()> {
        let conn = self.connection.lock().unwrap();

        conn.execute(
            "CREATE TABLE IF NOT EXISTS build_events (
                rowid INTEGER PRIMARY KEY AUTOINCREMENT,
                event_data TEXT NOT NULL
            )",
            [],
        )
        .map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;

        // Index on the extracted build request ID so the SQL filter in
        // `list_events` doesn't have to scan every row.
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_build_request_id ON build_events(
                JSON_EXTRACT(event_data, '$.build_request_id')
            )",
            [],
        )
        .map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;

        Ok(())
    }
}
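
// A minimal usage sketch, kept behind #[cfg(test)]: open the store, create the
// schema, append one event, and page it back with an empty filter. It assumes
// a tokio test runtime is available and that `BuildEvent` and `EventFilter`
// provide `Default` (typical for prost-generated types); adjust to the real
// constructors if they differ.
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn append_then_list_roundtrip() {
        // ":memory:" keeps the sketch self-contained; any writable path works.
        let storage = SqliteBELStorage::new(":memory:").expect("open sqlite");
        storage.initialize().await.expect("create schema");

        // Append a default (empty) event and read it back with no filters.
        let idx = storage
            .append_event(BuildEvent::default())
            .await
            .expect("append event");
        assert!(idx > 0);

        let page = storage
            .list_events(0, EventFilter::default())
            .await
            .expect("list events");
        assert_eq!(page.events.len(), 1);
        assert_eq!(page.next_idx, idx);
        assert!(!page.has_more);
    }
}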