use crate::*; use crate::event_log::{BuildEventLogError, Result}; use crate::event_log::query_engine::BELQueryEngine; use crate::{JobDetailResponse, JobRunDetail as ServiceJobRunDetail}; use std::sync::Arc; use std::collections::HashMap; use serde::Serialize; /// Repository for querying job data from the build event log pub struct JobsRepository { query_engine: Arc, } /// Summary of a job's execution history and statistics #[derive(Debug, Clone, Serialize)] pub struct JobInfo { pub job_label: String, pub total_runs: usize, pub successful_runs: usize, pub failed_runs: usize, pub cancelled_runs: usize, pub last_run_timestamp: i64, pub last_run_status: JobStatus, pub average_partitions_per_run: f64, pub recent_builds: Vec, // Build request IDs that used this job } /// Detailed information about a specific job execution #[derive(Debug, Clone, Serialize)] pub struct JobRunDetail { pub job_run_id: String, pub job_label: String, pub build_request_id: String, pub target_partitions: Vec, pub status: JobStatus, pub scheduled_at: i64, pub started_at: Option, pub completed_at: Option, pub duration_ms: Option, pub message: String, pub config: Option, pub manifests: Vec, } impl JobsRepository { /// Create a new JobsRepository pub fn new(query_engine: Arc) -> Self { Self { query_engine } } /// List all jobs with their execution statistics /// /// Returns a summary of all jobs that have been executed, including /// success/failure statistics and recent activity. pub async fn list(&self, limit: Option) -> Result> { // Get all job events from the event log let events = self.query_engine.get_events_in_range(0, i64::MAX).await?; let mut job_data: HashMap> = HashMap::new(); // Collect all job events and group by job label for event in events { if let Some(build_event::EventType::JobEvent(j_event)) = &event.event_type { let job_label = j_event.job_label.as_ref() .map(|l| l.label.clone()) .unwrap_or_else(|| "unknown".to_string()); let status = match j_event.status_code { 1 => JobStatus::JobScheduled, 2 => JobStatus::JobRunning, 3 => JobStatus::JobCompleted, 4 => JobStatus::JobFailed, 5 => JobStatus::JobCancelled, 6 => JobStatus::JobSkipped, _ => JobStatus::JobUnknown, }; // Create or update job run detail let job_runs = job_data.entry(job_label.clone()).or_insert_with(Vec::new); // Find existing run or create new one if let Some(existing_run) = job_runs.iter_mut().find(|r| r.job_run_id == j_event.job_run_id) { // Update existing run with new status existing_run.status = status; existing_run.message = j_event.message.clone(); match status { JobStatus::JobRunning => { existing_run.started_at = Some(event.timestamp); } JobStatus::JobCompleted | JobStatus::JobFailed | JobStatus::JobCancelled => { existing_run.completed_at = Some(event.timestamp); if let Some(started) = existing_run.started_at { existing_run.duration_ms = Some((event.timestamp - started) / 1_000_000); // Convert to ms } existing_run.manifests = j_event.manifests.clone(); } _ => {} } } else { // Create new job run let job_run = JobRunDetail { job_run_id: j_event.job_run_id.clone(), job_label: job_label.clone(), build_request_id: event.build_request_id.clone(), target_partitions: j_event.target_partitions.clone(), status, scheduled_at: event.timestamp, started_at: if status == JobStatus::JobRunning { Some(event.timestamp) } else { None }, completed_at: None, duration_ms: None, message: j_event.message.clone(), config: j_event.config.clone(), manifests: j_event.manifests.clone(), }; job_runs.push(job_run); } } } // Convert to JobInfo structs with statistics let mut job_infos: Vec = job_data.into_iter() .map(|(job_label, job_runs)| { let total_runs = job_runs.len(); let successful_runs = job_runs.iter().filter(|r| r.status == JobStatus::JobCompleted).count(); let failed_runs = job_runs.iter().filter(|r| r.status == JobStatus::JobFailed).count(); let cancelled_runs = job_runs.iter().filter(|r| r.status == JobStatus::JobCancelled).count(); let (last_run_timestamp, last_run_status) = job_runs.iter() .max_by_key(|r| r.scheduled_at) .map(|r| (r.scheduled_at, r.status.clone())) .unwrap_or((0, JobStatus::JobUnknown)); let total_partitions: usize = job_runs.iter() .map(|r| r.target_partitions.len()) .sum(); let average_partitions_per_run = if total_runs > 0 { total_partitions as f64 / total_runs as f64 } else { 0.0 }; // Get recent unique build request IDs let mut recent_builds: Vec = job_runs.iter() .map(|r| r.build_request_id.clone()) .collect::>() .into_iter() .collect(); recent_builds.sort(); recent_builds.truncate(10); // Keep last 10 builds JobInfo { job_label, total_runs, successful_runs, failed_runs, cancelled_runs, last_run_timestamp, last_run_status, average_partitions_per_run, recent_builds, } }) .collect(); // Sort by last run timestamp (most recent first) job_infos.sort_by(|a, b| b.last_run_timestamp.cmp(&a.last_run_timestamp)); // Apply limit if specified if let Some(limit) = limit { job_infos.truncate(limit); } Ok(job_infos) } /// Show detailed information about a specific job /// /// Returns all execution runs for the specified job label, including /// detailed timing, status, and output information. pub async fn show(&self, job_label: &str) -> Result)>> { // Get all job events for this specific job let events = self.query_engine.get_events_in_range(0, i64::MAX).await?; let mut job_runs: Vec = Vec::new(); // Collect all job events for this job label for event in events { if let Some(build_event::EventType::JobEvent(j_event)) = &event.event_type { let event_job_label = j_event.job_label.as_ref() .map(|l| l.label.clone()) .unwrap_or_else(|| "unknown".to_string()); if event_job_label != job_label { continue; } let status = match j_event.status_code { 1 => JobStatus::JobScheduled, 2 => JobStatus::JobRunning, 3 => JobStatus::JobCompleted, 4 => JobStatus::JobFailed, 5 => JobStatus::JobCancelled, 6 => JobStatus::JobSkipped, _ => JobStatus::JobUnknown, }; // Find existing run or create new one if let Some(existing_run) = job_runs.iter_mut().find(|r| r.job_run_id == j_event.job_run_id) { // Update existing run with new status existing_run.status = status; existing_run.message = j_event.message.clone(); match status { JobStatus::JobRunning => { existing_run.started_at = Some(event.timestamp); } JobStatus::JobCompleted | JobStatus::JobFailed | JobStatus::JobCancelled => { existing_run.completed_at = Some(event.timestamp); if let Some(started) = existing_run.started_at { existing_run.duration_ms = Some((event.timestamp - started) / 1_000_000); // Convert to ms } existing_run.manifests = j_event.manifests.clone(); } _ => {} } } else { // Create new job run let job_run = JobRunDetail { job_run_id: j_event.job_run_id.clone(), job_label: job_label.to_string(), build_request_id: event.build_request_id.clone(), target_partitions: j_event.target_partitions.clone(), status, scheduled_at: event.timestamp, started_at: if status == JobStatus::JobRunning { Some(event.timestamp) } else { None }, completed_at: None, duration_ms: None, message: j_event.message.clone(), config: j_event.config.clone(), manifests: j_event.manifests.clone(), }; job_runs.push(job_run); } } } if job_runs.is_empty() { return Ok(None); } // Sort runs by scheduled time (most recent first) job_runs.sort_by(|a, b| b.scheduled_at.cmp(&a.scheduled_at)); // Calculate job statistics let total_runs = job_runs.len(); let successful_runs = job_runs.iter().filter(|r| r.status == JobStatus::JobCompleted).count(); let failed_runs = job_runs.iter().filter(|r| r.status == JobStatus::JobFailed).count(); let cancelled_runs = job_runs.iter().filter(|r| r.status == JobStatus::JobCancelled).count(); let (last_run_timestamp, last_run_status) = job_runs.iter() .max_by_key(|r| r.scheduled_at) .map(|r| (r.scheduled_at, r.status.clone())) .unwrap_or((0, JobStatus::JobUnknown)); let total_partitions: usize = job_runs.iter() .map(|r| r.target_partitions.len()) .sum(); let average_partitions_per_run = if total_runs > 0 { total_partitions as f64 / total_runs as f64 } else { 0.0 }; // Get recent unique build request IDs let mut recent_builds: Vec = job_runs.iter() .map(|r| r.build_request_id.clone()) .collect::>() .into_iter() .collect(); recent_builds.sort(); recent_builds.truncate(10); // Keep last 10 builds let job_info = JobInfo { job_label: job_label.to_string(), total_runs, successful_runs, failed_runs, cancelled_runs, last_run_timestamp, last_run_status, average_partitions_per_run, recent_builds, }; Ok(Some((job_info, job_runs))) } /// Show detailed information about a specific job using protobuf response format /// /// Returns the complete job details with dual status fields and run details. pub async fn show_protobuf(&self, job_label: &str) -> Result> { // Get job info and runs using existing show method if let Some((job_info, job_runs)) = self.show(job_label).await? { // Convert job runs to protobuf format let protobuf_runs: Vec = job_runs .into_iter() .map(|run| ServiceJobRunDetail { job_run_id: run.job_run_id, build_request_id: run.build_request_id, target_partitions: run.target_partitions, status_code: run.status as i32, status_name: run.status.to_display_string(), started_at: run.started_at, completed_at: run.completed_at, duration_ms: run.duration_ms, message: run.message, }) .collect(); let response = JobDetailResponse { job_label: job_info.job_label, total_runs: job_info.total_runs as u32, successful_runs: job_info.successful_runs as u32, failed_runs: job_info.failed_runs as u32, cancelled_runs: job_info.cancelled_runs as u32, average_partitions_per_run: job_info.average_partitions_per_run, last_run_timestamp: job_info.last_run_timestamp, last_run_status_code: job_info.last_run_status as i32, last_run_status_name: job_info.last_run_status.to_display_string(), recent_builds: job_info.recent_builds, runs: protobuf_runs, }; Ok(Some(response)) } else { Ok(None) } } /// List jobs using protobuf response format with dual status fields /// /// Returns JobsListResponse protobuf message with JobSummary objects containing /// last_run_status_code and last_run_status_name fields. pub async fn list_protobuf(&self, request: JobsListRequest) -> Result { // Get job info using existing list method let jobs = self.list(request.limit.map(|l| l as usize)).await?; // Convert to protobuf format let protobuf_jobs: Vec = jobs .into_iter() .map(|job| crate::JobSummary { job_label: job.job_label, total_runs: job.total_runs as u32, successful_runs: job.successful_runs as u32, failed_runs: job.failed_runs as u32, cancelled_runs: job.cancelled_runs as u32, average_partitions_per_run: job.average_partitions_per_run, last_run_timestamp: job.last_run_timestamp, last_run_status_code: job.last_run_status as i32, last_run_status_name: job.last_run_status.to_display_string(), recent_builds: job.recent_builds, }) .collect(); let total_count = protobuf_jobs.len() as u32; Ok(JobsListResponse { jobs: protobuf_jobs, total_count, }) } } #[cfg(test)] mod tests { use super::*; use crate::event_log::mock::{create_mock_bel_query_engine, create_mock_bel_query_engine_with_events, test_events}; #[tokio::test] async fn test_jobs_repository_list_empty() { let query_engine = create_mock_bel_query_engine().await.unwrap(); let repo = JobsRepository::new(query_engine); let jobs = repo.list(None).await.unwrap(); assert!(jobs.is_empty()); } #[tokio::test] async fn test_jobs_repository_list_with_data() { let build_id = "test-build-123".to_string(); let job_label1 = JobLabel { label: "//:process_data".to_string() }; let job_label2 = JobLabel { label: "//:generate_reports".to_string() }; let partition1 = PartitionRef { str: "data/users".to_string() }; let partition2 = PartitionRef { str: "reports/summary".to_string() }; // Create events for multiple jobs let events = vec![ test_events::job_event(Some(build_id.clone()), Some("job-run-1".to_string()), job_label1.clone(), vec![partition1.clone()], JobStatus::JobScheduled), test_events::job_event(Some(build_id.clone()), Some("job-run-1".to_string()), job_label1.clone(), vec![partition1.clone()], JobStatus::JobCompleted), test_events::job_event(Some(build_id.clone()), Some("job-run-2".to_string()), job_label2.clone(), vec![partition2.clone()], JobStatus::JobScheduled), test_events::job_event(Some(build_id.clone()), Some("job-run-2".to_string()), job_label2.clone(), vec![partition2.clone()], JobStatus::JobFailed), ]; let query_engine = create_mock_bel_query_engine_with_events(events).await.unwrap(); let repo = JobsRepository::new(query_engine); let jobs = repo.list(None).await.unwrap(); assert_eq!(jobs.len(), 2); // Find jobs by label let process_job = jobs.iter().find(|j| j.job_label == "//:process_data").unwrap(); let reports_job = jobs.iter().find(|j| j.job_label == "//:generate_reports").unwrap(); assert_eq!(process_job.total_runs, 1); assert_eq!(process_job.successful_runs, 1); assert_eq!(process_job.failed_runs, 0); assert_eq!(process_job.last_run_status, JobStatus::JobCompleted); assert_eq!(reports_job.total_runs, 1); assert_eq!(reports_job.successful_runs, 0); assert_eq!(reports_job.failed_runs, 1); assert_eq!(reports_job.last_run_status, JobStatus::JobFailed); } #[tokio::test] async fn test_jobs_repository_show() { let build_id = "test-build-456".to_string(); let job_label = JobLabel { label: "//:analytics_job".to_string() }; let partition = PartitionRef { str: "analytics/daily".to_string() }; let events = vec![ test_events::job_event(Some(build_id.clone()), Some("job-run-123".to_string()), job_label.clone(), vec![partition.clone()], JobStatus::JobScheduled), test_events::job_event(Some(build_id.clone()), Some("job-run-123".to_string()), job_label.clone(), vec![partition.clone()], JobStatus::JobRunning), test_events::job_event(Some(build_id.clone()), Some("job-run-123".to_string()), job_label.clone(), vec![partition.clone()], JobStatus::JobCompleted), ]; let query_engine = create_mock_bel_query_engine_with_events(events).await.unwrap(); let repo = JobsRepository::new(query_engine); let result = repo.show(&job_label.label).await.unwrap(); assert!(result.is_some()); let (info, runs) = result.unwrap(); assert_eq!(info.job_label, "//:analytics_job"); assert_eq!(info.total_runs, 1); assert_eq!(info.successful_runs, 1); assert_eq!(info.last_run_status, JobStatus::JobCompleted); assert_eq!(runs.len(), 1); let run = &runs[0]; assert_eq!(run.job_run_id, "job-run-123"); assert_eq!(run.status, JobStatus::JobCompleted); assert_eq!(run.target_partitions.len(), 1); assert_eq!(run.target_partitions[0].str, "analytics/daily"); } #[tokio::test] async fn test_jobs_repository_show_nonexistent() { let query_engine = create_mock_bel_query_engine().await.unwrap(); let repo = JobsRepository::new(query_engine); let result = repo.show("//:nonexistent_job").await.unwrap(); assert!(result.is_none()); } #[tokio::test] async fn test_jobs_repository_statistics() { let build_id = "test-build-789".to_string(); let job_label = JobLabel { label: "//:batch_processor".to_string() }; let partition = PartitionRef { str: "batch/data".to_string() }; // Create multiple runs with different outcomes let events = vec![ // First run - successful test_events::job_event(Some(build_id.clone()), Some("run-1".to_string()), job_label.clone(), vec![partition.clone()], JobStatus::JobScheduled), test_events::job_event(Some(build_id.clone()), Some("run-1".to_string()), job_label.clone(), vec![partition.clone()], JobStatus::JobCompleted), // Second run - failed test_events::job_event(Some(build_id.clone()), Some("run-2".to_string()), job_label.clone(), vec![partition.clone()], JobStatus::JobScheduled), test_events::job_event(Some(build_id.clone()), Some("run-2".to_string()), job_label.clone(), vec![partition.clone()], JobStatus::JobFailed), // Third run - cancelled test_events::job_event(Some(build_id.clone()), Some("run-3".to_string()), job_label.clone(), vec![partition.clone()], JobStatus::JobScheduled), test_events::job_event(Some(build_id.clone()), Some("run-3".to_string()), job_label.clone(), vec![partition.clone()], JobStatus::JobCancelled), ]; let query_engine = create_mock_bel_query_engine_with_events(events).await.unwrap(); let repo = JobsRepository::new(query_engine); let result = repo.show(&job_label.label).await.unwrap(); assert!(result.is_some()); let (info, _runs) = result.unwrap(); assert_eq!(info.total_runs, 3); assert_eq!(info.successful_runs, 1); assert_eq!(info.failed_runs, 1); assert_eq!(info.cancelled_runs, 1); assert_eq!(info.average_partitions_per_run, 1.0); } }