databuild/databuild/build_event_log.rs
Stuart Axelbrooke 8176a8261e
Some checks failed
/ setup (push) Has been cancelled
fix up want lineage view
2025-12-01 03:54:29 +08:00

457 lines
15 KiB
Rust

use crate::build_state::BuildState;
use crate::data_build_event::Event;
use crate::util::{DatabuildError, current_timestamp};
use crate::{
CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse,
CreateWantRequest, CreateWantResponse, DataBuildEvent, GetTaintRequest, GetTaintResponse,
GetWantRequest, GetWantResponse, ListJobRunsRequest, ListJobRunsResponse,
ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse,
ListWantsRequest, ListWantsResponse, TaintCreateEventV1, WantCancelEventV1, WantCreateEventV1,
};
use prost::Message;
use rusqlite::Connection;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
/// Append-only storage backend for the build event log.
///
/// Implementors must be `Send + Sync`.
pub trait BELStorage: Send + Sync {
    /// Persists `event` and returns a numeric index for it.
    // NOTE(review): whether the returned index must equal the stored `event_id` is not
    // stated anywhere — confirm, since callers may feed it back into `get_event`.
    fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError>;
    /// Returns at most `limit` stored events selected by `since_idx`.
    // NOTE(review): the implementations in this file compare `since_idx` against each event's
    // *timestamp*, not its id, despite the parameter name — confirm the intended semantics.
    fn list_events(
        &self,
        since_idx: u64,
        limit: u64,
    ) -> Result<Vec<DataBuildEvent>, DatabuildError>;
    /// Looks up a single event by `event_id`; `Ok(None)` when no stored event has that id.
    fn get_event(&self, event_id: u64) -> Result<Option<DataBuildEvent>, DatabuildError>;
    /// Returns the id of the most recently appended event.
    // NOTE(review): the result for an empty store is implementation-specific — confirm.
    fn latest_event_id(&self) -> Result<u64, DatabuildError>;
}
/// In-memory [`BELStorage`] that keeps every event in a `Vec`, in append order.
#[derive(Debug, Clone, Default)]
pub struct MemoryBELStorage {
    pub events: Vec<DataBuildEvent>,
}

impl MemoryBELStorage {
    /// Creates an empty store (equivalent to [`Default::default`]).
    pub fn new() -> MemoryBELStorage {
        Self::default()
    }
}
impl BELStorage for MemoryBELStorage {
    /// Appends `event` under the next sequential id and returns that id.
    ///
    /// Ids start at 1, like the `AUTOINCREMENT` rowids of `SqliteBELStorage`. The returned
    /// id is exactly the one stored in the event's `event_id`, so it can be handed straight
    /// back to [`BELStorage::get_event`].
    fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError> {
        let event_id = self.events.len() as u64 + 1;
        self.events.push(DataBuildEvent {
            timestamp: current_timestamp(),
            event_id,
            event: Some(event.clone()),
        });
        Ok(event_id)
    }

    /// Returns up to `limit` events, in append order, whose timestamp is greater than
    /// `since_idx`.
    // NOTE(review): `since_idx` is compared against the event timestamp (as in the SQLite
    // backend), not the event id — confirm this is the intended pagination key.
    fn list_events(
        &self,
        since_idx: u64,
        limit: u64,
    ) -> Result<Vec<DataBuildEvent>, DatabuildError> {
        // Saturate rather than truncate when `limit` does not fit in `usize` (32-bit targets).
        let limit = usize::try_from(limit).unwrap_or(usize::MAX);
        Ok(self
            .events
            .iter()
            .filter(|e| e.timestamp > since_idx)
            .take(limit)
            // Clone only the events that are actually returned.
            .cloned()
            .collect())
    }

    /// Finds the event whose `event_id` equals `event_id`, if any.
    fn get_event(&self, event_id: u64) -> Result<Option<DataBuildEvent>, DatabuildError> {
        // `events` is public, so search by id instead of assuming position == id - 1.
        Ok(self.events.iter().find(|e| e.event_id == event_id).cloned())
    }

    /// Returns the id of the last appended event, or 0 when the store is empty.
    ///
    /// Ids start at 1, so 0 is never a real event id.
    fn latest_event_id(&self) -> Result<u64, DatabuildError> {
        Ok(self.events.last().map_or(0, |e| e.event_id))
    }
}
/// [`BELStorage`] backed by a SQLite database.
///
/// The connection lives behind `Arc<Mutex<_>>`, so clones of this value share a single
/// connection and take turns using it.
#[derive(Debug, Clone)]
pub struct SqliteBELStorage {
    connection: Arc<Mutex<Connection>>,
}

impl SqliteBELStorage {
    /// Opens the SQLite database at `database_url` and makes sure the `events` table exists.
    ///
    /// `":memory:"` opens a private in-memory database.
    ///
    /// # Errors
    /// Fails if the database cannot be opened or the table cannot be created.
    pub fn create(database_url: &str) -> Result<SqliteBELStorage, DatabuildError> {
        const CREATE_EVENTS_TABLE: &str = "CREATE TABLE IF NOT EXISTS events (\n\
            event_id INTEGER PRIMARY KEY AUTOINCREMENT,\n\
            timestamp INTEGER NOT NULL,\n\
            event_data BLOB NOT NULL\n\
            )";

        let connection = Connection::open(database_url)?;
        connection.execute(CREATE_EVENTS_TABLE, ())?;
        Ok(Self {
            connection: Arc::new(Mutex::new(connection)),
        })
    }
}
impl BELStorage for SqliteBELStorage {
fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError> {
let now = SystemTime::now();
let duration_since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
let timestamp = duration_since_epoch.as_nanos() as u64;
// Serialize the event using prost
let dbe = DataBuildEvent {
timestamp,
event_id: 0, // Will be set by the database
event: Some(event.clone()),
};
let mut buf = Vec::new();
prost::Message::encode(&dbe, &mut buf)?;
let connection = self
.connection
.lock()
.map_err(|e| format!("Failed to acquire lock: {}", e))?;
connection.execute(
"INSERT INTO events (timestamp, event_data) VALUES (?1, ?2)",
(&timestamp, &buf),
)?;
let event_id = connection.last_insert_rowid() as u64;
Ok(event_id)
}
fn list_events(
&self,
since_idx: u64,
limit: u64,
) -> Result<Vec<DataBuildEvent>, DatabuildError> {
let connection = self
.connection
.lock()
.map_err(|e| format!("Failed to acquire lock: {}", e))?;
let mut stmt = connection.prepare(
"SELECT event_id, timestamp, event_data FROM events
WHERE timestamp > ?1
ORDER BY event_id
LIMIT ?2",
)?;
let rows = stmt.query_map([since_idx, limit], |row| {
let event_id: u64 = row.get(0)?;
let timestamp: u64 = row.get(1)?;
let event_data: Vec<u8> = row.get(2)?;
// Deserialize the event using prost
let mut dbe = DataBuildEvent::decode(event_data.as_slice()).map_err(|_e| {
rusqlite::Error::InvalidColumnType(
0,
"event_data".to_string(),
rusqlite::types::Type::Blob,
)
})?;
// Update the event_id from the database
dbe.event_id = event_id;
dbe.timestamp = timestamp;
let result: DataBuildEvent = dbe;
Ok(result)
})?;
let mut events = Vec::new();
for row_result in rows {
events.push(row_result?);
}
Ok(events)
}
fn get_event(&self, event_id: u64) -> Result<Option<DataBuildEvent>, DatabuildError> {
let connection = self
.connection
.lock()
.map_err(|e| format!("Failed to acquire lock: {}", e))?;
let mut stmt = connection
.prepare("SELECT event_id, timestamp, event_data FROM events WHERE event_id = ?1")?;
let result = stmt.query_row([event_id], |row| {
let event_id: u64 = row.get(0)?;
let timestamp: u64 = row.get(1)?;
let event_data: Vec<u8> = row.get(2)?;
// Deserialize the event using prost
let mut dbe = DataBuildEvent::decode(event_data.as_slice()).map_err(|_e| {
rusqlite::Error::InvalidColumnType(
0,
"event_data".to_string(),
rusqlite::types::Type::Blob,
)
})?;
// Update the event_id from the database
dbe.event_id = event_id;
dbe.timestamp = timestamp;
Ok(dbe)
});
match result {
Ok(event) => Ok(Some(event)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
}
fn latest_event_id(&self) -> Result<u64, DatabuildError> {
let connection = self
.connection
.lock()
.map_err(|e| format!("Failed to acquire lock: {}", e))?;
let result: Result<u64, rusqlite::Error> =
connection.query_row("SELECT MAX(event_id) FROM events", [], |row| row.get(0));
match result {
Ok(id) => Ok(id),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0),
Err(e) => Err(e.into()),
}
}
}
/// Build event log: a persistent event store (`storage`) together with the in-memory
/// `state` that the append methods update alongside it.
#[derive(Debug, Default)]
pub struct BuildEventLog<S: BELStorage + Debug> {
    /// Backend that persists every appended event.
    pub storage: S,
    /// In-memory state; each appended event is passed to `BuildState::handle_event`.
    pub state: BuildState,
    /// Optional event broadcaster for HTTP server mirroring
    #[cfg_attr(not(feature = "server"), allow(dead_code))]
    pub event_broadcaster: Option<tokio::sync::broadcast::Sender<Event>>,
}
impl<S: BELStorage + Debug> BuildEventLog<S> {
    /// Creates a log over `storage` seeded with `state`, with no broadcaster attached.
    pub fn new(storage: S, state: BuildState) -> BuildEventLog<S> {
        BuildEventLog {
            storage,
            state,
            event_broadcaster: None,
        }
    }

    /// Attaches a broadcast channel. Every event appended via [`Self::append_event`],
    /// including follow-up events, is sent on it.
    pub fn with_broadcaster(mut self, broadcaster: tokio::sync::broadcast::Sender<Event>) -> Self {
        self.event_broadcaster = Some(broadcaster);
        self
    }

    /// Applies `event` to the state, persists it and broadcasts it. Then does the same,
    /// transitively, for every follow-up event the state emits.
    ///
    /// Follow-up events are processed depth-first in the order the state emitted them. An
    /// explicit work stack is used instead of recursion, so a long cascade of derived events
    /// cannot overflow the call stack.
    ///
    /// Returns the storage index of `event` itself, not of any follow-up.
    ///
    /// # Errors
    /// Returns the first storage error. Events processed before it stay applied and
    /// persisted. The failing event has already been applied to `state`.
    pub fn append_event(&mut self, event: &Event) -> Result<u64, DatabuildError> {
        let (idx, mut pending) = self.apply_and_persist(event)?;
        // `pending` is used as a stack. Reversing each batch makes the first-emitted follow-up
        // pop first, which yields the same depth-first order as recursing per event.
        pending.reverse();
        while let Some(next) = pending.pop() {
            let (_, mut children) = self.apply_and_persist(&next)?;
            children.reverse();
            pending.append(&mut children);
        }
        Ok(idx)
    }

    /// Applies `event` to the state and persists it. Any follow-up events the state emits
    /// are discarded, not appended. Returns the storage index of `event`.
    // NOTE(review): unlike `append_event`, this does not send on `event_broadcaster`, so an
    // attached HTTP mirror never sees these events — confirm that is intended.
    pub fn append_event_no_recurse(&mut self, event: &Event) -> Result<u64, DatabuildError> {
        self.state.handle_event(event);
        self.storage.append_event(event)
    }

    /// Runs a single event through the state, persists it and mirrors it to the broadcaster.
    /// Returns its storage index together with the follow-up events the state emitted.
    fn apply_and_persist(&mut self, event: &Event) -> Result<(u64, Vec<Event>), DatabuildError> {
        // The state is updated before persisting, so a storage failure leaves the state
        // ahead of the log.
        let follow_ups: Vec<Event> = self.state.handle_event(event).into_iter().collect();
        let idx = self.storage.append_event(event)?;
        if let Some(tx) = &self.event_broadcaster {
            // `send` fails only when no receiver is subscribed; mirroring is best-effort.
            let _ = tx.send(event.clone());
        }
        Ok((idx, follow_ups))
    }

    // API methods

    /// Lists wants matching `req` from the in-memory state.
    pub fn api_handle_list_wants(&self, req: ListWantsRequest) -> ListWantsResponse {
        self.state.list_wants(&req)
    }

    /// Lists taints matching `req` from the in-memory state.
    pub fn api_handle_list_taints(&self, req: ListTaintsRequest) -> ListTaintsResponse {
        self.state.list_taints(&req)
    }

    /// Lists partitions matching `req` from the in-memory state.
    pub fn api_handle_list_partitions(&self, req: ListPartitionsRequest) -> ListPartitionsResponse {
        self.state.list_partitions(&req)
    }

    /// Lists job runs matching `req` from the in-memory state.
    pub fn api_handle_list_job_runs(&self, req: ListJobRunsRequest) -> ListJobRunsResponse {
        self.state.list_job_runs(&req)
    }

    /// Converts `req` into a want-create event, appends it, and returns the resulting want.
    pub fn api_handle_want_create(
        &mut self,
        req: CreateWantRequest,
    ) -> Result<CreateWantResponse, DatabuildError> {
        let ev: WantCreateEventV1 = req.into();
        // Keep only the id; the event itself can be moved into the log without cloning.
        let want_id = ev.want_id.clone();
        self.append_event(&ev.into())?;
        Ok(self.state.get_want(&want_id).into())
    }

    /// Looks up a single want in the in-memory state.
    pub fn api_handle_want_get(&self, req: GetWantRequest) -> GetWantResponse {
        self.state.get_want(&req.want_id).into()
    }

    /// Converts `req` into a want-cancel event, appends it, and returns the resulting want.
    pub fn api_handle_want_cancel(
        &mut self,
        req: CancelWantRequest,
    ) -> Result<CancelWantResponse, DatabuildError> {
        let ev: WantCancelEventV1 = req.into();
        let want_id = ev.want_id.clone();
        self.append_event(&ev.into())?;
        Ok(self.state.get_want(&want_id).into())
    }

    /// Creates a taint.
    ///
    /// # Panics
    /// Always panics (`todo!`). The statements after it sketch the intended flow and are
    /// currently unreachable.
    pub fn api_handle_taint_create(
        &mut self,
        req: CreateTaintRequest,
    ) -> Result<CreateTaintResponse, DatabuildError> {
        // TODO Need to do this hierarchically? A taint will impact downstream partitions also
        todo!();
        let ev: TaintCreateEventV1 = req.into();
        self.append_event(&ev.clone().into())?;
        Ok(self.state.get_taint(&ev.taint_id).into())
    }

    /// Looks up a single taint.
    ///
    /// # Panics
    /// Always panics (`todo!`): not implemented yet.
    pub fn api_handle_taint_get(&self, req: GetTaintRequest) -> GetTaintResponse {
        todo!()
    }

    // TODO: add `api_handle_taint_cancel` (not implemented yet).
}
impl Clone for BuildEventLog<MemoryBELStorage> {
fn clone(&self) -> Self {
Self {
storage: self.storage.clone(),
state: self.state.clone(),
event_broadcaster: self.event_broadcaster.clone(),
}
}
}
#[cfg(test)]
mod tests {
    mod sqlite_bel_storage {
        use crate::build_event_log::{BELStorage, BuildEventLog, SqliteBELStorage};
        use crate::build_state::BuildState;
        use crate::data_build_event::Event;
        use crate::util::test_scenarios::default_originating_lifetime;
        use crate::{PartitionRef, WantCreateEventV1};
        use uuid::Uuid;

        /// Builds a `WantCreateV1` event with the given id and partitions and the default
        /// originating lifetime; every other field keeps its default.
        fn want_create(want_id: String, partitions: Vec<PartitionRef>) -> Event {
            Event::WantCreateV1(WantCreateEventV1 {
                want_id,
                partitions,
                lifetime: Some(default_originating_lifetime()),
                ..Default::default()
            })
        }

        /// Reads back up to 100 events from the start of the log's storage.
        fn stored_events(log: &BuildEventLog<SqliteBELStorage>) -> Vec<crate::DataBuildEvent> {
            log.storage
                .list_events(0, 100)
                .expect("Failed to list events")
        }

        #[test]
        fn test_sqlite_append_event() {
            let storage =
                SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
            let mut log = BuildEventLog {
                storage,
                state: BuildState::default(),
                event_broadcaster: None,
            };
            let want_id = "sqlite_test_1234".to_string();

            // Before the first append: nothing stored, want unknown to the state.
            assert_eq!(stored_events(&log).len(), 0);
            assert!(log.state.get_want(&want_id).is_none());

            let partition = PartitionRef {
                r#ref: "sqlite_partition_1234".to_string(),
            };
            let event_id = log
                .append_event(&want_create(want_id.clone(), vec![partition]))
                .expect("append_event failed");
            assert!(event_id > 0);

            // The event round-trips through storage with its id, a timestamp and its payload.
            let events = stored_events(&log);
            assert_eq!(events.len(), 1);
            let stored_event = &events[0];
            assert_eq!(stored_event.event_id, event_id);
            assert!(stored_event.timestamp > 0);
            match &stored_event.event {
                Some(Event::WantCreateV1(want_event)) => {
                    assert_eq!(want_event.want_id, want_id);
                    assert_eq!(want_event.partitions.len(), 1);
                    assert_eq!(want_event.partitions[0].r#ref, "sqlite_partition_1234");
                }
                other => panic!("Expected WantCreateV1 event, got {:?}", other),
            }

            // The state now knows the want.
            assert!(
                log.state.get_want(&want_id).is_some(),
                "want_id not found in state"
            );
            assert_eq!(
                log.state
                    .get_want(&want_id)
                    .map(|want| want.want_id.clone())
                    .expect("state.wants want_id not found"),
                want_id,
                "want_id not equal in state",
            );

            // Three more wants with random ids bring the stored total to four.
            for _ in 0..3 {
                log.append_event(&want_create(Uuid::new_v4().into(), vec![]))
                    .expect("append_event failed");
            }
            assert_eq!(stored_events(&log).len(), 4);
        }
    }
}