diff --git a/databuild/build_event_log.rs b/databuild/build_event_log.rs index aad6fc9..9df6f8e 100644 --- a/databuild/build_event_log.rs +++ b/databuild/build_event_log.rs @@ -1,7 +1,10 @@ use crate::data_build_event::Event; use crate::{BuildState, DataBuildEvent, WantDetail}; +use prost::Message; +use rusqlite::{Connection, Result as SqliteResult}; use std::error::Error; use std::sync::{Arc, RwLock}; +use std::time::{SystemTime, UNIX_EPOCH}; trait BELStorage { fn append_event(&mut self, event: Event) -> Result>; @@ -12,6 +15,96 @@ trait BELStorage { ) -> Result, Box>; } +struct SqliteBELStorage { + connection: Connection, +} + +impl SqliteBELStorage { + fn create(database_url: &str) -> Result> { + let connection = Connection::open(database_url)?; + + // Create the events table + connection.execute( + "CREATE TABLE IF NOT EXISTS events ( + event_id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp INTEGER NOT NULL, + event_data BLOB NOT NULL + )", + (), + )?; + + Ok(SqliteBELStorage { connection }) + } +} + +impl BELStorage for SqliteBELStorage { + fn append_event(&mut self, event: Event) -> Result> { + let now = SystemTime::now(); + let duration_since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards"); + let timestamp = duration_since_epoch.as_nanos() as u64; + + // Serialize the event using prost + let dbe = DataBuildEvent { + timestamp, + event_id: 0, // Will be set by the database + event: Some(event), + }; + + let mut buf = Vec::new(); + prost::Message::encode(&dbe, &mut buf)?; + + self.connection.execute( + "INSERT INTO events (timestamp, event_data) VALUES (?1, ?2)", + (×tamp, &buf), + )?; + + let event_id = self.connection.last_insert_rowid() as u64; + Ok(event_id) + } + + fn list_events( + &self, + since_idx: u64, + limit: u64, + ) -> Result, Box> { + let mut stmt = self.connection.prepare( + "SELECT event_id, timestamp, event_data FROM events + WHERE timestamp > ?1 + ORDER BY event_id + LIMIT ?2" + )?; + + let rows = stmt.query_map([since_idx, limit], |row| { + let event_id: u64 = row.get(0)?; + let timestamp: u64 = row.get(1)?; + let event_data: Vec = row.get(2)?; + + // Deserialize the event using prost + let mut dbe = DataBuildEvent::decode(event_data.as_slice()) + .map_err(|e| rusqlite::Error::InvalidColumnType( + 0, + "event_data".to_string(), + rusqlite::types::Type::Blob + ))?; + + // Update the event_id from the database + dbe.event_id = event_id; + dbe.timestamp = timestamp; + + let result: DataBuildEvent = dbe; + + Ok(result) + })?; + + let mut events = Vec::new(); + for row_result in rows { + events.push(row_result?); + } + + Ok(events) + } +} + struct BuildEventLog { storage: B, state: Arc>, @@ -56,7 +149,7 @@ impl BuildEventLog { } mod tests { - use crate::build_event_log::{BELStorage, BuildEventLog}; + use crate::build_event_log::{BELStorage, BuildEventLog, SqliteBELStorage}; use crate::data_build_event::Event; use crate::{DataBuildEvent, PartitionRef, WantCreateEventV1}; use std::error::Error; @@ -146,4 +239,73 @@ mod tests { "want_id not equal", ); } + + #[test] + fn test_sqlite_append_event() { + let storage = SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage"); + let mut log = BuildEventLog::create(storage); + + let want_id = "sqlite_test_1234".to_string(); + + // Initial state - verify storage is empty + let events = log.storage.list_events(0, 100).expect("Failed to list events"); + assert_eq!(events.len(), 0); + + // Verify want doesn't exist in state + { + let state = log.state.read().unwrap(); + assert!(state.wants.get(&want_id).is_none()); + } + + // Append an event + let event_id = log.append_event(Event::WantCreateV1(WantCreateEventV1 { + want_id: want_id.clone(), + root_want_id: "sqlite_root_123".to_string(), + parent_want_id: "sqlite_parent_123".to_string(), + partitions: vec![PartitionRef { + r#ref: "sqlite_partition_1234".to_string(), + }], + data_timestamp: 0, + ttl_seconds: 1, + sla_seconds: 1, + source: None, + comment: None, + })) + .expect("append_event failed"); + + // Verify event was stored + assert!(event_id > 0); + + // Verify event can be retrieved + let events = log.storage.list_events(0, 100).expect("Failed to list events"); + assert_eq!(events.len(), 1); + + let stored_event = &events[0]; + assert_eq!(stored_event.event_id, event_id); + assert!(stored_event.timestamp > 0); + + // Verify the event content + if let Some(Event::WantCreateV1(want_event)) = &stored_event.event { + assert_eq!(want_event.want_id, want_id); + assert_eq!(want_event.root_want_id, "sqlite_root_123"); + assert_eq!(want_event.parent_want_id, "sqlite_parent_123"); + assert_eq!(want_event.partitions.len(), 1); + assert_eq!(want_event.partitions[0].r#ref, "sqlite_partition_1234"); + } else { + panic!("Expected WantCreateV1 event, got {:?}", stored_event.event); + } + + // Verify state was updated + let state = log.state.read().expect("couldn't take read lock"); + assert!(state.wants.get(&want_id).is_some(), "want_id not found in state"); + assert_eq!( + state + .wants + .get(&want_id) + .map(|want| want.want_id.clone()) + .expect("state.wants want_id not found"), + want_id, + "want_id not equal in state", + ); + } }