databuild/databuild/orchestration/mod.rs
2025-08-20 23:34:37 -07:00

261 lines
No EOL
8.7 KiB
Rust

use crate::*;
use crate::event_log::{writer::EventWriter, query_engine::BELQueryEngine};
use log::info;
use std::sync::Arc;
pub mod error;
pub mod events;
pub use error::{OrchestrationError, Result};
/// Final outcome of a build execution, as handed to
/// `BuildOrchestrator::complete_build`.
///
/// `PartialEq`/`Eq` are derived so outcomes can be compared directly
/// (for example with `assert_eq!` in tests) instead of being matched by hand.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BuildResult {
    /// The build completed successfully.
    Success {
        /// Number of jobs that completed.
        jobs_completed: usize,
    },
    /// The build failed after running jobs.
    Failed {
        /// Number of jobs that completed.
        jobs_completed: usize,
        /// Number of jobs that failed.
        jobs_failed: usize,
    },
    /// The build failed fast because of a single job.
    FailFast {
        /// The job that triggered the fail-fast.
        trigger_job: String,
    },
}
/// Drives the lifecycle of a single build request and emits the matching
/// events through an [`EventWriter`].
pub struct BuildOrchestrator {
    /// Writer through which every event of this build is emitted.
    event_writer: EventWriter,
    /// Identifier of the build request; passed along with each emitted event.
    build_request_id: String,
    /// Partitions supplied at construction time; sent with the initial build
    /// request and with the analysis-completed status update.
    requested_partitions: Vec<PartitionRef>,
}
impl BuildOrchestrator {
/// Create a new build orchestrator
pub fn new(
query_engine: Arc<BELQueryEngine>,
build_request_id: String,
requested_partitions: Vec<PartitionRef>,
) -> Self {
Self {
event_writer: EventWriter::new(query_engine),
build_request_id,
requested_partitions,
}
}
/// Get the build request ID
pub fn build_request_id(&self) -> &str {
&self.build_request_id
}
/// Get the requested partitions
pub fn requested_partitions(&self) -> &[PartitionRef] {
&self.requested_partitions
}
/// Emit build request received event and start the build lifecycle
pub async fn start_build(&self) -> Result<()> {
info!("Starting build for request: {}", self.build_request_id);
self.event_writer.request_build(
self.build_request_id.clone(),
self.requested_partitions.clone(),
).await
.map_err(OrchestrationError::EventLog)?;
Ok(())
}
/// Emit build planning started event
pub async fn start_planning(&self) -> Result<()> {
info!("Starting build planning for request: {}", self.build_request_id);
self.event_writer.update_build_status(
self.build_request_id.clone(),
BuildRequestStatusCode::BuildRequestPlanning.status(),
"Starting build planning".to_string(),
).await
.map_err(OrchestrationError::EventLog)?;
Ok(())
}
/// Emit build execution started event
pub async fn start_execution(&self) -> Result<()> {
info!("Starting build execution for request: {}", self.build_request_id);
self.event_writer.update_build_status(
self.build_request_id.clone(),
BuildRequestStatusCode::BuildRequestExecuting.status(),
"Starting build execution".to_string(),
).await
.map_err(OrchestrationError::EventLog)?;
Ok(())
}
/// Emit build completion event
pub async fn complete_build(&self, result: BuildResult) -> Result<()> {
info!("Completing build for request: {} with result: {:?}",
self.build_request_id, result);
let (status, message) = match &result {
BuildResult::Success { jobs_completed } => {
(BuildRequestStatusCode::BuildRequestCompleted,
format!("Build completed successfully with {} jobs", jobs_completed))
}
BuildResult::Failed { jobs_completed, jobs_failed } => {
(BuildRequestStatusCode::BuildRequestFailed,
format!("Build failed: {} jobs completed, {} jobs failed", jobs_completed, jobs_failed))
}
BuildResult::FailFast { trigger_job } => {
(BuildRequestStatusCode::BuildRequestFailed,
format!("Build failed fast due to job: {}", trigger_job))
}
};
self.event_writer.update_build_status(
self.build_request_id.clone(),
status.status(),
message,
).await
.map_err(OrchestrationError::EventLog)?;
Ok(())
}
/// Emit analysis completed event
pub async fn emit_analysis_completed(&self, task_count: usize) -> Result<()> {
self.event_writer.update_build_status_with_partitions(
self.build_request_id.clone(),
BuildRequestStatusCode::BuildRequestAnalysisCompleted.status(),
self.requested_partitions.clone(),
format!("Analysis completed successfully, {} tasks planned", task_count),
).await
.map_err(OrchestrationError::EventLog)?;
Ok(())
}
/// Emit job scheduled event
pub async fn emit_job_scheduled(&self, job: &JobEvent) -> Result<()> {
let event = events::create_job_scheduled_event(
self.build_request_id.clone(),
job,
);
self.event_writer.append_event(event).await
.map_err(OrchestrationError::EventLog)?;
Ok(())
}
/// Emit job completed event
pub async fn emit_job_completed(&self, job: &JobEvent) -> Result<()> {
let event = events::create_job_completed_event(
self.build_request_id.clone(),
job,
);
self.event_writer.append_event(event).await
.map_err(OrchestrationError::EventLog)?;
Ok(())
}
/// Emit partition available event
pub async fn emit_partition_available(&self, partition: &PartitionEvent) -> Result<()> {
let event = events::create_partition_available_event(
self.build_request_id.clone(),
partition,
);
self.event_writer.append_event(event).await
.map_err(OrchestrationError::EventLog)?;
Ok(())
}
/// Emit delegation event
pub async fn emit_delegation(
&self,
partition_ref: &str,
target_build: &str,
message: &str,
) -> Result<()> {
let partition = PartitionRef { str: partition_ref.to_string() };
self.event_writer.record_delegation(
self.build_request_id.clone(),
partition,
target_build.to_string(),
message.to_string(),
).await
.map_err(OrchestrationError::EventLog)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs one orchestrator through request, planning, execution and
    /// successful completion, requiring every step to return `Ok`.
    #[tokio::test]
    async fn test_build_lifecycle_events() {
        // Query engine provided by the event-log mock helper.
        let engine = crate::event_log::mock::create_mock_bel_query_engine()
            .await
            .unwrap();

        let orchestrator = BuildOrchestrator::new(
            engine,
            "test-build-123".to_string(),
            vec![PartitionRef { str: "test/partition".to_string() }],
        );

        // Full lifecycle, in order.
        orchestrator.start_build().await.unwrap();
        orchestrator.start_planning().await.unwrap();
        orchestrator.start_execution().await.unwrap();

        let outcome = BuildResult::Success { jobs_completed: 5 };
        orchestrator.complete_build(outcome).await.unwrap();

        // Limitation: this runs a real BELQueryEngine over mock storage, so the
        // emitted events cannot be inspected here without significant
        // refactoring. The test only checks that each orchestration call
        // succeeds, which still exercises the event emission code paths.
        //
        // TODO: to assert on specific events, one of these could be used:
        //   1. Query the mock storage through the query engine
        //   2. Create a specialized test storage that captures events
        //   3. Use the existing MockBuildEventLog test pattern with dependency injection
    }

    /// Emits an analysis-completed status and a job-scheduled event, requiring
    /// both calls to return `Ok`.
    #[tokio::test]
    async fn test_partition_and_job_events() {
        // Query engine provided by the event-log mock helper.
        let engine = crate::event_log::mock::create_mock_bel_query_engine()
            .await
            .unwrap();

        let orchestrator =
            BuildOrchestrator::new(engine, "test-build-456".to_string(), vec![]);

        // Analysis-completed status update.
        orchestrator.emit_analysis_completed(3).await.unwrap();

        // Job-scheduled event targeting a single partition.
        let scheduled = JobEvent {
            job_run_id: "job-run-123".to_string(),
            job_label: Some(JobLabel { label: "//:test_job".to_string() }),
            target_partitions: vec![PartitionRef { str: "data/users".to_string() }],
            status_code: JobStatus::JobScheduled as i32,
            status_name: JobStatus::JobScheduled.to_display_string(),
            message: "Job scheduled".to_string(),
            config: None,
            manifests: vec![],
        };
        orchestrator.emit_job_scheduled(&scheduled).await.unwrap();

        // Same limitation as the lifecycle test: only successful completion of
        // the calls is verified, not the content of the emitted events.
    }
}