Implement sqlite BELStorage
This commit is contained in:
parent
2009ac1c12
commit
2edfe90fd4
1 changed files with 163 additions and 1 deletions
|
|
@ -1,7 +1,10 @@
|
||||||
use crate::data_build_event::Event;
|
use crate::data_build_event::Event;
|
||||||
use crate::{BuildState, DataBuildEvent, WantDetail};
|
use crate::{BuildState, DataBuildEvent, WantDetail};
|
||||||
|
use prost::Message;
|
||||||
|
use rusqlite::{Connection, Result as SqliteResult};
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
trait BELStorage {
|
trait BELStorage {
|
||||||
fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>>;
|
fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>>;
|
||||||
|
|
@ -12,6 +15,96 @@ trait BELStorage {
|
||||||
) -> Result<Vec<DataBuildEvent>, Box<dyn Error>>;
|
) -> Result<Vec<DataBuildEvent>, Box<dyn Error>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct SqliteBELStorage {
|
||||||
|
connection: Connection,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SqliteBELStorage {
|
||||||
|
fn create(database_url: &str) -> Result<SqliteBELStorage, Box<dyn Error>> {
|
||||||
|
let connection = Connection::open(database_url)?;
|
||||||
|
|
||||||
|
// Create the events table
|
||||||
|
connection.execute(
|
||||||
|
"CREATE TABLE IF NOT EXISTS events (
|
||||||
|
event_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
timestamp INTEGER NOT NULL,
|
||||||
|
event_data BLOB NOT NULL
|
||||||
|
)",
|
||||||
|
(),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(SqliteBELStorage { connection })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BELStorage for SqliteBELStorage {
|
||||||
|
fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>> {
|
||||||
|
let now = SystemTime::now();
|
||||||
|
let duration_since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
|
||||||
|
let timestamp = duration_since_epoch.as_nanos() as u64;
|
||||||
|
|
||||||
|
// Serialize the event using prost
|
||||||
|
let dbe = DataBuildEvent {
|
||||||
|
timestamp,
|
||||||
|
event_id: 0, // Will be set by the database
|
||||||
|
event: Some(event),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut buf = Vec::new();
|
||||||
|
prost::Message::encode(&dbe, &mut buf)?;
|
||||||
|
|
||||||
|
self.connection.execute(
|
||||||
|
"INSERT INTO events (timestamp, event_data) VALUES (?1, ?2)",
|
||||||
|
(×tamp, &buf),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let event_id = self.connection.last_insert_rowid() as u64;
|
||||||
|
Ok(event_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn list_events(
|
||||||
|
&self,
|
||||||
|
since_idx: u64,
|
||||||
|
limit: u64,
|
||||||
|
) -> Result<Vec<DataBuildEvent>, Box<dyn Error>> {
|
||||||
|
let mut stmt = self.connection.prepare(
|
||||||
|
"SELECT event_id, timestamp, event_data FROM events
|
||||||
|
WHERE timestamp > ?1
|
||||||
|
ORDER BY event_id
|
||||||
|
LIMIT ?2"
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let rows = stmt.query_map([since_idx, limit], |row| {
|
||||||
|
let event_id: u64 = row.get(0)?;
|
||||||
|
let timestamp: u64 = row.get(1)?;
|
||||||
|
let event_data: Vec<u8> = row.get(2)?;
|
||||||
|
|
||||||
|
// Deserialize the event using prost
|
||||||
|
let mut dbe = DataBuildEvent::decode(event_data.as_slice())
|
||||||
|
.map_err(|e| rusqlite::Error::InvalidColumnType(
|
||||||
|
0,
|
||||||
|
"event_data".to_string(),
|
||||||
|
rusqlite::types::Type::Blob
|
||||||
|
))?;
|
||||||
|
|
||||||
|
// Update the event_id from the database
|
||||||
|
dbe.event_id = event_id;
|
||||||
|
dbe.timestamp = timestamp;
|
||||||
|
|
||||||
|
let result: DataBuildEvent = dbe;
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let mut events = Vec::new();
|
||||||
|
for row_result in rows {
|
||||||
|
events.push(row_result?);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(events)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct BuildEventLog<B: BELStorage> {
|
struct BuildEventLog<B: BELStorage> {
|
||||||
storage: B,
|
storage: B,
|
||||||
state: Arc<RwLock<BuildState>>,
|
state: Arc<RwLock<BuildState>>,
|
||||||
|
|
@ -56,7 +149,7 @@ impl<B: BELStorage> BuildEventLog<B> {
|
||||||
}
|
}
|
||||||
|
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::build_event_log::{BELStorage, BuildEventLog};
|
use crate::build_event_log::{BELStorage, BuildEventLog, SqliteBELStorage};
|
||||||
use crate::data_build_event::Event;
|
use crate::data_build_event::Event;
|
||||||
use crate::{DataBuildEvent, PartitionRef, WantCreateEventV1};
|
use crate::{DataBuildEvent, PartitionRef, WantCreateEventV1};
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
|
|
@ -146,4 +239,73 @@ mod tests {
|
||||||
"want_id not equal",
|
"want_id not equal",
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_sqlite_append_event() {
|
||||||
|
let storage = SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
|
||||||
|
let mut log = BuildEventLog::create(storage);
|
||||||
|
|
||||||
|
let want_id = "sqlite_test_1234".to_string();
|
||||||
|
|
||||||
|
// Initial state - verify storage is empty
|
||||||
|
let events = log.storage.list_events(0, 100).expect("Failed to list events");
|
||||||
|
assert_eq!(events.len(), 0);
|
||||||
|
|
||||||
|
// Verify want doesn't exist in state
|
||||||
|
{
|
||||||
|
let state = log.state.read().unwrap();
|
||||||
|
assert!(state.wants.get(&want_id).is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Append an event
|
||||||
|
let event_id = log.append_event(Event::WantCreateV1(WantCreateEventV1 {
|
||||||
|
want_id: want_id.clone(),
|
||||||
|
root_want_id: "sqlite_root_123".to_string(),
|
||||||
|
parent_want_id: "sqlite_parent_123".to_string(),
|
||||||
|
partitions: vec![PartitionRef {
|
||||||
|
r#ref: "sqlite_partition_1234".to_string(),
|
||||||
|
}],
|
||||||
|
data_timestamp: 0,
|
||||||
|
ttl_seconds: 1,
|
||||||
|
sla_seconds: 1,
|
||||||
|
source: None,
|
||||||
|
comment: None,
|
||||||
|
}))
|
||||||
|
.expect("append_event failed");
|
||||||
|
|
||||||
|
// Verify event was stored
|
||||||
|
assert!(event_id > 0);
|
||||||
|
|
||||||
|
// Verify event can be retrieved
|
||||||
|
let events = log.storage.list_events(0, 100).expect("Failed to list events");
|
||||||
|
assert_eq!(events.len(), 1);
|
||||||
|
|
||||||
|
let stored_event = &events[0];
|
||||||
|
assert_eq!(stored_event.event_id, event_id);
|
||||||
|
assert!(stored_event.timestamp > 0);
|
||||||
|
|
||||||
|
// Verify the event content
|
||||||
|
if let Some(Event::WantCreateV1(want_event)) = &stored_event.event {
|
||||||
|
assert_eq!(want_event.want_id, want_id);
|
||||||
|
assert_eq!(want_event.root_want_id, "sqlite_root_123");
|
||||||
|
assert_eq!(want_event.parent_want_id, "sqlite_parent_123");
|
||||||
|
assert_eq!(want_event.partitions.len(), 1);
|
||||||
|
assert_eq!(want_event.partitions[0].r#ref, "sqlite_partition_1234");
|
||||||
|
} else {
|
||||||
|
panic!("Expected WantCreateV1 event, got {:?}", stored_event.event);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify state was updated
|
||||||
|
let state = log.state.read().expect("couldn't take read lock");
|
||||||
|
assert!(state.wants.get(&want_id).is_some(), "want_id not found in state");
|
||||||
|
assert_eq!(
|
||||||
|
state
|
||||||
|
.wants
|
||||||
|
.get(&want_id)
|
||||||
|
.map(|want| want.want_id.clone())
|
||||||
|
.expect("state.wants want_id not found"),
|
||||||
|
want_id,
|
||||||
|
"want_id not equal in state",
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue