use crate::*;
use crate::event_log::{BuildEventLog, BuildEventLogError, Result, QueryResult, BuildRequestSummary, PartitionSummary, ActivitySummary};
use async_trait::async_trait;
use std::sync::{Arc, Mutex};
use rusqlite::{Connection, params};

/// MockBuildEventLog provides an in-memory SQLite database for testing
///
/// This implementation makes it easy to specify test data and verify behavior
/// while using the real code paths for event writing and repository queries.
///
/// Key features:
/// - Uses in-memory SQLite for parallel test execution
/// - Provides event constructors with sensible defaults
/// - Allows easy specification of test scenarios
/// - Uses the same SQL schema as the production SQLite implementation
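///
/// # Example
///
/// A minimal usage sketch (marked `ignore`), assuming an async test context and
/// the `test_events` helpers defined later in this file:
///
/// ```ignore
/// let log = MockBuildEventLog::new().await?;
/// let event = test_events::build_request_received(None, vec![]);
/// log.append_event(event).await?;
/// assert_eq!(log.event_count().await?, 1);
/// ```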
pub struct MockBuildEventLog {
    connection: Arc<Mutex<Connection>>,
}

impl MockBuildEventLog {
    /// Create a new MockBuildEventLog with an in-memory SQLite database
    pub async fn new() -> Result<Self> {
        let conn = Connection::open(":memory:")
            .map_err(|e| BuildEventLogError::ConnectionError(e.to_string()))?;

        // Foreign key constraints are left at SQLite's default (off) for
        // simplicity in testing; enable them here if a test needs them:
        // conn.execute("PRAGMA foreign_keys = ON", [])

        let mock = Self {
            connection: Arc::new(Mutex::new(conn)),
        };

        // Initialize the schema
        mock.initialize().await?;

        Ok(mock)
    }

    /// Create a new MockBuildEventLog with predefined events
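    ///
    /// # Example
    ///
    /// A minimal sketch of seeding a scenario, assuming the event helpers from
    /// the `test_events` module below and an async (tokio) test context:
    ///
    /// ```ignore
    /// let partition = PartitionRef { str: "data/users".to_string() };
    /// let events = vec![
    ///     test_events::build_request_received(Some("build-1".into()), vec![partition.clone()]),
    ///     test_events::partition_status(Some("build-1".into()), partition, PartitionStatus::PartitionAvailable, None),
    /// ];
    /// let log = MockBuildEventLog::with_events(events).await?;
    /// ```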
    pub async fn with_events(events: Vec<BuildEvent>) -> Result<Self> {
        let mock = Self::new().await?;

        // Insert all provided events
        for event in events {
            mock.append_event(event).await?;
        }

        Ok(mock)
    }

    /// Get the number of events in the mock event log
    pub async fn event_count(&self) -> Result<usize> {
        let conn = self.connection.lock().unwrap();
        let mut stmt = conn.prepare("SELECT COUNT(*) FROM build_events")
            .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        let count: i64 = stmt.query_row([], |row| row.get(0))
            .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        Ok(count as usize)
    }

    /// Get all events ordered by timestamp
    pub async fn get_all_events(&self) -> Result<Vec<BuildEvent>> {
        let conn = self.connection.lock().unwrap();
        let mut stmt = conn.prepare(
            "SELECT event_data FROM build_events ORDER BY timestamp ASC"
        ).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        let rows = stmt.query_map([], |row| {
            let event_data: String = row.get(0)?;
            Ok(event_data)
        }).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        let mut events = Vec::new();
        for row in rows {
            let event_data = row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
            let event: BuildEvent = serde_json::from_str(&event_data)
                .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;
            events.push(event);
        }

        Ok(events)
    }

    /// Clear all events from the mock event log
    pub async fn clear(&self) -> Result<()> {
        let conn = self.connection.lock().unwrap();

        // Clear all tables
        conn.execute("DELETE FROM build_events", [])
            .map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
        conn.execute("DELETE FROM build_request_events", [])
            .map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
        conn.execute("DELETE FROM partition_events", [])
            .map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
        conn.execute("DELETE FROM job_events", [])
            .map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
        conn.execute("DELETE FROM delegation_events", [])
            .map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
        conn.execute("DELETE FROM job_graph_events", [])
            .map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;

        Ok(())
    }
}

#[async_trait]
impl BuildEventLog for MockBuildEventLog {
    async fn append_event(&self, event: BuildEvent) -> Result<()> {
        let conn = self.connection.lock().unwrap();

        // Serialize the entire event for storage
        let event_data = serde_json::to_string(&event)
            .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;

        // Insert into main events table
        conn.execute(
            "INSERT INTO build_events (event_id, timestamp, build_request_id, event_type, event_data) VALUES (?1, ?2, ?3, ?4, ?5)",
            params![
                event.event_id,
                event.timestamp,
                event.build_request_id,
                match &event.event_type {
                    Some(crate::build_event::EventType::BuildRequestEvent(_)) => "build_request",
                    Some(crate::build_event::EventType::PartitionEvent(_)) => "partition",
                    Some(crate::build_event::EventType::JobEvent(_)) => "job",
                    Some(crate::build_event::EventType::DelegationEvent(_)) => "delegation",
                    Some(crate::build_event::EventType::JobGraphEvent(_)) => "job_graph",
                    Some(crate::build_event::EventType::PartitionInvalidationEvent(_)) => "partition_invalidation",
                    Some(crate::build_event::EventType::TaskCancelEvent(_)) => "task_cancel",
                    Some(crate::build_event::EventType::BuildCancelEvent(_)) => "build_cancel",
                    None => "unknown",
                },
                event_data
            ],
        ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;

        // Insert into specific event type table for better querying
        match &event.event_type {
            Some(crate::build_event::EventType::BuildRequestEvent(br_event)) => {
                let partitions_json = serde_json::to_string(&br_event.requested_partitions)
                    .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;

                conn.execute(
                    "INSERT INTO build_request_events (event_id, status, requested_partitions, message) VALUES (?1, ?2, ?3, ?4)",
                    params![
                        event.event_id,
                        br_event.status_code.to_string(),
                        partitions_json,
                        br_event.message
                    ],
                ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
            }
            Some(crate::build_event::EventType::PartitionEvent(p_event)) => {
                conn.execute(
                    "INSERT INTO partition_events (event_id, partition_ref, status, message, job_run_id) VALUES (?1, ?2, ?3, ?4, ?5)",
                    params![
                        event.event_id,
                        p_event.partition_ref.as_ref().map(|r| &r.str).unwrap_or(&String::new()),
                        p_event.status_code.to_string(),
                        p_event.message,
                        if p_event.job_run_id.is_empty() { None } else { Some(&p_event.job_run_id) }
                    ],
                ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
            }
            Some(crate::build_event::EventType::JobEvent(j_event)) => {
                let partitions_json = serde_json::to_string(&j_event.target_partitions)
                    .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;
                let config_json = j_event.config.as_ref()
                    .map(|c| serde_json::to_string(c))
                    .transpose()
                    .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;
                let manifests_json = serde_json::to_string(&j_event.manifests)
                    .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;

                conn.execute(
                    "INSERT INTO job_events (event_id, job_run_id, job_label, target_partitions, status, message, config_json, manifests_json) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
                    params![
                        event.event_id,
                        j_event.job_run_id,
                        j_event.job_label.as_ref().map(|l| &l.label).unwrap_or(&String::new()),
                        partitions_json,
                        j_event.status_code.to_string(),
                        j_event.message,
                        config_json,
                        manifests_json
                    ],
                ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
            }
            Some(crate::build_event::EventType::DelegationEvent(d_event)) => {
                conn.execute(
                    "INSERT INTO delegation_events (event_id, partition_ref, delegated_to_build_request_id, message) VALUES (?1, ?2, ?3, ?4)",
                    params![
                        event.event_id,
                        d_event.partition_ref.as_ref().map(|r| &r.str).unwrap_or(&String::new()),
                        d_event.delegated_to_build_request_id,
                        d_event.message
                    ],
                ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
            }
            Some(crate::build_event::EventType::JobGraphEvent(jg_event)) => {
                let job_graph_json = match serde_json::to_string(&jg_event.job_graph) {
                    Ok(json) => json,
                    Err(e) => {
                        return Err(BuildEventLogError::DatabaseError(format!("Failed to serialize job graph: {}", e)));
                    }
                };
                conn.execute(
                    "INSERT INTO job_graph_events (event_id, job_graph_json, message) VALUES (?1, ?2, ?3)",
                    params![
                        event.event_id,
                        job_graph_json,
                        jg_event.message
                    ],
                ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
            }
            Some(crate::build_event::EventType::PartitionInvalidationEvent(_pi_event)) => {
                // For now, just store in main events table
            }
            Some(crate::build_event::EventType::TaskCancelEvent(_tc_event)) => {
                // For now, just store in main events table
            }
            Some(crate::build_event::EventType::BuildCancelEvent(_bc_event)) => {
                // For now, just store in main events table
            }
            None => {}
        }

        Ok(())
    }

    async fn get_build_request_events(
        &self,
        build_request_id: &str,
        since: Option<i64>
    ) -> Result<Vec<BuildEvent>> {
        let conn = self.connection.lock().unwrap();
        let (query, params): (String, Vec<_>) = match since {
            Some(timestamp) => (
                "SELECT event_data FROM build_events WHERE build_request_id = ?1 AND timestamp > ?2 ORDER BY timestamp ASC".to_string(),
                vec![build_request_id.to_string(), timestamp.to_string()]
            ),
            None => (
                "SELECT event_data FROM build_events WHERE build_request_id = ?1 ORDER BY timestamp ASC".to_string(),
                vec![build_request_id.to_string()]
            )
        };

        let mut stmt = conn.prepare(&query)
            .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        let rows = stmt.query_map(rusqlite::params_from_iter(params.iter()), |row| {
            let event_data: String = row.get(0)?;
            Ok(event_data)
        }).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        let mut events = Vec::new();
        for row in rows {
            let event_data = row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
            let event: BuildEvent = serde_json::from_str(&event_data)
                .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;
            events.push(event);
        }

        Ok(events)
    }

    async fn get_partition_events(
        &self,
        partition_ref: &str,
        since: Option<i64>
    ) -> Result<Vec<BuildEvent>> {
        let conn = self.connection.lock().unwrap();
        let (query, params): (String, Vec<_>) = match since {
            Some(timestamp) => (
                "SELECT be.event_data FROM build_events be JOIN partition_events pe ON be.event_id = pe.event_id WHERE pe.partition_ref = ?1 AND be.timestamp > ?2 ORDER BY be.timestamp ASC".to_string(),
                vec![partition_ref.to_string(), timestamp.to_string()]
            ),
            None => (
                "SELECT be.event_data FROM build_events be JOIN partition_events pe ON be.event_id = pe.event_id WHERE pe.partition_ref = ?1 ORDER BY be.timestamp ASC".to_string(),
                vec![partition_ref.to_string()]
            )
        };

        let mut stmt = conn.prepare(&query)
            .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        let rows = stmt.query_map(rusqlite::params_from_iter(params.iter()), |row| {
            let event_data: String = row.get(0)?;
            Ok(event_data)
        }).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        let mut events = Vec::new();
        for row in rows {
            let event_data = row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
            let event: BuildEvent = serde_json::from_str(&event_data)
                .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;
            events.push(event);
        }

        Ok(events)
    }

    async fn get_job_run_events(
        &self,
        job_run_id: &str
    ) -> Result<Vec<BuildEvent>> {
        let conn = self.connection.lock().unwrap();
        let mut stmt = conn.prepare(
            "SELECT be.event_data FROM build_events be JOIN job_events je ON be.event_id = je.event_id WHERE je.job_run_id = ?1 ORDER BY be.timestamp ASC"
        ).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        let rows = stmt.query_map([job_run_id], |row| {
            let event_data: String = row.get(0)?;
            Ok(event_data)
        }).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        let mut events = Vec::new();
        for row in rows {
            let event_data = row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
            let event: BuildEvent = serde_json::from_str(&event_data)
                .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;
            events.push(event);
        }

        Ok(events)
    }

    async fn get_events_in_range(
        &self,
        start_time: i64,
        end_time: i64
    ) -> Result<Vec<BuildEvent>> {
        let conn = self.connection.lock().unwrap();
        let mut stmt = conn.prepare(
            "SELECT event_data FROM build_events WHERE timestamp >= ?1 AND timestamp <= ?2 ORDER BY timestamp ASC"
        ).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        let rows = stmt.query_map([start_time, end_time], |row| {
            let event_data: String = row.get(0)?;
            Ok(event_data)
        }).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        let mut events = Vec::new();
        for row in rows {
            let event_data = row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
            let event: BuildEvent = serde_json::from_str(&event_data)
                .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;
            events.push(event);
        }

        Ok(events)
    }

    async fn execute_query(&self, query: &str) -> Result<QueryResult> {
        let conn = self.connection.lock().unwrap();
        let mut stmt = conn.prepare(query)
            .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        let column_names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();

        let rows = stmt.query_map([], |row| {
            let mut values = Vec::new();
            for i in 0..column_names.len() {
                let value: String = row.get::<_, Option<String>>(i)?.unwrap_or_default();
                values.push(value);
            }
            Ok(values)
        }).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        let mut result_rows = Vec::new();
        for row in rows {
            let values = row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
            result_rows.push(values);
        }

        Ok(QueryResult {
            columns: column_names,
            rows: result_rows,
        })
    }

    async fn get_latest_partition_status(
        &self,
        partition_ref: &str
    ) -> Result<Option<(PartitionStatus, i64)>> {
        let conn = self.connection.lock().unwrap();
        let mut stmt = conn.prepare(
            "SELECT pe.status, be.timestamp FROM build_events be JOIN partition_events pe ON be.event_id = pe.event_id WHERE pe.partition_ref = ?1 ORDER BY be.timestamp DESC LIMIT 1"
        ).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        let result = stmt.query_row([partition_ref], |row| {
            let status_str: String = row.get(0)?;
            let timestamp: i64 = row.get(1)?;
            let status: i32 = status_str.parse().unwrap_or(0);
            Ok((status, timestamp))
        });

        match result {
            Ok((status, timestamp)) => {
                let partition_status = match status {
                    1 => PartitionStatus::PartitionRequested,
                    2 => PartitionStatus::PartitionAnalyzed,
                    3 => PartitionStatus::PartitionBuilding,
                    4 => PartitionStatus::PartitionAvailable,
                    5 => PartitionStatus::PartitionFailed,
                    6 => PartitionStatus::PartitionDelegated,
                    _ => PartitionStatus::PartitionUnknown,
                };
                Ok(Some((partition_status, timestamp)))
            }
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(BuildEventLogError::QueryError(e.to_string())),
        }
    }

    async fn get_active_builds_for_partition(
        &self,
        partition_ref: &str
    ) -> Result<Vec<String>> {
        let conn = self.connection.lock().unwrap();
        let mut stmt = conn.prepare(
            "SELECT DISTINCT be.build_request_id FROM build_events be JOIN partition_events pe ON be.event_id = pe.event_id WHERE pe.partition_ref = ?1 AND pe.status IN ('1', '2', '3') ORDER BY be.timestamp DESC"
        ).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        let rows = stmt.query_map([partition_ref], |row| {
            let build_request_id: String = row.get(0)?;
            Ok(build_request_id)
        }).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        let mut build_ids = Vec::new();
        for row in rows {
            let build_id = row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
            build_ids.push(build_id);
        }

        Ok(build_ids)
    }

    async fn initialize(&self) -> Result<()> {
        let conn = self.connection.lock().unwrap();

        // Create main events table
        conn.execute(
            "CREATE TABLE IF NOT EXISTS build_events (
                event_id TEXT PRIMARY KEY,
                timestamp INTEGER NOT NULL,
                build_request_id TEXT NOT NULL,
                event_type TEXT NOT NULL,
                event_data TEXT NOT NULL
            )",
            [],
        ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;

        // Create specific event type tables
        conn.execute(
            "CREATE TABLE IF NOT EXISTS build_request_events (
                event_id TEXT PRIMARY KEY,
                status TEXT NOT NULL,
                requested_partitions TEXT NOT NULL,
                message TEXT NOT NULL
            )",
            [],
        ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;

        conn.execute(
            "CREATE TABLE IF NOT EXISTS partition_events (
                event_id TEXT PRIMARY KEY,
                partition_ref TEXT NOT NULL,
                status TEXT NOT NULL,
                message TEXT NOT NULL,
                job_run_id TEXT
            )",
            [],
        ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;

        conn.execute(
            "CREATE TABLE IF NOT EXISTS job_events (
                event_id TEXT PRIMARY KEY,
                job_run_id TEXT NOT NULL,
                job_label TEXT NOT NULL,
                target_partitions TEXT NOT NULL,
                status TEXT NOT NULL,
                message TEXT NOT NULL,
                config_json TEXT,
                manifests_json TEXT NOT NULL
            )",
            [],
        ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;

        conn.execute(
            "CREATE TABLE IF NOT EXISTS delegation_events (
                event_id TEXT PRIMARY KEY,
                partition_ref TEXT NOT NULL,
                delegated_to_build_request_id TEXT NOT NULL,
                message TEXT NOT NULL
            )",
            [],
        ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;

        conn.execute(
            "CREATE TABLE IF NOT EXISTS job_graph_events (
                event_id TEXT PRIMARY KEY,
                job_graph_json TEXT NOT NULL,
                message TEXT NOT NULL
            )",
            [],
        ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;

        // Create indexes for common queries
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_build_events_build_request_id ON build_events (build_request_id)",
            [],
        ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;

        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_build_events_timestamp ON build_events (timestamp)",
            [],
        ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;

        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_partition_events_partition_ref ON partition_events (partition_ref)",
            [],
        ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;

        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_job_events_job_run_id ON job_events (job_run_id)",
            [],
        ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;

        Ok(())
    }

    async fn list_build_requests(
        &self,
        _limit: u32,
        _offset: u32,
        _status_filter: Option<BuildRequestStatus>,
    ) -> Result<(Vec<BuildRequestSummary>, u32)> {
        // For simplicity in the mock, return empty results.
        // A real implementation would query the database.
        Ok((vec![], 0))
    }

    async fn list_recent_partitions(
        &self,
        _limit: u32,
        _offset: u32,
        _status_filter: Option<PartitionStatus>,
    ) -> Result<(Vec<PartitionSummary>, u32)> {
        // For simplicity in the mock, return empty results.
        // A real implementation would query the database.
        Ok((vec![], 0))
    }

    async fn get_activity_summary(&self) -> Result<ActivitySummary> {
        // For simplicity in the mock, return empty activity
        Ok(ActivitySummary {
            active_builds_count: 0,
            recent_builds: vec![],
            recent_partitions: vec![],
            total_partitions_count: 0,
        })
    }

    async fn get_build_request_for_available_partition(
        &self,
        partition_ref: &str
    ) -> Result<Option<String>> {
        let conn = self.connection.lock().unwrap();
        let mut stmt = conn.prepare(
            "SELECT be.build_request_id FROM build_events be JOIN partition_events pe ON be.event_id = pe.event_id WHERE pe.partition_ref = ?1 AND pe.status = '4' ORDER BY be.timestamp DESC LIMIT 1"
        ).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

        let result = stmt.query_row([partition_ref], |row| {
            let build_request_id: String = row.get(0)?;
            Ok(build_request_id)
        });

        match result {
            Ok(build_request_id) => Ok(Some(build_request_id)),
            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
            Err(e) => Err(BuildEventLogError::QueryError(e.to_string())),
        }
    }
}

/// Utility functions for creating test events with sensible defaults
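///
/// # Example
///
/// A minimal sketch of one helper; passing `None` for the id fields falls back
/// to freshly generated UUIDs:
///
/// ```ignore
/// let label = JobLabel { label: "//:ingest".to_string() };
/// let partition = PartitionRef { str: "data/users".to_string() };
/// let event = test_events::job_event(None, None, label, vec![partition], JobStatus::JobCompleted);
/// assert!(matches!(event.event_type, Some(build_event::EventType::JobEvent(_))));
/// ```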
pub mod test_events {
    use super::*;
    use crate::event_log::{generate_event_id, current_timestamp_nanos};
    use uuid::Uuid;

    /// Create a build request received event with random defaults
    pub fn build_request_received(
        build_request_id: Option<String>,
        partitions: Vec<PartitionRef>,
    ) -> BuildEvent {
        BuildEvent {
            event_id: generate_event_id(),
            timestamp: current_timestamp_nanos(),
            build_request_id: build_request_id.unwrap_or_else(|| Uuid::new_v4().to_string()),
            event_type: Some(build_event::EventType::BuildRequestEvent(BuildRequestEvent {
                status_code: BuildRequestStatus::BuildRequestReceived as i32,
                status_name: BuildRequestStatus::BuildRequestReceived.to_display_string(),
                requested_partitions: partitions,
                message: "Build request received".to_string(),
            })),
        }
    }

    /// Create a build request event with a specific status
    pub fn build_request_event(
        build_request_id: Option<String>,
        partitions: Vec<PartitionRef>,
        status: BuildRequestStatus,
    ) -> BuildEvent {
        BuildEvent {
            event_id: generate_event_id(),
            timestamp: current_timestamp_nanos(),
            build_request_id: build_request_id.unwrap_or_else(|| Uuid::new_v4().to_string()),
            event_type: Some(build_event::EventType::BuildRequestEvent(BuildRequestEvent {
                status_code: status as i32,
                status_name: status.to_display_string(),
                requested_partitions: partitions,
                message: format!("Build request status: {:?}", status),
            })),
        }
    }

    /// Create a partition status event with random defaults
    pub fn partition_status(
        build_request_id: Option<String>,
        partition_ref: PartitionRef,
        status: PartitionStatus,
        job_run_id: Option<String>,
    ) -> BuildEvent {
        BuildEvent {
            event_id: generate_event_id(),
            timestamp: current_timestamp_nanos(),
            build_request_id: build_request_id.unwrap_or_else(|| Uuid::new_v4().to_string()),
            event_type: Some(build_event::EventType::PartitionEvent(PartitionEvent {
                partition_ref: Some(partition_ref),
                status_code: status as i32,
                status_name: status.to_display_string(),
                message: format!("Partition status: {:?}", status),
                job_run_id: job_run_id.unwrap_or_default(),
            })),
        }
    }

    /// Create a job event with random defaults
    pub fn job_event(
        build_request_id: Option<String>,
        job_run_id: Option<String>,
        job_label: JobLabel,
        target_partitions: Vec<PartitionRef>,
        status: JobStatus,
    ) -> BuildEvent {
        BuildEvent {
            event_id: generate_event_id(),
            timestamp: current_timestamp_nanos(),
            build_request_id: build_request_id.unwrap_or_else(|| Uuid::new_v4().to_string()),
            event_type: Some(build_event::EventType::JobEvent(JobEvent {
                job_run_id: job_run_id.unwrap_or_else(|| Uuid::new_v4().to_string()),
                job_label: Some(job_label),
                target_partitions,
                status_code: status as i32,
                status_name: status.to_display_string(),
                message: format!("Job status: {:?}", status),
                config: None,
                manifests: vec![],
            })),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use super::test_events::*;

    #[tokio::test]
    async fn test_mock_build_event_log_basic() {
        let mock = MockBuildEventLog::new().await.unwrap();

        // Initially empty
        assert_eq!(mock.event_count().await.unwrap(), 0);

        // Add an event
        let build_id = "test-build-123".to_string();
        let partition = PartitionRef { str: "test/partition".to_string() };
        let event = build_request_received(Some(build_id.clone()), vec![partition]);

        mock.append_event(event).await.unwrap();

        // Check event count
        assert_eq!(mock.event_count().await.unwrap(), 1);

        // Query events by build request
        let events = mock.get_build_request_events(&build_id, None).await.unwrap();
        assert_eq!(events.len(), 1);

        // Clear events
        mock.clear().await.unwrap();
        assert_eq!(mock.event_count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_mock_build_event_log_with_predefined_events() {
        let build_id = "test-build-456".to_string();
        let partition = PartitionRef { str: "data/users".to_string() };

        let events = vec![
            build_request_received(Some(build_id.clone()), vec![partition.clone()]),
            partition_status(Some(build_id.clone()), partition.clone(), PartitionStatus::PartitionBuilding, None),
            partition_status(Some(build_id.clone()), partition.clone(), PartitionStatus::PartitionAvailable, None),
        ];

        let mock = MockBuildEventLog::with_events(events).await.unwrap();

        // Should have 3 events
        assert_eq!(mock.event_count().await.unwrap(), 3);

        // Query partition events
        let partition_events = mock.get_partition_events(&partition.str, None).await.unwrap();
        assert_eq!(partition_events.len(), 2); // Two partition events

        // Check latest partition status
        let latest_status = mock.get_latest_partition_status(&partition.str).await.unwrap();
        assert!(latest_status.is_some());
        let (status, _timestamp) = latest_status.unwrap();
        assert_eq!(status, PartitionStatus::PartitionAvailable);
    }

    #[tokio::test]
    async fn test_event_constructors() {
        let partition = PartitionRef { str: "test/data".to_string() };
        let job_label = JobLabel { label: "//:test_job".to_string() };

        // Test build request event constructor
        let br_event = build_request_received(None, vec![partition.clone()]);
        assert!(matches!(br_event.event_type, Some(build_event::EventType::BuildRequestEvent(_))));

        // Test partition event constructor
        let p_event = partition_status(None, partition.clone(), PartitionStatus::PartitionAvailable, None);
        assert!(matches!(p_event.event_type, Some(build_event::EventType::PartitionEvent(_))));

        // Test job event constructor
        let j_event = job_event(None, None, job_label, vec![partition], JobStatus::JobCompleted);
        assert!(matches!(j_event.event_type, Some(build_event::EventType::JobEvent(_))));
    }
}