phase 3 - add missing new files

This commit is contained in:
Stuart Axelbrooke 2025-07-06 22:06:45 -07:00
parent cd221101de
commit 1bfda923b6
8 changed files with 1538 additions and 0 deletions

134
databuild/event_log/mod.rs Normal file
View file

@ -0,0 +1,134 @@
use crate::*;
use async_trait::async_trait;
use std::error::Error as StdError;
use uuid::Uuid;
pub mod stdout;
pub mod sqlite;
pub mod postgres;
#[derive(Debug)]
pub enum BuildEventLogError {
DatabaseError(String),
SerializationError(String),
ConnectionError(String),
QueryError(String),
}
impl std::fmt::Display for BuildEventLogError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BuildEventLogError::DatabaseError(msg) => write!(f, "Database error: {}", msg),
BuildEventLogError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
BuildEventLogError::ConnectionError(msg) => write!(f, "Connection error: {}", msg),
BuildEventLogError::QueryError(msg) => write!(f, "Query error: {}", msg),
}
}
}
impl StdError for BuildEventLogError {}
pub type Result<T> = std::result::Result<T, BuildEventLogError>;
#[derive(Debug, Clone)]
pub struct QueryResult {
pub columns: Vec<String>,
pub rows: Vec<Vec<String>>,
}
#[async_trait]
pub trait BuildEventLog: Send + Sync {
// Append new event to the log
async fn append_event(&self, event: BuildEvent) -> Result<()>;
// Query events by build request
async fn get_build_request_events(
&self,
build_request_id: &str,
since: Option<i64>
) -> Result<Vec<BuildEvent>>;
// Query events by partition
async fn get_partition_events(
&self,
partition_ref: &str,
since: Option<i64>
) -> Result<Vec<BuildEvent>>;
// Query events by job run
async fn get_job_run_events(
&self,
job_run_id: &str
) -> Result<Vec<BuildEvent>>;
// Query events in time range
async fn get_events_in_range(
&self,
start_time: i64,
end_time: i64
) -> Result<Vec<BuildEvent>>;
// Execute raw SQL queries (for dashboard and debugging)
async fn execute_query(&self, query: &str) -> Result<QueryResult>;
// Get latest partition availability status
async fn get_latest_partition_status(
&self,
partition_ref: &str
) -> Result<Option<(PartitionStatus, i64)>>; // status and timestamp
// Check if partition is being built by another request
async fn get_active_builds_for_partition(
&self,
partition_ref: &str
) -> Result<Vec<String>>; // build request IDs
// Initialize/setup the storage backend
async fn initialize(&self) -> Result<()>;
}
// Helper function to generate event ID
pub fn generate_event_id() -> String {
Uuid::new_v4().to_string()
}
// Helper function to get current timestamp in nanoseconds
pub fn current_timestamp_nanos() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as i64
}
// Helper function to create build event with metadata
pub fn create_build_event(
build_request_id: String,
event_type: crate::build_event::EventType,
) -> BuildEvent {
BuildEvent {
event_id: generate_event_id(),
timestamp: current_timestamp_nanos(),
build_request_id,
event_type: Some(event_type),
}
}
// Parse build event log URI and create appropriate implementation
pub async fn create_build_event_log(uri: &str) -> Result<Box<dyn BuildEventLog>> {
if uri == "stdout" {
Ok(Box::new(stdout::StdoutBuildEventLog::new()))
} else if uri.starts_with("sqlite://") {
let path = &uri[9..]; // Remove "sqlite://" prefix
let log = sqlite::SqliteBuildEventLog::new(path).await?;
log.initialize().await?;
Ok(Box::new(log))
} else if uri.starts_with("postgres://") {
let log = postgres::PostgresBuildEventLog::new(uri).await?;
log.initialize().await?;
Ok(Box::new(log))
} else {
Err(BuildEventLogError::ConnectionError(
format!("Unsupported build event log URI: {}", uri)
))
}
}

View file

@ -0,0 +1,95 @@
use super::*;
use async_trait::async_trait;
pub struct PostgresBuildEventLog {
_connection_string: String,
}
impl PostgresBuildEventLog {
pub async fn new(connection_string: &str) -> Result<Self> {
// For now, just store the connection string
// In a real implementation, we'd establish a connection pool here
Ok(Self {
_connection_string: connection_string.to_string(),
})
}
}
#[async_trait]
impl BuildEventLog for PostgresBuildEventLog {
async fn append_event(&self, _event: BuildEvent) -> Result<()> {
// TODO: Implement PostgreSQL event storage
Err(BuildEventLogError::DatabaseError(
"PostgreSQL implementation not yet available".to_string()
))
}
async fn get_build_request_events(
&self,
_build_request_id: &str,
_since: Option<i64>
) -> Result<Vec<BuildEvent>> {
Err(BuildEventLogError::DatabaseError(
"PostgreSQL implementation not yet available".to_string()
))
}
async fn get_partition_events(
&self,
_partition_ref: &str,
_since: Option<i64>
) -> Result<Vec<BuildEvent>> {
Err(BuildEventLogError::DatabaseError(
"PostgreSQL implementation not yet available".to_string()
))
}
async fn get_job_run_events(
&self,
_job_run_id: &str
) -> Result<Vec<BuildEvent>> {
Err(BuildEventLogError::DatabaseError(
"PostgreSQL implementation not yet available".to_string()
))
}
async fn get_events_in_range(
&self,
_start_time: i64,
_end_time: i64
) -> Result<Vec<BuildEvent>> {
Err(BuildEventLogError::DatabaseError(
"PostgreSQL implementation not yet available".to_string()
))
}
async fn execute_query(&self, _query: &str) -> Result<QueryResult> {
Err(BuildEventLogError::DatabaseError(
"PostgreSQL implementation not yet available".to_string()
))
}
async fn get_latest_partition_status(
&self,
_partition_ref: &str
) -> Result<Option<(PartitionStatus, i64)>> {
Err(BuildEventLogError::DatabaseError(
"PostgreSQL implementation not yet available".to_string()
))
}
async fn get_active_builds_for_partition(
&self,
_partition_ref: &str
) -> Result<Vec<String>> {
Err(BuildEventLogError::DatabaseError(
"PostgreSQL implementation not yet available".to_string()
))
}
async fn initialize(&self) -> Result<()> {
Err(BuildEventLogError::DatabaseError(
"PostgreSQL implementation not yet available".to_string()
))
}
}

View file

@ -0,0 +1,478 @@
use super::*;
use async_trait::async_trait;
use rusqlite::{params, Connection, Row};
use serde_json;
use std::sync::{Arc, Mutex};
pub struct SqliteBuildEventLog {
connection: Arc<Mutex<Connection>>,
}
impl SqliteBuildEventLog {
pub async fn new(path: &str) -> Result<Self> {
let conn = Connection::open(path)
.map_err(|e| BuildEventLogError::ConnectionError(e.to_string()))?;
Ok(Self {
connection: Arc::new(Mutex::new(conn)),
})
}
fn row_to_build_event(row: &Row) -> rusqlite::Result<BuildEvent> {
let event_id: String = row.get(0)?;
let timestamp: i64 = row.get(1)?;
let build_request_id: String = row.get(2)?;
let event_type_name: String = row.get(3)?;
// Determine event type and fetch additional data
let event_type = match event_type_name.as_str() {
"build_request" => {
// For now, create a basic event - in a real implementation we'd join with other tables
Some(crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: 0, // BUILD_REQUEST_UNKNOWN
requested_partitions: vec![],
message: String::new(),
}))
}
"partition" => {
Some(crate::build_event::EventType::PartitionEvent(PartitionEvent {
partition_ref: Some(PartitionRef { str: String::new() }),
status: 0, // PARTITION_UNKNOWN
message: String::new(),
job_run_id: String::new(),
}))
}
"job" => {
Some(crate::build_event::EventType::JobEvent(JobEvent {
job_run_id: String::new(),
job_label: Some(JobLabel { label: String::new() }),
target_partitions: vec![],
status: 0, // JOB_UNKNOWN
message: String::new(),
config: None,
manifests: vec![],
}))
}
"delegation" => {
Some(crate::build_event::EventType::DelegationEvent(DelegationEvent {
partition_ref: Some(PartitionRef { str: String::new() }),
delegated_to_build_request_id: String::new(),
message: String::new(),
}))
}
_ => None,
};
Ok(BuildEvent {
event_id,
timestamp,
build_request_id,
event_type,
})
}
}
#[async_trait]
impl BuildEventLog for SqliteBuildEventLog {
async fn append_event(&self, event: BuildEvent) -> Result<()> {
let conn = self.connection.lock().unwrap();
// First insert into build_events table
conn.execute(
"INSERT INTO build_events (event_id, timestamp, build_request_id, event_type) VALUES (?1, ?2, ?3, ?4)",
params![
event.event_id,
event.timestamp,
event.build_request_id,
match &event.event_type {
Some(crate::build_event::EventType::BuildRequestEvent(_)) => "build_request",
Some(crate::build_event::EventType::PartitionEvent(_)) => "partition",
Some(crate::build_event::EventType::JobEvent(_)) => "job",
Some(crate::build_event::EventType::DelegationEvent(_)) => "delegation",
None => "unknown",
}
],
).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
// Insert into specific event type table
match &event.event_type {
Some(crate::build_event::EventType::BuildRequestEvent(br_event)) => {
let partitions_json = serde_json::to_string(&br_event.requested_partitions)
.map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;
conn.execute(
"INSERT INTO build_request_events (event_id, status, requested_partitions, message) VALUES (?1, ?2, ?3, ?4)",
params![
event.event_id,
br_event.status.to_string(),
partitions_json,
br_event.message
],
).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
}
Some(crate::build_event::EventType::PartitionEvent(p_event)) => {
conn.execute(
"INSERT INTO partition_events (event_id, partition_ref, status, message, job_run_id) VALUES (?1, ?2, ?3, ?4, ?5)",
params![
event.event_id,
p_event.partition_ref.as_ref().map(|r| &r.str).unwrap_or(&String::new()),
p_event.status.to_string(),
p_event.message,
if p_event.job_run_id.is_empty() { None } else { Some(&p_event.job_run_id) }
],
).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
}
Some(crate::build_event::EventType::JobEvent(j_event)) => {
let partitions_json = serde_json::to_string(&j_event.target_partitions)
.map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;
let config_json = j_event.config.as_ref()
.map(|c| serde_json::to_string(c))
.transpose()
.map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;
let manifests_json = serde_json::to_string(&j_event.manifests)
.map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;
conn.execute(
"INSERT INTO job_events (event_id, job_run_id, job_label, target_partitions, status, message, config_json, manifests_json) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
params![
event.event_id,
j_event.job_run_id,
j_event.job_label.as_ref().map(|l| &l.label).unwrap_or(&String::new()),
partitions_json,
j_event.status.to_string(),
j_event.message,
config_json,
manifests_json
],
).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
}
Some(crate::build_event::EventType::DelegationEvent(d_event)) => {
conn.execute(
"INSERT INTO delegation_events (event_id, partition_ref, delegated_to_build_request_id, message) VALUES (?1, ?2, ?3, ?4)",
params![
event.event_id,
d_event.partition_ref.as_ref().map(|r| &r.str).unwrap_or(&String::new()),
d_event.delegated_to_build_request_id,
d_event.message
],
).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
}
None => {}
}
Ok(())
}
async fn get_build_request_events(
&self,
build_request_id: &str,
since: Option<i64>
) -> Result<Vec<BuildEvent>> {
let conn = self.connection.lock().unwrap();
let mut stmt = if let Some(since_timestamp) = since {
let mut stmt = conn.prepare("SELECT event_id, timestamp, build_request_id, event_type FROM build_events WHERE build_request_id = ?1 AND timestamp > ?2 ORDER BY timestamp")
.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
let rows = stmt.query_map(params![build_request_id, since_timestamp], Self::row_to_build_event)
.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
let mut events = Vec::new();
for row in rows {
events.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?);
}
return Ok(events);
} else {
conn.prepare("SELECT event_id, timestamp, build_request_id, event_type FROM build_events WHERE build_request_id = ?1 ORDER BY timestamp")
.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?
};
let rows = stmt.query_map([build_request_id], Self::row_to_build_event)
.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
let mut events = Vec::new();
for row in rows {
events.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?);
}
Ok(events)
}
async fn get_partition_events(
&self,
partition_ref: &str,
since: Option<i64>
) -> Result<Vec<BuildEvent>> {
let conn = self.connection.lock().unwrap();
let mut stmt = if let Some(since_timestamp) = since {
let mut stmt = conn.prepare("SELECT be.event_id, be.timestamp, be.build_request_id, be.event_type
FROM build_events be
JOIN partition_events pe ON be.event_id = pe.event_id
WHERE pe.partition_ref = ?1 AND be.timestamp > ?2
ORDER BY be.timestamp")
.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
let rows = stmt.query_map(params![partition_ref, since_timestamp], Self::row_to_build_event)
.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
let mut events = Vec::new();
for row in rows {
events.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?);
}
return Ok(events);
} else {
conn.prepare("SELECT be.event_id, be.timestamp, be.build_request_id, be.event_type
FROM build_events be
JOIN partition_events pe ON be.event_id = pe.event_id
WHERE pe.partition_ref = ?1
ORDER BY be.timestamp")
.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?
};
let rows = stmt.query_map([partition_ref], Self::row_to_build_event)
.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
let mut events = Vec::new();
for row in rows {
events.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?);
}
Ok(events)
}
async fn get_job_run_events(
&self,
job_run_id: &str
) -> Result<Vec<BuildEvent>> {
let conn = self.connection.lock().unwrap();
let query = "SELECT be.event_id, be.timestamp, be.build_request_id, be.event_type
FROM build_events be
JOIN job_events je ON be.event_id = je.event_id
WHERE je.job_run_id = ?1
ORDER BY be.timestamp";
let mut stmt = conn.prepare(query)
.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
let rows = stmt.query_map([job_run_id], Self::row_to_build_event)
.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
let mut events = Vec::new();
for row in rows {
events.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?);
}
Ok(events)
}
async fn get_events_in_range(
&self,
start_time: i64,
end_time: i64
) -> Result<Vec<BuildEvent>> {
let conn = self.connection.lock().unwrap();
let query = "SELECT event_id, timestamp, build_request_id, event_type
FROM build_events
WHERE timestamp >= ?1 AND timestamp <= ?2
ORDER BY timestamp";
let mut stmt = conn.prepare(query)
.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
let rows = stmt.query_map([start_time, end_time], Self::row_to_build_event)
.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
let mut events = Vec::new();
for row in rows {
events.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?);
}
Ok(events)
}
async fn execute_query(&self, query: &str) -> Result<QueryResult> {
let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare(query)
.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
let column_count = stmt.column_count();
let columns: Vec<String> = (0..column_count)
.map(|i| stmt.column_name(i).unwrap_or("unknown").to_string())
.collect();
let rows = stmt.query_map([], |row| {
let mut row_data = Vec::new();
for i in 0..column_count {
let value: String = row.get(i).unwrap_or_default();
row_data.push(value);
}
Ok(row_data)
}).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
let mut result_rows = Vec::new();
for row in rows {
result_rows.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?);
}
Ok(QueryResult {
columns,
rows: result_rows,
})
}
async fn get_latest_partition_status(
&self,
partition_ref: &str
) -> Result<Option<(PartitionStatus, i64)>> {
let conn = self.connection.lock().unwrap();
let query = "SELECT pe.status, be.timestamp
FROM partition_events pe
JOIN build_events be ON pe.event_id = be.event_id
WHERE pe.partition_ref = ?1
ORDER BY be.timestamp DESC
LIMIT 1";
let mut stmt = conn.prepare(query)
.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
let result = stmt.query_row([partition_ref], |row| {
let status_str: String = row.get(0)?;
let timestamp: i64 = row.get(1)?;
let status = status_str.parse::<i32>().unwrap_or(0);
Ok((status, timestamp))
});
match result {
Ok((status, timestamp)) => {
let partition_status = PartitionStatus::try_from(status).unwrap_or(PartitionStatus::PartitionUnknown);
Ok(Some((partition_status, timestamp)))
}
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(BuildEventLogError::QueryError(e.to_string())),
}
}
async fn get_active_builds_for_partition(
&self,
partition_ref: &str
) -> Result<Vec<String>> {
let conn = self.connection.lock().unwrap();
// Look for build requests that are actively building this partition
// A build is considered active if:
// 1. It has scheduled/building events for this partition, AND
// 2. The build request itself has not completed (status 4=COMPLETED or 5=FAILED)
let query = "SELECT DISTINCT be.build_request_id
FROM partition_events pe
JOIN build_events be ON pe.event_id = be.event_id
WHERE pe.partition_ref = ?1
AND pe.status IN ('2', '3') -- PARTITION_SCHEDULED or PARTITION_BUILDING
AND be.build_request_id NOT IN (
SELECT DISTINCT be3.build_request_id
FROM build_request_events bre
JOIN build_events be3 ON bre.event_id = be3.event_id
WHERE bre.status IN ('4', '5') -- BUILD_REQUEST_COMPLETED or BUILD_REQUEST_FAILED
)";
let mut stmt = conn.prepare(query)
.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
let rows = stmt.query_map([partition_ref], |row| {
let build_request_id: String = row.get(0)?;
Ok(build_request_id)
}).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
let mut build_request_ids = Vec::new();
for row in rows {
build_request_ids.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?);
}
Ok(build_request_ids)
}
async fn initialize(&self) -> Result<()> {
let conn = self.connection.lock().unwrap();
// Create tables
conn.execute(
"CREATE TABLE IF NOT EXISTS build_events (
event_id TEXT PRIMARY KEY,
timestamp INTEGER NOT NULL,
build_request_id TEXT NOT NULL,
event_type TEXT NOT NULL
)",
[],
).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
conn.execute(
"CREATE TABLE IF NOT EXISTS build_request_events (
event_id TEXT PRIMARY KEY REFERENCES build_events(event_id),
status TEXT NOT NULL,
requested_partitions TEXT NOT NULL,
message TEXT
)",
[],
).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
conn.execute(
"CREATE TABLE IF NOT EXISTS partition_events (
event_id TEXT PRIMARY KEY REFERENCES build_events(event_id),
partition_ref TEXT NOT NULL,
status TEXT NOT NULL,
message TEXT,
job_run_id TEXT
)",
[],
).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
conn.execute(
"CREATE TABLE IF NOT EXISTS job_events (
event_id TEXT PRIMARY KEY REFERENCES build_events(event_id),
job_run_id TEXT NOT NULL,
job_label TEXT NOT NULL,
target_partitions TEXT NOT NULL,
status TEXT NOT NULL,
message TEXT,
config_json TEXT,
manifests_json TEXT
)",
[],
).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
conn.execute(
"CREATE TABLE IF NOT EXISTS delegation_events (
event_id TEXT PRIMARY KEY REFERENCES build_events(event_id),
partition_ref TEXT NOT NULL,
delegated_to_build_request_id TEXT NOT NULL,
message TEXT
)",
[],
).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
// Create indexes
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_build_events_build_request ON build_events(build_request_id, timestamp)",
[],
).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_build_events_timestamp ON build_events(timestamp)",
[],
).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_partition_events_partition ON partition_events(partition_ref)",
[],
).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_job_events_job_run ON job_events(job_run_id)",
[],
).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
Ok(())
}
}

View file

@ -0,0 +1,98 @@
use super::*;
use async_trait::async_trait;
use serde_json;
pub struct StdoutBuildEventLog;
impl StdoutBuildEventLog {
pub fn new() -> Self {
Self
}
}
#[async_trait]
impl BuildEventLog for StdoutBuildEventLog {
async fn append_event(&self, event: BuildEvent) -> Result<()> {
// Serialize the event to JSON and print to stdout
let json = serde_json::to_string(&event)
.map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;
println!("BUILD_EVENT: {}", json);
Ok(())
}
async fn get_build_request_events(
&self,
_build_request_id: &str,
_since: Option<i64>
) -> Result<Vec<BuildEvent>> {
// Stdout implementation doesn't support querying
Err(BuildEventLogError::QueryError(
"Stdout build event log does not support querying".to_string()
))
}
async fn get_partition_events(
&self,
_partition_ref: &str,
_since: Option<i64>
) -> Result<Vec<BuildEvent>> {
// Stdout implementation doesn't support querying
Err(BuildEventLogError::QueryError(
"Stdout build event log does not support querying".to_string()
))
}
async fn get_job_run_events(
&self,
_job_run_id: &str
) -> Result<Vec<BuildEvent>> {
// Stdout implementation doesn't support querying
Err(BuildEventLogError::QueryError(
"Stdout build event log does not support querying".to_string()
))
}
async fn get_events_in_range(
&self,
_start_time: i64,
_end_time: i64
) -> Result<Vec<BuildEvent>> {
// Stdout implementation doesn't support querying
Err(BuildEventLogError::QueryError(
"Stdout build event log does not support querying".to_string()
))
}
async fn execute_query(&self, _query: &str) -> Result<QueryResult> {
// Stdout implementation doesn't support raw queries
Err(BuildEventLogError::QueryError(
"Stdout build event log does not support raw queries".to_string()
))
}
async fn get_latest_partition_status(
&self,
_partition_ref: &str
) -> Result<Option<(PartitionStatus, i64)>> {
// Stdout implementation doesn't support querying
Err(BuildEventLogError::QueryError(
"Stdout build event log does not support querying".to_string()
))
}
async fn get_active_builds_for_partition(
&self,
_partition_ref: &str
) -> Result<Vec<String>> {
// Stdout implementation doesn't support querying
Err(BuildEventLogError::QueryError(
"Stdout build event log does not support querying".to_string()
))
}
async fn initialize(&self) -> Result<()> {
// No initialization needed for stdout
Ok(())
}
}

11
databuild/lib.rs Normal file
View file

@ -0,0 +1,11 @@
// Include generated protobuf code
include!("databuild.rs");
// Event log module
pub mod event_log;
// Service module
pub mod service;
// Re-export commonly used types from event_log
pub use event_log::{BuildEventLog, BuildEventLogError, create_build_event_log};

View file

@ -0,0 +1,475 @@
use super::*;
use crate::event_log::{current_timestamp_nanos, create_build_event};
use axum::{
extract::{Path, State},
http::StatusCode,
response::Json,
};
use log::{error, info};
use std::process::Command;
use std::env;
pub async fn submit_build_request(
State(service): State<ServiceState>,
Json(request): Json<BuildRequest>,
) -> Result<Json<BuildRequestResponse>, (StatusCode, Json<ErrorResponse>)> {
let build_request_id = BuildGraphService::generate_build_request_id();
let timestamp = current_timestamp_nanos();
info!("Received build request {} for partitions: {:?}", build_request_id, request.partitions);
// Create build request state
let build_state = BuildRequestState {
build_request_id: build_request_id.clone(),
status: BuildRequestStatus::BuildRequestReceived,
requested_partitions: request.partitions.clone(),
created_at: timestamp,
updated_at: timestamp,
};
// Store in active builds
{
let mut active_builds = service.active_builds.write().await;
active_builds.insert(build_request_id.clone(), build_state);
}
// Log build request received event
let event = create_build_event(
build_request_id.clone(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestReceived as i32,
requested_partitions: request.partitions.iter()
.map(|p| PartitionRef { str: p.clone() })
.collect(),
message: "Build request received".to_string(),
}),
);
if let Err(e) = service.event_log.append_event(event).await {
error!("Failed to log build request received event: {}", e);
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: format!("Failed to log build request: {}", e),
}),
));
}
// Start build execution in background
let service_clone = service.clone();
let build_request_id_clone = build_request_id.clone();
let partitions_clone = request.partitions.clone();
tokio::spawn(async move {
if let Err(e) = execute_build_request(
service_clone,
build_request_id_clone,
partitions_clone,
).await {
error!("Build request execution failed: {}", e);
}
});
Ok(Json(BuildRequestResponse { build_request_id }))
}
pub async fn get_build_status(
State(service): State<ServiceState>,
Path(build_request_id): Path<String>,
) -> Result<Json<BuildStatusResponse>, (StatusCode, Json<ErrorResponse>)> {
// Get build request state
let build_state = {
let active_builds = service.active_builds.read().await;
active_builds.get(&build_request_id).cloned()
};
let build_state = match build_state {
Some(state) => state,
None => {
return Err((
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: "Build request not found".to_string(),
}),
));
}
};
// Get events for this build request
let events = match service.event_log.get_build_request_events(&build_request_id, None).await {
Ok(events) => events.into_iter().map(|e| BuildEventSummary {
event_id: e.event_id,
timestamp: e.timestamp,
event_type: event_type_to_string(&e.event_type),
message: event_to_message(&e.event_type),
}).collect(),
Err(e) => {
error!("Failed to get build request events: {}", e);
vec![]
}
};
Ok(Json(BuildStatusResponse {
build_request_id,
status: BuildGraphService::status_to_string(build_state.status),
requested_partitions: build_state.requested_partitions,
created_at: build_state.created_at,
updated_at: build_state.updated_at,
events,
}))
}
pub async fn cancel_build_request(
State(service): State<ServiceState>,
Path(build_request_id): Path<String>,
) -> Result<Json<serde_json::Value>, (StatusCode, Json<ErrorResponse>)> {
// Update build request state
{
let mut active_builds = service.active_builds.write().await;
if let Some(build_state) = active_builds.get_mut(&build_request_id) {
build_state.status = BuildRequestStatus::BuildRequestCancelled;
build_state.updated_at = current_timestamp_nanos();
} else {
return Err((
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: "Build request not found".to_string(),
}),
));
}
}
// Log cancellation event
let event = create_build_event(
build_request_id.clone(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestCancelled as i32,
requested_partitions: vec![],
message: "Build request cancelled".to_string(),
}),
);
if let Err(e) = service.event_log.append_event(event).await {
error!("Failed to log build request cancelled event: {}", e);
}
info!("Build request {} cancelled", build_request_id);
Ok(Json(serde_json::json!({
"cancelled": true,
"build_request_id": build_request_id
})))
}
pub async fn get_partition_status(
State(service): State<ServiceState>,
Path(partition_ref): Path<String>,
) -> Result<Json<PartitionStatusResponse>, (StatusCode, Json<ErrorResponse>)> {
// Get latest partition status
let (status, last_updated) = match service.event_log.get_latest_partition_status(&partition_ref).await {
Ok(Some((status, timestamp))) => (status, Some(timestamp)),
Ok(None) => (PartitionStatus::PartitionUnknown, None),
Err(e) => {
error!("Failed to get partition status: {}", e);
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: format!("Failed to get partition status: {}", e),
}),
));
}
};
// Get active builds for this partition
let build_requests = match service.event_log.get_active_builds_for_partition(&partition_ref).await {
Ok(builds) => builds,
Err(e) => {
error!("Failed to get active builds for partition: {}", e);
vec![]
}
};
Ok(Json(PartitionStatusResponse {
partition_ref,
status: BuildGraphService::partition_status_to_string(status),
last_updated,
build_requests,
}))
}
pub async fn get_partition_events(
State(service): State<ServiceState>,
Path(partition_ref): Path<String>,
) -> Result<Json<PartitionEventsResponse>, (StatusCode, Json<ErrorResponse>)> {
let events = match service.event_log.get_partition_events(&partition_ref, None).await {
Ok(events) => events.into_iter().map(|e| BuildEventSummary {
event_id: e.event_id,
timestamp: e.timestamp,
event_type: event_type_to_string(&e.event_type),
message: event_to_message(&e.event_type),
}).collect(),
Err(e) => {
error!("Failed to get partition events: {}", e);
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: format!("Failed to get partition events: {}", e),
}),
));
}
};
Ok(Json(PartitionEventsResponse {
partition_ref,
events,
}))
}
pub async fn analyze_build_graph(
State(service): State<ServiceState>,
Json(request): Json<AnalyzeRequest>,
) -> Result<Json<AnalyzeResponse>, (StatusCode, Json<ErrorResponse>)> {
// Call the analyze command (use temporary ID for analyze-only requests)
let temp_build_request_id = BuildGraphService::generate_build_request_id();
let analyze_result = run_analyze_command(&service, &temp_build_request_id, &request.partitions).await;
match analyze_result {
Ok(job_graph) => {
let job_graph_json = match serde_json::to_value(&job_graph) {
Ok(json) => json,
Err(e) => {
error!("Failed to serialize job graph: {}", e);
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: format!("Failed to serialize job graph: {}", e),
}),
));
}
};
Ok(Json(AnalyzeResponse {
job_graph: job_graph_json,
}))
}
Err(e) => {
error!("Failed to analyze build graph: {}", e);
Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: format!("Failed to analyze build graph: {}", e),
}),
))
}
}
}
async fn execute_build_request(
service: ServiceState,
build_request_id: String,
partitions: Vec<String>,
) -> Result<(), String> {
info!("Starting build execution for request {}", build_request_id);
// Update status to planning
update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestPlanning).await;
// Log planning event
let event = create_build_event(
build_request_id.clone(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestPlanning as i32,
requested_partitions: partitions.iter()
.map(|p| PartitionRef { str: p.clone() })
.collect(),
message: "Starting build planning".to_string(),
}),
);
if let Err(e) = service.event_log.append_event(event).await {
error!("Failed to log planning event: {}", e);
}
// Analyze the build graph
let job_graph = match run_analyze_command(&service, &build_request_id, &partitions).await {
Ok(graph) => graph,
Err(e) => {
error!("Failed to analyze build graph: {}", e);
update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestFailed).await;
return Err(e);
}
};
// Update status to executing
update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestExecuting).await;
// Log executing event
let event = create_build_event(
build_request_id.clone(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestExecuting as i32,
requested_partitions: partitions.iter()
.map(|p| PartitionRef { str: p.clone() })
.collect(),
message: "Starting build execution".to_string(),
}),
);
if let Err(e) = service.event_log.append_event(event).await {
error!("Failed to log executing event: {}", e);
}
// Execute the build graph
match run_execute_command(&service, &build_request_id, &job_graph).await {
Ok(_) => {
info!("Build request {} completed successfully", build_request_id);
update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestCompleted).await;
// Log completion event
let event = create_build_event(
build_request_id.clone(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestCompleted as i32,
requested_partitions: partitions.iter()
.map(|p| PartitionRef { str: p.clone() })
.collect(),
message: "Build request completed successfully".to_string(),
}),
);
if let Err(e) = service.event_log.append_event(event).await {
error!("Failed to log completion event: {}", e);
}
Ok(())
}
Err(e) => {
error!("Build request {} failed: {}", build_request_id, e);
update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestFailed).await;
// Log failure event
let event = create_build_event(
build_request_id.clone(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestFailed as i32,
requested_partitions: partitions.iter()
.map(|p| PartitionRef { str: p.clone() })
.collect(),
message: format!("Build request failed: {}", e),
}),
);
if let Err(e) = service.event_log.append_event(event).await {
error!("Failed to log failure event: {}", e);
}
Err(e)
}
}
}
async fn update_build_request_status(
service: &ServiceState,
build_request_id: &str,
status: BuildRequestStatus,
) {
let mut active_builds = service.active_builds.write().await;
if let Some(build_state) = active_builds.get_mut(build_request_id) {
build_state.status = status;
build_state.updated_at = current_timestamp_nanos();
}
}
async fn run_analyze_command(
service: &ServiceState,
build_request_id: &str,
partitions: &[String],
) -> Result<JobGraph, String> {
// Run analyze command
let analyze_binary = env::var("DATABUILD_ANALYZE_BINARY")
.unwrap_or_else(|_| "databuild_analyze".to_string());
let output = Command::new(&analyze_binary)
.args(partitions)
.env("DATABUILD_JOB_LOOKUP_PATH", &service.job_lookup_path)
.env("DATABUILD_CANDIDATE_JOBS", serde_json::to_string(&service.candidate_jobs).unwrap())
.env("DATABUILD_BUILD_EVENT_LOG", &service.event_log_uri)
.env("DATABUILD_BUILD_REQUEST_ID", build_request_id)
.output()
.map_err(|e| format!("Failed to execute analyze command: {}", e))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(format!("Analyze command failed: {}", stderr));
}
let stdout = String::from_utf8_lossy(&output.stdout);
let job_graph: JobGraph = serde_json::from_str(&stdout)
.map_err(|e| format!("Failed to parse analyze result: {}", e))?;
Ok(job_graph)
}
async fn run_execute_command(
service: &ServiceState,
build_request_id: &str,
job_graph: &JobGraph,
) -> Result<(), String> {
// Serialize job graph
let job_graph_json = serde_json::to_string(job_graph)
.map_err(|e| format!("Failed to serialize job graph: {}", e))?;
// Run execute command
let execute_binary = env::var("DATABUILD_EXECUTE_BINARY")
.unwrap_or_else(|_| "databuild_execute".to_string());
let mut child = Command::new(&execute_binary)
.env("DATABUILD_JOB_LOOKUP_PATH", &service.job_lookup_path)
.env("DATABUILD_CANDIDATE_JOBS", serde_json::to_string(&service.candidate_jobs).unwrap())
.env("DATABUILD_BUILD_EVENT_LOG", &service.event_log_uri)
.env("DATABUILD_BUILD_REQUEST_ID", build_request_id)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.map_err(|e| format!("Failed to spawn execute command: {}", e))?;
// Write job graph to stdin
if let Some(stdin) = child.stdin.take() {
use std::io::Write;
let mut stdin = stdin;
stdin.write_all(job_graph_json.as_bytes())
.map_err(|e| format!("Failed to write job graph to stdin: {}", e))?;
}
// Wait for completion
let output = child.wait_with_output()
.map_err(|e| format!("Failed to wait for execute command: {}", e))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(format!("Execute command failed: {}", stderr));
}
Ok(())
}
fn event_type_to_string(event_type: &Option<crate::build_event::EventType>) -> String {
match event_type {
Some(crate::build_event::EventType::BuildRequestEvent(_)) => "build_request".to_string(),
Some(crate::build_event::EventType::PartitionEvent(_)) => "partition".to_string(),
Some(crate::build_event::EventType::JobEvent(_)) => "job".to_string(),
Some(crate::build_event::EventType::DelegationEvent(_)) => "delegation".to_string(),
None => "unknown".to_string(),
}
}
fn event_to_message(event_type: &Option<crate::build_event::EventType>) -> String {
match event_type {
Some(crate::build_event::EventType::BuildRequestEvent(event)) => event.message.clone(),
Some(crate::build_event::EventType::PartitionEvent(event)) => event.message.clone(),
Some(crate::build_event::EventType::JobEvent(event)) => event.message.clone(),
Some(crate::build_event::EventType::DelegationEvent(event)) => event.message.clone(),
None => "Unknown event".to_string(),
}
}

97
databuild/service/main.rs Normal file
View file

@ -0,0 +1,97 @@
use databuild::service::BuildGraphService;
use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
use clap::{Arg, Command};
use log::info;
use simple_logger::SimpleLogger;
#[tokio::main]
async fn main() {
SimpleLogger::new().init().unwrap();
let matches = Command::new("build-graph-service")
.version("1.0")
.about("DataBuild Build Graph Service")
.arg(
Arg::new("port")
.short('p')
.long("port")
.value_name("PORT")
.help("Port to listen on")
.default_value("8080")
)
.arg(
Arg::new("host")
.long("host")
.value_name("HOST")
.help("Host to bind to")
.default_value("0.0.0.0")
)
.arg(
Arg::new("event-log")
.long("event-log")
.value_name("URI")
.help("Build event log URI")
.default_value("sqlite:///tmp/databuild.db")
)
.arg(
Arg::new("graph-label")
.long("graph-label")
.value_name("LABEL")
.help("Graph label")
.default_value("//example:graph")
)
.arg(
Arg::new("job-lookup-path")
.long("job-lookup-path")
.value_name("PATH")
.help("Job lookup binary path")
.default_value("job_lookup")
)
.get_matches();
let port: u16 = matches.get_one::<String>("port").unwrap()
.parse().expect("Invalid port number");
let host = matches.get_one::<String>("host").unwrap();
let event_log_uri = matches.get_one::<String>("event-log").unwrap();
let graph_label = matches.get_one::<String>("graph-label").unwrap().to_string();
let job_lookup_path = matches.get_one::<String>("job-lookup-path").unwrap().to_string();
// Get candidate jobs from environment
let candidate_jobs: HashMap<String, String> = env::var("DATABUILD_CANDIDATE_JOBS")
.map(|s| serde_json::from_str(&s).unwrap_or_else(|_| HashMap::new()))
.unwrap_or_else(|_| HashMap::new());
info!("Starting Build Graph Service on {}:{}", host, port);
info!("Event log URI: {}", event_log_uri);
info!("Graph label: {}", graph_label);
info!("Job lookup path: {}", job_lookup_path);
info!("Candidate jobs: {} configured", candidate_jobs.len());
// Create service
let service = match BuildGraphService::new(
event_log_uri,
graph_label,
job_lookup_path,
candidate_jobs,
).await {
Ok(service) => service,
Err(e) => {
eprintln!("Failed to create service: {}", e);
std::process::exit(1);
}
};
// Create router
let app = service.create_router();
// Start server
let addr: SocketAddr = format!("{}:{}", host, port).parse().unwrap();
info!("Build Graph Service listening on {}", addr);
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
axum::serve(listener, app.into_make_service())
.await
.unwrap();
}

150
databuild/service/mod.rs Normal file
View file

@ -0,0 +1,150 @@
use crate::*;
use crate::event_log::{BuildEventLog, BuildEventLogError, create_build_event_log};
use axum::{
routing::{get, post, delete},
Router,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;
pub mod handlers;
#[derive(Clone)]
pub struct BuildGraphService {
pub event_log: Arc<dyn BuildEventLog>,
pub event_log_uri: String,
pub active_builds: Arc<RwLock<HashMap<String, BuildRequestState>>>,
pub graph_label: String,
pub job_lookup_path: String,
pub candidate_jobs: HashMap<String, String>,
}
#[derive(Debug, Clone)]
pub struct BuildRequestState {
pub build_request_id: String,
pub status: BuildRequestStatus,
pub requested_partitions: Vec<String>,
pub created_at: i64,
pub updated_at: i64,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct BuildRequest {
pub partitions: Vec<String>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct BuildRequestResponse {
pub build_request_id: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct BuildStatusResponse {
pub build_request_id: String,
pub status: String,
pub requested_partitions: Vec<String>,
pub created_at: i64,
pub updated_at: i64,
pub events: Vec<BuildEventSummary>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct BuildEventSummary {
pub event_id: String,
pub timestamp: i64,
pub event_type: String,
pub message: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct PartitionStatusResponse {
pub partition_ref: String,
pub status: String,
pub last_updated: Option<i64>,
pub build_requests: Vec<String>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct PartitionEventsResponse {
pub partition_ref: String,
pub events: Vec<BuildEventSummary>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct AnalyzeRequest {
pub partitions: Vec<String>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct AnalyzeResponse {
pub job_graph: serde_json::Value,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ErrorResponse {
pub error: String,
}
impl BuildGraphService {
pub async fn new(
event_log_uri: &str,
graph_label: String,
job_lookup_path: String,
candidate_jobs: HashMap<String, String>,
) -> Result<Self, BuildEventLogError> {
let event_log = create_build_event_log(event_log_uri).await?;
Ok(Self {
event_log: Arc::from(event_log),
event_log_uri: event_log_uri.to_string(),
active_builds: Arc::new(RwLock::new(HashMap::new())),
graph_label,
job_lookup_path,
candidate_jobs,
})
}
pub fn create_router(self) -> Router {
Router::new()
.route("/api/v1/builds", post(handlers::submit_build_request))
.route("/api/v1/builds/:id", get(handlers::get_build_status))
.route("/api/v1/builds/:id", delete(handlers::cancel_build_request))
.route("/api/v1/partitions/:ref/status", get(handlers::get_partition_status))
.route("/api/v1/partitions/:ref/events", get(handlers::get_partition_events))
.route("/api/v1/analyze", post(handlers::analyze_build_graph))
.with_state(Arc::new(self))
}
pub fn generate_build_request_id() -> String {
Uuid::new_v4().to_string()
}
pub fn status_to_string(status: BuildRequestStatus) -> String {
match status {
BuildRequestStatus::BuildRequestUnknown => "unknown".to_string(),
BuildRequestStatus::BuildRequestReceived => "received".to_string(),
BuildRequestStatus::BuildRequestPlanning => "planning".to_string(),
BuildRequestStatus::BuildRequestExecuting => "executing".to_string(),
BuildRequestStatus::BuildRequestCompleted => "completed".to_string(),
BuildRequestStatus::BuildRequestFailed => "failed".to_string(),
BuildRequestStatus::BuildRequestCancelled => "cancelled".to_string(),
}
}
pub fn partition_status_to_string(status: PartitionStatus) -> String {
match status {
PartitionStatus::PartitionUnknown => "unknown".to_string(),
PartitionStatus::PartitionRequested => "requested".to_string(),
PartitionStatus::PartitionScheduled => "scheduled".to_string(),
PartitionStatus::PartitionBuilding => "building".to_string(),
PartitionStatus::PartitionAvailable => "available".to_string(),
PartitionStatus::PartitionFailed => "failed".to_string(),
PartitionStatus::PartitionDelegated => "delegated".to_string(),
}
}
}
pub type ServiceState = Arc<BuildGraphService>;