From 1bfda923b62fff87014ff72a10a3fafd52f6507c Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Sun, 6 Jul 2025 22:06:45 -0700 Subject: [PATCH] phase 3 - add missing new files --- databuild/event_log/mod.rs | 134 +++++++++ databuild/event_log/postgres.rs | 95 +++++++ databuild/event_log/sqlite.rs | 478 ++++++++++++++++++++++++++++++++ databuild/event_log/stdout.rs | 98 +++++++ databuild/lib.rs | 11 + databuild/service/handlers.rs | 475 +++++++++++++++++++++++++++++++ databuild/service/main.rs | 97 +++++++ databuild/service/mod.rs | 150 ++++++++++ 8 files changed, 1538 insertions(+) create mode 100644 databuild/event_log/mod.rs create mode 100644 databuild/event_log/postgres.rs create mode 100644 databuild/event_log/sqlite.rs create mode 100644 databuild/event_log/stdout.rs create mode 100644 databuild/lib.rs create mode 100644 databuild/service/handlers.rs create mode 100644 databuild/service/main.rs create mode 100644 databuild/service/mod.rs diff --git a/databuild/event_log/mod.rs b/databuild/event_log/mod.rs new file mode 100644 index 0000000..716f507 --- /dev/null +++ b/databuild/event_log/mod.rs @@ -0,0 +1,134 @@ +use crate::*; +use async_trait::async_trait; +use std::error::Error as StdError; +use uuid::Uuid; + +pub mod stdout; +pub mod sqlite; +pub mod postgres; + +#[derive(Debug)] +pub enum BuildEventLogError { + DatabaseError(String), + SerializationError(String), + ConnectionError(String), + QueryError(String), +} + +impl std::fmt::Display for BuildEventLogError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + BuildEventLogError::DatabaseError(msg) => write!(f, "Database error: {}", msg), + BuildEventLogError::SerializationError(msg) => write!(f, "Serialization error: {}", msg), + BuildEventLogError::ConnectionError(msg) => write!(f, "Connection error: {}", msg), + BuildEventLogError::QueryError(msg) => write!(f, "Query error: {}", msg), + } + } +} + +impl StdError for BuildEventLogError {} + +pub type Result = std::result::Result; + +#[derive(Debug, Clone)] +pub struct QueryResult { + pub columns: Vec, + pub rows: Vec>, +} + +#[async_trait] +pub trait BuildEventLog: Send + Sync { + // Append new event to the log + async fn append_event(&self, event: BuildEvent) -> Result<()>; + + // Query events by build request + async fn get_build_request_events( + &self, + build_request_id: &str, + since: Option + ) -> Result>; + + // Query events by partition + async fn get_partition_events( + &self, + partition_ref: &str, + since: Option + ) -> Result>; + + // Query events by job run + async fn get_job_run_events( + &self, + job_run_id: &str + ) -> Result>; + + // Query events in time range + async fn get_events_in_range( + &self, + start_time: i64, + end_time: i64 + ) -> Result>; + + // Execute raw SQL queries (for dashboard and debugging) + async fn execute_query(&self, query: &str) -> Result; + + // Get latest partition availability status + async fn get_latest_partition_status( + &self, + partition_ref: &str + ) -> Result>; // status and timestamp + + // Check if partition is being built by another request + async fn get_active_builds_for_partition( + &self, + partition_ref: &str + ) -> Result>; // build request IDs + + // Initialize/setup the storage backend + async fn initialize(&self) -> Result<()>; +} + +// Helper function to generate event ID +pub fn generate_event_id() -> String { + Uuid::new_v4().to_string() +} + +// Helper function to get current timestamp in nanoseconds +pub fn current_timestamp_nanos() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() as i64 +} + +// Helper function to create build event with metadata +pub fn create_build_event( + build_request_id: String, + event_type: crate::build_event::EventType, +) -> BuildEvent { + BuildEvent { + event_id: generate_event_id(), + timestamp: current_timestamp_nanos(), + build_request_id, + event_type: Some(event_type), + } +} + +// Parse build event log URI and create appropriate implementation +pub async fn create_build_event_log(uri: &str) -> Result> { + if uri == "stdout" { + Ok(Box::new(stdout::StdoutBuildEventLog::new())) + } else if uri.starts_with("sqlite://") { + let path = &uri[9..]; // Remove "sqlite://" prefix + let log = sqlite::SqliteBuildEventLog::new(path).await?; + log.initialize().await?; + Ok(Box::new(log)) + } else if uri.starts_with("postgres://") { + let log = postgres::PostgresBuildEventLog::new(uri).await?; + log.initialize().await?; + Ok(Box::new(log)) + } else { + Err(BuildEventLogError::ConnectionError( + format!("Unsupported build event log URI: {}", uri) + )) + } +} \ No newline at end of file diff --git a/databuild/event_log/postgres.rs b/databuild/event_log/postgres.rs new file mode 100644 index 0000000..e637a99 --- /dev/null +++ b/databuild/event_log/postgres.rs @@ -0,0 +1,95 @@ +use super::*; +use async_trait::async_trait; + +pub struct PostgresBuildEventLog { + _connection_string: String, +} + +impl PostgresBuildEventLog { + pub async fn new(connection_string: &str) -> Result { + // For now, just store the connection string + // In a real implementation, we'd establish a connection pool here + Ok(Self { + _connection_string: connection_string.to_string(), + }) + } +} + +#[async_trait] +impl BuildEventLog for PostgresBuildEventLog { + async fn append_event(&self, _event: BuildEvent) -> Result<()> { + // TODO: Implement PostgreSQL event storage + Err(BuildEventLogError::DatabaseError( + "PostgreSQL implementation not yet available".to_string() + )) + } + + async fn get_build_request_events( + &self, + _build_request_id: &str, + _since: Option + ) -> Result> { + Err(BuildEventLogError::DatabaseError( + "PostgreSQL implementation not yet available".to_string() + )) + } + + async fn get_partition_events( + &self, + _partition_ref: &str, + _since: Option + ) -> Result> { + Err(BuildEventLogError::DatabaseError( + "PostgreSQL implementation not yet available".to_string() + )) + } + + async fn get_job_run_events( + &self, + _job_run_id: &str + ) -> Result> { + Err(BuildEventLogError::DatabaseError( + "PostgreSQL implementation not yet available".to_string() + )) + } + + async fn get_events_in_range( + &self, + _start_time: i64, + _end_time: i64 + ) -> Result> { + Err(BuildEventLogError::DatabaseError( + "PostgreSQL implementation not yet available".to_string() + )) + } + + async fn execute_query(&self, _query: &str) -> Result { + Err(BuildEventLogError::DatabaseError( + "PostgreSQL implementation not yet available".to_string() + )) + } + + async fn get_latest_partition_status( + &self, + _partition_ref: &str + ) -> Result> { + Err(BuildEventLogError::DatabaseError( + "PostgreSQL implementation not yet available".to_string() + )) + } + + async fn get_active_builds_for_partition( + &self, + _partition_ref: &str + ) -> Result> { + Err(BuildEventLogError::DatabaseError( + "PostgreSQL implementation not yet available".to_string() + )) + } + + async fn initialize(&self) -> Result<()> { + Err(BuildEventLogError::DatabaseError( + "PostgreSQL implementation not yet available".to_string() + )) + } +} \ No newline at end of file diff --git a/databuild/event_log/sqlite.rs b/databuild/event_log/sqlite.rs new file mode 100644 index 0000000..68b8a63 --- /dev/null +++ b/databuild/event_log/sqlite.rs @@ -0,0 +1,478 @@ +use super::*; +use async_trait::async_trait; +use rusqlite::{params, Connection, Row}; +use serde_json; +use std::sync::{Arc, Mutex}; + +pub struct SqliteBuildEventLog { + connection: Arc>, +} + +impl SqliteBuildEventLog { + pub async fn new(path: &str) -> Result { + let conn = Connection::open(path) + .map_err(|e| BuildEventLogError::ConnectionError(e.to_string()))?; + + Ok(Self { + connection: Arc::new(Mutex::new(conn)), + }) + } + + fn row_to_build_event(row: &Row) -> rusqlite::Result { + let event_id: String = row.get(0)?; + let timestamp: i64 = row.get(1)?; + let build_request_id: String = row.get(2)?; + let event_type_name: String = row.get(3)?; + + // Determine event type and fetch additional data + let event_type = match event_type_name.as_str() { + "build_request" => { + // For now, create a basic event - in a real implementation we'd join with other tables + Some(crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent { + status: 0, // BUILD_REQUEST_UNKNOWN + requested_partitions: vec![], + message: String::new(), + })) + } + "partition" => { + Some(crate::build_event::EventType::PartitionEvent(PartitionEvent { + partition_ref: Some(PartitionRef { str: String::new() }), + status: 0, // PARTITION_UNKNOWN + message: String::new(), + job_run_id: String::new(), + })) + } + "job" => { + Some(crate::build_event::EventType::JobEvent(JobEvent { + job_run_id: String::new(), + job_label: Some(JobLabel { label: String::new() }), + target_partitions: vec![], + status: 0, // JOB_UNKNOWN + message: String::new(), + config: None, + manifests: vec![], + })) + } + "delegation" => { + Some(crate::build_event::EventType::DelegationEvent(DelegationEvent { + partition_ref: Some(PartitionRef { str: String::new() }), + delegated_to_build_request_id: String::new(), + message: String::new(), + })) + } + _ => None, + }; + + Ok(BuildEvent { + event_id, + timestamp, + build_request_id, + event_type, + }) + } +} + +#[async_trait] +impl BuildEventLog for SqliteBuildEventLog { + async fn append_event(&self, event: BuildEvent) -> Result<()> { + let conn = self.connection.lock().unwrap(); + + // First insert into build_events table + conn.execute( + "INSERT INTO build_events (event_id, timestamp, build_request_id, event_type) VALUES (?1, ?2, ?3, ?4)", + params![ + event.event_id, + event.timestamp, + event.build_request_id, + match &event.event_type { + Some(crate::build_event::EventType::BuildRequestEvent(_)) => "build_request", + Some(crate::build_event::EventType::PartitionEvent(_)) => "partition", + Some(crate::build_event::EventType::JobEvent(_)) => "job", + Some(crate::build_event::EventType::DelegationEvent(_)) => "delegation", + None => "unknown", + } + ], + ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; + + // Insert into specific event type table + match &event.event_type { + Some(crate::build_event::EventType::BuildRequestEvent(br_event)) => { + let partitions_json = serde_json::to_string(&br_event.requested_partitions) + .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?; + + conn.execute( + "INSERT INTO build_request_events (event_id, status, requested_partitions, message) VALUES (?1, ?2, ?3, ?4)", + params![ + event.event_id, + br_event.status.to_string(), + partitions_json, + br_event.message + ], + ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; + } + Some(crate::build_event::EventType::PartitionEvent(p_event)) => { + conn.execute( + "INSERT INTO partition_events (event_id, partition_ref, status, message, job_run_id) VALUES (?1, ?2, ?3, ?4, ?5)", + params![ + event.event_id, + p_event.partition_ref.as_ref().map(|r| &r.str).unwrap_or(&String::new()), + p_event.status.to_string(), + p_event.message, + if p_event.job_run_id.is_empty() { None } else { Some(&p_event.job_run_id) } + ], + ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; + } + Some(crate::build_event::EventType::JobEvent(j_event)) => { + let partitions_json = serde_json::to_string(&j_event.target_partitions) + .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?; + let config_json = j_event.config.as_ref() + .map(|c| serde_json::to_string(c)) + .transpose() + .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?; + let manifests_json = serde_json::to_string(&j_event.manifests) + .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?; + + conn.execute( + "INSERT INTO job_events (event_id, job_run_id, job_label, target_partitions, status, message, config_json, manifests_json) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", + params![ + event.event_id, + j_event.job_run_id, + j_event.job_label.as_ref().map(|l| &l.label).unwrap_or(&String::new()), + partitions_json, + j_event.status.to_string(), + j_event.message, + config_json, + manifests_json + ], + ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; + } + Some(crate::build_event::EventType::DelegationEvent(d_event)) => { + conn.execute( + "INSERT INTO delegation_events (event_id, partition_ref, delegated_to_build_request_id, message) VALUES (?1, ?2, ?3, ?4)", + params![ + event.event_id, + d_event.partition_ref.as_ref().map(|r| &r.str).unwrap_or(&String::new()), + d_event.delegated_to_build_request_id, + d_event.message + ], + ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; + } + None => {} + } + + Ok(()) + } + + async fn get_build_request_events( + &self, + build_request_id: &str, + since: Option + ) -> Result> { + let conn = self.connection.lock().unwrap(); + + let mut stmt = if let Some(since_timestamp) = since { + let mut stmt = conn.prepare("SELECT event_id, timestamp, build_request_id, event_type FROM build_events WHERE build_request_id = ?1 AND timestamp > ?2 ORDER BY timestamp") + .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; + let rows = stmt.query_map(params![build_request_id, since_timestamp], Self::row_to_build_event) + .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; + + let mut events = Vec::new(); + for row in rows { + events.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?); + } + return Ok(events); + } else { + conn.prepare("SELECT event_id, timestamp, build_request_id, event_type FROM build_events WHERE build_request_id = ?1 ORDER BY timestamp") + .map_err(|e| BuildEventLogError::QueryError(e.to_string()))? + }; + + let rows = stmt.query_map([build_request_id], Self::row_to_build_event) + .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; + + let mut events = Vec::new(); + for row in rows { + events.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?); + } + + Ok(events) + } + + async fn get_partition_events( + &self, + partition_ref: &str, + since: Option + ) -> Result> { + let conn = self.connection.lock().unwrap(); + + let mut stmt = if let Some(since_timestamp) = since { + let mut stmt = conn.prepare("SELECT be.event_id, be.timestamp, be.build_request_id, be.event_type + FROM build_events be + JOIN partition_events pe ON be.event_id = pe.event_id + WHERE pe.partition_ref = ?1 AND be.timestamp > ?2 + ORDER BY be.timestamp") + .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; + let rows = stmt.query_map(params![partition_ref, since_timestamp], Self::row_to_build_event) + .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; + + let mut events = Vec::new(); + for row in rows { + events.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?); + } + return Ok(events); + } else { + conn.prepare("SELECT be.event_id, be.timestamp, be.build_request_id, be.event_type + FROM build_events be + JOIN partition_events pe ON be.event_id = pe.event_id + WHERE pe.partition_ref = ?1 + ORDER BY be.timestamp") + .map_err(|e| BuildEventLogError::QueryError(e.to_string()))? + }; + + let rows = stmt.query_map([partition_ref], Self::row_to_build_event) + .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; + + let mut events = Vec::new(); + for row in rows { + events.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?); + } + + Ok(events) + } + + async fn get_job_run_events( + &self, + job_run_id: &str + ) -> Result> { + let conn = self.connection.lock().unwrap(); + + let query = "SELECT be.event_id, be.timestamp, be.build_request_id, be.event_type + FROM build_events be + JOIN job_events je ON be.event_id = je.event_id + WHERE je.job_run_id = ?1 + ORDER BY be.timestamp"; + + let mut stmt = conn.prepare(query) + .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; + + let rows = stmt.query_map([job_run_id], Self::row_to_build_event) + .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; + + let mut events = Vec::new(); + for row in rows { + events.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?); + } + + Ok(events) + } + + async fn get_events_in_range( + &self, + start_time: i64, + end_time: i64 + ) -> Result> { + let conn = self.connection.lock().unwrap(); + + let query = "SELECT event_id, timestamp, build_request_id, event_type + FROM build_events + WHERE timestamp >= ?1 AND timestamp <= ?2 + ORDER BY timestamp"; + + let mut stmt = conn.prepare(query) + .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; + + let rows = stmt.query_map([start_time, end_time], Self::row_to_build_event) + .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; + + let mut events = Vec::new(); + for row in rows { + events.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?); + } + + Ok(events) + } + + async fn execute_query(&self, query: &str) -> Result { + let conn = self.connection.lock().unwrap(); + + let mut stmt = conn.prepare(query) + .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; + + let column_count = stmt.column_count(); + let columns: Vec = (0..column_count) + .map(|i| stmt.column_name(i).unwrap_or("unknown").to_string()) + .collect(); + + let rows = stmt.query_map([], |row| { + let mut row_data = Vec::new(); + for i in 0..column_count { + let value: String = row.get(i).unwrap_or_default(); + row_data.push(value); + } + Ok(row_data) + }).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; + + let mut result_rows = Vec::new(); + for row in rows { + result_rows.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?); + } + + Ok(QueryResult { + columns, + rows: result_rows, + }) + } + + async fn get_latest_partition_status( + &self, + partition_ref: &str + ) -> Result> { + let conn = self.connection.lock().unwrap(); + + let query = "SELECT pe.status, be.timestamp + FROM partition_events pe + JOIN build_events be ON pe.event_id = be.event_id + WHERE pe.partition_ref = ?1 + ORDER BY be.timestamp DESC + LIMIT 1"; + + let mut stmt = conn.prepare(query) + .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; + + let result = stmt.query_row([partition_ref], |row| { + let status_str: String = row.get(0)?; + let timestamp: i64 = row.get(1)?; + let status = status_str.parse::().unwrap_or(0); + Ok((status, timestamp)) + }); + + match result { + Ok((status, timestamp)) => { + let partition_status = PartitionStatus::try_from(status).unwrap_or(PartitionStatus::PartitionUnknown); + Ok(Some((partition_status, timestamp))) + } + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), + Err(e) => Err(BuildEventLogError::QueryError(e.to_string())), + } + } + + async fn get_active_builds_for_partition( + &self, + partition_ref: &str + ) -> Result> { + let conn = self.connection.lock().unwrap(); + + // Look for build requests that are actively building this partition + // A build is considered active if: + // 1. It has scheduled/building events for this partition, AND + // 2. The build request itself has not completed (status 4=COMPLETED or 5=FAILED) + let query = "SELECT DISTINCT be.build_request_id + FROM partition_events pe + JOIN build_events be ON pe.event_id = be.event_id + WHERE pe.partition_ref = ?1 + AND pe.status IN ('2', '3') -- PARTITION_SCHEDULED or PARTITION_BUILDING + AND be.build_request_id NOT IN ( + SELECT DISTINCT be3.build_request_id + FROM build_request_events bre + JOIN build_events be3 ON bre.event_id = be3.event_id + WHERE bre.status IN ('4', '5') -- BUILD_REQUEST_COMPLETED or BUILD_REQUEST_FAILED + )"; + + let mut stmt = conn.prepare(query) + .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; + + let rows = stmt.query_map([partition_ref], |row| { + let build_request_id: String = row.get(0)?; + Ok(build_request_id) + }).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?; + + let mut build_request_ids = Vec::new(); + for row in rows { + build_request_ids.push(row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?); + } + + Ok(build_request_ids) + } + + async fn initialize(&self) -> Result<()> { + let conn = self.connection.lock().unwrap(); + + // Create tables + conn.execute( + "CREATE TABLE IF NOT EXISTS build_events ( + event_id TEXT PRIMARY KEY, + timestamp INTEGER NOT NULL, + build_request_id TEXT NOT NULL, + event_type TEXT NOT NULL + )", + [], + ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; + + conn.execute( + "CREATE TABLE IF NOT EXISTS build_request_events ( + event_id TEXT PRIMARY KEY REFERENCES build_events(event_id), + status TEXT NOT NULL, + requested_partitions TEXT NOT NULL, + message TEXT + )", + [], + ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; + + conn.execute( + "CREATE TABLE IF NOT EXISTS partition_events ( + event_id TEXT PRIMARY KEY REFERENCES build_events(event_id), + partition_ref TEXT NOT NULL, + status TEXT NOT NULL, + message TEXT, + job_run_id TEXT + )", + [], + ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; + + conn.execute( + "CREATE TABLE IF NOT EXISTS job_events ( + event_id TEXT PRIMARY KEY REFERENCES build_events(event_id), + job_run_id TEXT NOT NULL, + job_label TEXT NOT NULL, + target_partitions TEXT NOT NULL, + status TEXT NOT NULL, + message TEXT, + config_json TEXT, + manifests_json TEXT + )", + [], + ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; + + conn.execute( + "CREATE TABLE IF NOT EXISTS delegation_events ( + event_id TEXT PRIMARY KEY REFERENCES build_events(event_id), + partition_ref TEXT NOT NULL, + delegated_to_build_request_id TEXT NOT NULL, + message TEXT + )", + [], + ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; + + // Create indexes + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_build_events_build_request ON build_events(build_request_id, timestamp)", + [], + ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; + + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_build_events_timestamp ON build_events(timestamp)", + [], + ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; + + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_partition_events_partition ON partition_events(partition_ref)", + [], + ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; + + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_job_events_job_run ON job_events(job_run_id)", + [], + ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?; + + Ok(()) + } +} \ No newline at end of file diff --git a/databuild/event_log/stdout.rs b/databuild/event_log/stdout.rs new file mode 100644 index 0000000..2f239eb --- /dev/null +++ b/databuild/event_log/stdout.rs @@ -0,0 +1,98 @@ +use super::*; +use async_trait::async_trait; +use serde_json; + +pub struct StdoutBuildEventLog; + +impl StdoutBuildEventLog { + pub fn new() -> Self { + Self + } +} + +#[async_trait] +impl BuildEventLog for StdoutBuildEventLog { + async fn append_event(&self, event: BuildEvent) -> Result<()> { + // Serialize the event to JSON and print to stdout + let json = serde_json::to_string(&event) + .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?; + + println!("BUILD_EVENT: {}", json); + Ok(()) + } + + async fn get_build_request_events( + &self, + _build_request_id: &str, + _since: Option + ) -> Result> { + // Stdout implementation doesn't support querying + Err(BuildEventLogError::QueryError( + "Stdout build event log does not support querying".to_string() + )) + } + + async fn get_partition_events( + &self, + _partition_ref: &str, + _since: Option + ) -> Result> { + // Stdout implementation doesn't support querying + Err(BuildEventLogError::QueryError( + "Stdout build event log does not support querying".to_string() + )) + } + + async fn get_job_run_events( + &self, + _job_run_id: &str + ) -> Result> { + // Stdout implementation doesn't support querying + Err(BuildEventLogError::QueryError( + "Stdout build event log does not support querying".to_string() + )) + } + + async fn get_events_in_range( + &self, + _start_time: i64, + _end_time: i64 + ) -> Result> { + // Stdout implementation doesn't support querying + Err(BuildEventLogError::QueryError( + "Stdout build event log does not support querying".to_string() + )) + } + + async fn execute_query(&self, _query: &str) -> Result { + // Stdout implementation doesn't support raw queries + Err(BuildEventLogError::QueryError( + "Stdout build event log does not support raw queries".to_string() + )) + } + + async fn get_latest_partition_status( + &self, + _partition_ref: &str + ) -> Result> { + // Stdout implementation doesn't support querying + Err(BuildEventLogError::QueryError( + "Stdout build event log does not support querying".to_string() + )) + } + + async fn get_active_builds_for_partition( + &self, + _partition_ref: &str + ) -> Result> { + // Stdout implementation doesn't support querying + Err(BuildEventLogError::QueryError( + "Stdout build event log does not support querying".to_string() + )) + } + + async fn initialize(&self) -> Result<()> { + // No initialization needed for stdout + Ok(()) + } +} \ No newline at end of file diff --git a/databuild/lib.rs b/databuild/lib.rs new file mode 100644 index 0000000..8c2c914 --- /dev/null +++ b/databuild/lib.rs @@ -0,0 +1,11 @@ +// Include generated protobuf code +include!("databuild.rs"); + +// Event log module +pub mod event_log; + +// Service module +pub mod service; + +// Re-export commonly used types from event_log +pub use event_log::{BuildEventLog, BuildEventLogError, create_build_event_log}; \ No newline at end of file diff --git a/databuild/service/handlers.rs b/databuild/service/handlers.rs new file mode 100644 index 0000000..aa0fb10 --- /dev/null +++ b/databuild/service/handlers.rs @@ -0,0 +1,475 @@ +use super::*; +use crate::event_log::{current_timestamp_nanos, create_build_event}; +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::Json, +}; +use log::{error, info}; +use std::process::Command; +use std::env; + +pub async fn submit_build_request( + State(service): State, + Json(request): Json, +) -> Result, (StatusCode, Json)> { + let build_request_id = BuildGraphService::generate_build_request_id(); + let timestamp = current_timestamp_nanos(); + + info!("Received build request {} for partitions: {:?}", build_request_id, request.partitions); + + // Create build request state + let build_state = BuildRequestState { + build_request_id: build_request_id.clone(), + status: BuildRequestStatus::BuildRequestReceived, + requested_partitions: request.partitions.clone(), + created_at: timestamp, + updated_at: timestamp, + }; + + // Store in active builds + { + let mut active_builds = service.active_builds.write().await; + active_builds.insert(build_request_id.clone(), build_state); + } + + // Log build request received event + let event = create_build_event( + build_request_id.clone(), + crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent { + status: BuildRequestStatus::BuildRequestReceived as i32, + requested_partitions: request.partitions.iter() + .map(|p| PartitionRef { str: p.clone() }) + .collect(), + message: "Build request received".to_string(), + }), + ); + + if let Err(e) = service.event_log.append_event(event).await { + error!("Failed to log build request received event: {}", e); + return Err(( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { + error: format!("Failed to log build request: {}", e), + }), + )); + } + + // Start build execution in background + let service_clone = service.clone(); + let build_request_id_clone = build_request_id.clone(); + let partitions_clone = request.partitions.clone(); + + tokio::spawn(async move { + if let Err(e) = execute_build_request( + service_clone, + build_request_id_clone, + partitions_clone, + ).await { + error!("Build request execution failed: {}", e); + } + }); + + Ok(Json(BuildRequestResponse { build_request_id })) +} + +pub async fn get_build_status( + State(service): State, + Path(build_request_id): Path, +) -> Result, (StatusCode, Json)> { + // Get build request state + let build_state = { + let active_builds = service.active_builds.read().await; + active_builds.get(&build_request_id).cloned() + }; + + let build_state = match build_state { + Some(state) => state, + None => { + return Err(( + StatusCode::NOT_FOUND, + Json(ErrorResponse { + error: "Build request not found".to_string(), + }), + )); + } + }; + + // Get events for this build request + let events = match service.event_log.get_build_request_events(&build_request_id, None).await { + Ok(events) => events.into_iter().map(|e| BuildEventSummary { + event_id: e.event_id, + timestamp: e.timestamp, + event_type: event_type_to_string(&e.event_type), + message: event_to_message(&e.event_type), + }).collect(), + Err(e) => { + error!("Failed to get build request events: {}", e); + vec![] + } + }; + + Ok(Json(BuildStatusResponse { + build_request_id, + status: BuildGraphService::status_to_string(build_state.status), + requested_partitions: build_state.requested_partitions, + created_at: build_state.created_at, + updated_at: build_state.updated_at, + events, + })) +} + +pub async fn cancel_build_request( + State(service): State, + Path(build_request_id): Path, +) -> Result, (StatusCode, Json)> { + // Update build request state + { + let mut active_builds = service.active_builds.write().await; + if let Some(build_state) = active_builds.get_mut(&build_request_id) { + build_state.status = BuildRequestStatus::BuildRequestCancelled; + build_state.updated_at = current_timestamp_nanos(); + } else { + return Err(( + StatusCode::NOT_FOUND, + Json(ErrorResponse { + error: "Build request not found".to_string(), + }), + )); + } + } + + // Log cancellation event + let event = create_build_event( + build_request_id.clone(), + crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent { + status: BuildRequestStatus::BuildRequestCancelled as i32, + requested_partitions: vec![], + message: "Build request cancelled".to_string(), + }), + ); + + if let Err(e) = service.event_log.append_event(event).await { + error!("Failed to log build request cancelled event: {}", e); + } + + info!("Build request {} cancelled", build_request_id); + + Ok(Json(serde_json::json!({ + "cancelled": true, + "build_request_id": build_request_id + }))) +} + +pub async fn get_partition_status( + State(service): State, + Path(partition_ref): Path, +) -> Result, (StatusCode, Json)> { + // Get latest partition status + let (status, last_updated) = match service.event_log.get_latest_partition_status(&partition_ref).await { + Ok(Some((status, timestamp))) => (status, Some(timestamp)), + Ok(None) => (PartitionStatus::PartitionUnknown, None), + Err(e) => { + error!("Failed to get partition status: {}", e); + return Err(( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { + error: format!("Failed to get partition status: {}", e), + }), + )); + } + }; + + // Get active builds for this partition + let build_requests = match service.event_log.get_active_builds_for_partition(&partition_ref).await { + Ok(builds) => builds, + Err(e) => { + error!("Failed to get active builds for partition: {}", e); + vec![] + } + }; + + Ok(Json(PartitionStatusResponse { + partition_ref, + status: BuildGraphService::partition_status_to_string(status), + last_updated, + build_requests, + })) +} + +pub async fn get_partition_events( + State(service): State, + Path(partition_ref): Path, +) -> Result, (StatusCode, Json)> { + let events = match service.event_log.get_partition_events(&partition_ref, None).await { + Ok(events) => events.into_iter().map(|e| BuildEventSummary { + event_id: e.event_id, + timestamp: e.timestamp, + event_type: event_type_to_string(&e.event_type), + message: event_to_message(&e.event_type), + }).collect(), + Err(e) => { + error!("Failed to get partition events: {}", e); + return Err(( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { + error: format!("Failed to get partition events: {}", e), + }), + )); + } + }; + + Ok(Json(PartitionEventsResponse { + partition_ref, + events, + })) +} + +pub async fn analyze_build_graph( + State(service): State, + Json(request): Json, +) -> Result, (StatusCode, Json)> { + // Call the analyze command (use temporary ID for analyze-only requests) + let temp_build_request_id = BuildGraphService::generate_build_request_id(); + let analyze_result = run_analyze_command(&service, &temp_build_request_id, &request.partitions).await; + + match analyze_result { + Ok(job_graph) => { + let job_graph_json = match serde_json::to_value(&job_graph) { + Ok(json) => json, + Err(e) => { + error!("Failed to serialize job graph: {}", e); + return Err(( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { + error: format!("Failed to serialize job graph: {}", e), + }), + )); + } + }; + + Ok(Json(AnalyzeResponse { + job_graph: job_graph_json, + })) + } + Err(e) => { + error!("Failed to analyze build graph: {}", e); + Err(( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { + error: format!("Failed to analyze build graph: {}", e), + }), + )) + } + } +} + +async fn execute_build_request( + service: ServiceState, + build_request_id: String, + partitions: Vec, +) -> Result<(), String> { + info!("Starting build execution for request {}", build_request_id); + + // Update status to planning + update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestPlanning).await; + + // Log planning event + let event = create_build_event( + build_request_id.clone(), + crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent { + status: BuildRequestStatus::BuildRequestPlanning as i32, + requested_partitions: partitions.iter() + .map(|p| PartitionRef { str: p.clone() }) + .collect(), + message: "Starting build planning".to_string(), + }), + ); + + if let Err(e) = service.event_log.append_event(event).await { + error!("Failed to log planning event: {}", e); + } + + // Analyze the build graph + let job_graph = match run_analyze_command(&service, &build_request_id, &partitions).await { + Ok(graph) => graph, + Err(e) => { + error!("Failed to analyze build graph: {}", e); + update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestFailed).await; + return Err(e); + } + }; + + // Update status to executing + update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestExecuting).await; + + // Log executing event + let event = create_build_event( + build_request_id.clone(), + crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent { + status: BuildRequestStatus::BuildRequestExecuting as i32, + requested_partitions: partitions.iter() + .map(|p| PartitionRef { str: p.clone() }) + .collect(), + message: "Starting build execution".to_string(), + }), + ); + + if let Err(e) = service.event_log.append_event(event).await { + error!("Failed to log executing event: {}", e); + } + + // Execute the build graph + match run_execute_command(&service, &build_request_id, &job_graph).await { + Ok(_) => { + info!("Build request {} completed successfully", build_request_id); + update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestCompleted).await; + + // Log completion event + let event = create_build_event( + build_request_id.clone(), + crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent { + status: BuildRequestStatus::BuildRequestCompleted as i32, + requested_partitions: partitions.iter() + .map(|p| PartitionRef { str: p.clone() }) + .collect(), + message: "Build request completed successfully".to_string(), + }), + ); + + if let Err(e) = service.event_log.append_event(event).await { + error!("Failed to log completion event: {}", e); + } + + Ok(()) + } + Err(e) => { + error!("Build request {} failed: {}", build_request_id, e); + update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestFailed).await; + + // Log failure event + let event = create_build_event( + build_request_id.clone(), + crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent { + status: BuildRequestStatus::BuildRequestFailed as i32, + requested_partitions: partitions.iter() + .map(|p| PartitionRef { str: p.clone() }) + .collect(), + message: format!("Build request failed: {}", e), + }), + ); + + if let Err(e) = service.event_log.append_event(event).await { + error!("Failed to log failure event: {}", e); + } + + Err(e) + } + } +} + +async fn update_build_request_status( + service: &ServiceState, + build_request_id: &str, + status: BuildRequestStatus, +) { + let mut active_builds = service.active_builds.write().await; + if let Some(build_state) = active_builds.get_mut(build_request_id) { + build_state.status = status; + build_state.updated_at = current_timestamp_nanos(); + } +} + +async fn run_analyze_command( + service: &ServiceState, + build_request_id: &str, + partitions: &[String], +) -> Result { + // Run analyze command + let analyze_binary = env::var("DATABUILD_ANALYZE_BINARY") + .unwrap_or_else(|_| "databuild_analyze".to_string()); + + let output = Command::new(&analyze_binary) + .args(partitions) + .env("DATABUILD_JOB_LOOKUP_PATH", &service.job_lookup_path) + .env("DATABUILD_CANDIDATE_JOBS", serde_json::to_string(&service.candidate_jobs).unwrap()) + .env("DATABUILD_BUILD_EVENT_LOG", &service.event_log_uri) + .env("DATABUILD_BUILD_REQUEST_ID", build_request_id) + .output() + .map_err(|e| format!("Failed to execute analyze command: {}", e))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(format!("Analyze command failed: {}", stderr)); + } + + let stdout = String::from_utf8_lossy(&output.stdout); + let job_graph: JobGraph = serde_json::from_str(&stdout) + .map_err(|e| format!("Failed to parse analyze result: {}", e))?; + + Ok(job_graph) +} + +async fn run_execute_command( + service: &ServiceState, + build_request_id: &str, + job_graph: &JobGraph, +) -> Result<(), String> { + // Serialize job graph + let job_graph_json = serde_json::to_string(job_graph) + .map_err(|e| format!("Failed to serialize job graph: {}", e))?; + + // Run execute command + let execute_binary = env::var("DATABUILD_EXECUTE_BINARY") + .unwrap_or_else(|_| "databuild_execute".to_string()); + + let mut child = Command::new(&execute_binary) + .env("DATABUILD_JOB_LOOKUP_PATH", &service.job_lookup_path) + .env("DATABUILD_CANDIDATE_JOBS", serde_json::to_string(&service.candidate_jobs).unwrap()) + .env("DATABUILD_BUILD_EVENT_LOG", &service.event_log_uri) + .env("DATABUILD_BUILD_REQUEST_ID", build_request_id) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .map_err(|e| format!("Failed to spawn execute command: {}", e))?; + + // Write job graph to stdin + if let Some(stdin) = child.stdin.take() { + use std::io::Write; + let mut stdin = stdin; + stdin.write_all(job_graph_json.as_bytes()) + .map_err(|e| format!("Failed to write job graph to stdin: {}", e))?; + } + + // Wait for completion + let output = child.wait_with_output() + .map_err(|e| format!("Failed to wait for execute command: {}", e))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(format!("Execute command failed: {}", stderr)); + } + + Ok(()) +} + +fn event_type_to_string(event_type: &Option) -> String { + match event_type { + Some(crate::build_event::EventType::BuildRequestEvent(_)) => "build_request".to_string(), + Some(crate::build_event::EventType::PartitionEvent(_)) => "partition".to_string(), + Some(crate::build_event::EventType::JobEvent(_)) => "job".to_string(), + Some(crate::build_event::EventType::DelegationEvent(_)) => "delegation".to_string(), + None => "unknown".to_string(), + } +} + +fn event_to_message(event_type: &Option) -> String { + match event_type { + Some(crate::build_event::EventType::BuildRequestEvent(event)) => event.message.clone(), + Some(crate::build_event::EventType::PartitionEvent(event)) => event.message.clone(), + Some(crate::build_event::EventType::JobEvent(event)) => event.message.clone(), + Some(crate::build_event::EventType::DelegationEvent(event)) => event.message.clone(), + None => "Unknown event".to_string(), + } +} \ No newline at end of file diff --git a/databuild/service/main.rs b/databuild/service/main.rs new file mode 100644 index 0000000..783ff00 --- /dev/null +++ b/databuild/service/main.rs @@ -0,0 +1,97 @@ +use databuild::service::BuildGraphService; +use std::collections::HashMap; +use std::env; +use std::net::SocketAddr; +use clap::{Arg, Command}; +use log::info; +use simple_logger::SimpleLogger; + +#[tokio::main] +async fn main() { + SimpleLogger::new().init().unwrap(); + + let matches = Command::new("build-graph-service") + .version("1.0") + .about("DataBuild Build Graph Service") + .arg( + Arg::new("port") + .short('p') + .long("port") + .value_name("PORT") + .help("Port to listen on") + .default_value("8080") + ) + .arg( + Arg::new("host") + .long("host") + .value_name("HOST") + .help("Host to bind to") + .default_value("0.0.0.0") + ) + .arg( + Arg::new("event-log") + .long("event-log") + .value_name("URI") + .help("Build event log URI") + .default_value("sqlite:///tmp/databuild.db") + ) + .arg( + Arg::new("graph-label") + .long("graph-label") + .value_name("LABEL") + .help("Graph label") + .default_value("//example:graph") + ) + .arg( + Arg::new("job-lookup-path") + .long("job-lookup-path") + .value_name("PATH") + .help("Job lookup binary path") + .default_value("job_lookup") + ) + .get_matches(); + + let port: u16 = matches.get_one::("port").unwrap() + .parse().expect("Invalid port number"); + let host = matches.get_one::("host").unwrap(); + let event_log_uri = matches.get_one::("event-log").unwrap(); + let graph_label = matches.get_one::("graph-label").unwrap().to_string(); + let job_lookup_path = matches.get_one::("job-lookup-path").unwrap().to_string(); + + // Get candidate jobs from environment + let candidate_jobs: HashMap = env::var("DATABUILD_CANDIDATE_JOBS") + .map(|s| serde_json::from_str(&s).unwrap_or_else(|_| HashMap::new())) + .unwrap_or_else(|_| HashMap::new()); + + info!("Starting Build Graph Service on {}:{}", host, port); + info!("Event log URI: {}", event_log_uri); + info!("Graph label: {}", graph_label); + info!("Job lookup path: {}", job_lookup_path); + info!("Candidate jobs: {} configured", candidate_jobs.len()); + + // Create service + let service = match BuildGraphService::new( + event_log_uri, + graph_label, + job_lookup_path, + candidate_jobs, + ).await { + Ok(service) => service, + Err(e) => { + eprintln!("Failed to create service: {}", e); + std::process::exit(1); + } + }; + + // Create router + let app = service.create_router(); + + // Start server + let addr: SocketAddr = format!("{}:{}", host, port).parse().unwrap(); + info!("Build Graph Service listening on {}", addr); + + let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); + axum::serve(listener, app.into_make_service()) + .await + .unwrap(); +} \ No newline at end of file diff --git a/databuild/service/mod.rs b/databuild/service/mod.rs new file mode 100644 index 0000000..fc152b0 --- /dev/null +++ b/databuild/service/mod.rs @@ -0,0 +1,150 @@ +use crate::*; +use crate::event_log::{BuildEventLog, BuildEventLogError, create_build_event_log}; +use axum::{ + routing::{get, post, delete}, + Router, +}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; +use uuid::Uuid; + +pub mod handlers; + +#[derive(Clone)] +pub struct BuildGraphService { + pub event_log: Arc, + pub event_log_uri: String, + pub active_builds: Arc>>, + pub graph_label: String, + pub job_lookup_path: String, + pub candidate_jobs: HashMap, +} + +#[derive(Debug, Clone)] +pub struct BuildRequestState { + pub build_request_id: String, + pub status: BuildRequestStatus, + pub requested_partitions: Vec, + pub created_at: i64, + pub updated_at: i64, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct BuildRequest { + pub partitions: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct BuildRequestResponse { + pub build_request_id: String, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct BuildStatusResponse { + pub build_request_id: String, + pub status: String, + pub requested_partitions: Vec, + pub created_at: i64, + pub updated_at: i64, + pub events: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct BuildEventSummary { + pub event_id: String, + pub timestamp: i64, + pub event_type: String, + pub message: String, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct PartitionStatusResponse { + pub partition_ref: String, + pub status: String, + pub last_updated: Option, + pub build_requests: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct PartitionEventsResponse { + pub partition_ref: String, + pub events: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct AnalyzeRequest { + pub partitions: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct AnalyzeResponse { + pub job_graph: serde_json::Value, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ErrorResponse { + pub error: String, +} + +impl BuildGraphService { + pub async fn new( + event_log_uri: &str, + graph_label: String, + job_lookup_path: String, + candidate_jobs: HashMap, + ) -> Result { + let event_log = create_build_event_log(event_log_uri).await?; + + Ok(Self { + event_log: Arc::from(event_log), + event_log_uri: event_log_uri.to_string(), + active_builds: Arc::new(RwLock::new(HashMap::new())), + graph_label, + job_lookup_path, + candidate_jobs, + }) + } + + pub fn create_router(self) -> Router { + Router::new() + .route("/api/v1/builds", post(handlers::submit_build_request)) + .route("/api/v1/builds/:id", get(handlers::get_build_status)) + .route("/api/v1/builds/:id", delete(handlers::cancel_build_request)) + .route("/api/v1/partitions/:ref/status", get(handlers::get_partition_status)) + .route("/api/v1/partitions/:ref/events", get(handlers::get_partition_events)) + .route("/api/v1/analyze", post(handlers::analyze_build_graph)) + .with_state(Arc::new(self)) + } + + pub fn generate_build_request_id() -> String { + Uuid::new_v4().to_string() + } + + pub fn status_to_string(status: BuildRequestStatus) -> String { + match status { + BuildRequestStatus::BuildRequestUnknown => "unknown".to_string(), + BuildRequestStatus::BuildRequestReceived => "received".to_string(), + BuildRequestStatus::BuildRequestPlanning => "planning".to_string(), + BuildRequestStatus::BuildRequestExecuting => "executing".to_string(), + BuildRequestStatus::BuildRequestCompleted => "completed".to_string(), + BuildRequestStatus::BuildRequestFailed => "failed".to_string(), + BuildRequestStatus::BuildRequestCancelled => "cancelled".to_string(), + } + } + + pub fn partition_status_to_string(status: PartitionStatus) -> String { + match status { + PartitionStatus::PartitionUnknown => "unknown".to_string(), + PartitionStatus::PartitionRequested => "requested".to_string(), + PartitionStatus::PartitionScheduled => "scheduled".to_string(), + PartitionStatus::PartitionBuilding => "building".to_string(), + PartitionStatus::PartitionAvailable => "available".to_string(), + PartitionStatus::PartitionFailed => "failed".to_string(), + PartitionStatus::PartitionDelegated => "delegated".to_string(), + } + } +} + +pub type ServiceState = Arc; \ No newline at end of file