1693 lines
No EOL
64 KiB
Rust
1693 lines
No EOL
64 KiB
Rust
use super::*;
|
|
use crate::event_log::{current_timestamp_nanos, create_build_event};
|
|
use crate::orchestration::{BuildOrchestrator, BuildResult};
|
|
use crate::mermaid_utils;
|
|
use axum::{
|
|
extract::{Path, State},
|
|
http::StatusCode,
|
|
};
|
|
use axum_jsonschema::Json;
|
|
use log::{error, info};
|
|
use serde::Deserialize;
|
|
use schemars::JsonSchema;
|
|
use std::process::Command;
|
|
use std::env;
|
|
|
|
/// Decodes a URL-safe base64 string (used for job labels) into a UTF-8 `String`.
///
/// Both the URL-safe alphabet (`-`, `_`) and the standard alphabet (`+`, `/`) are accepted,
/// with or without trailing `=` padding. Decoding stops at the first `=`.
///
/// # Errors
///
/// Returns an error if the input contains a character outside the base64 alphabets
/// (previously such characters were silently skipped, producing corrupted output), or if the
/// decoded bytes are not valid UTF-8.
fn base64_url_decode(encoded: &str) -> Result<String, Box<dyn std::error::Error>> {
    let mut decoded = Vec::with_capacity(encoded.len() * 3 / 4);
    let mut buffer = 0u32;
    let mut bits = 0u32;

    for c in encoded.chars() {
        let sextet = match c {
            // Padding marks the end of the payload.
            '=' => break,
            'A'..='Z' => c as u32 - 'A' as u32,
            'a'..='z' => c as u32 - 'a' as u32 + 26,
            '0'..='9' => c as u32 - '0' as u32 + 52,
            '-' | '+' => 62,
            '_' | '/' => 63,
            _ => return Err(format!("invalid base64 character {:?}", c).into()),
        };

        buffer = (buffer << 6) | sextet;
        bits += 6;

        if bits >= 8 {
            bits -= 8;
            decoded.push(((buffer >> bits) & 0xFF) as u8);
            // Keep only the bits that have not been emitted yet.
            buffer &= (1 << bits) - 1;
        }
    }

    Ok(String::from_utf8(decoded)?)
}
|
|
|
|
pub async fn submit_build_request(
|
|
State(service): State<ServiceState>,
|
|
Json(request): Json<BuildRequest>,
|
|
) -> Result<Json<BuildRequestResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
let build_request_id = BuildGraphService::generate_build_request_id();
|
|
let timestamp = current_timestamp_nanos();
|
|
|
|
info!("Received build request {} for partitions: {:?}", build_request_id, request.partitions);
|
|
|
|
// Create build request state
|
|
let build_state = BuildRequestState {
|
|
build_request_id: build_request_id.clone(),
|
|
status: BuildRequestStatus::BuildRequestReceived,
|
|
requested_partitions: request.partitions.clone(),
|
|
created_at: timestamp,
|
|
updated_at: timestamp,
|
|
};
|
|
|
|
// Store in active builds
|
|
{
|
|
let mut active_builds = service.active_builds.write().await;
|
|
active_builds.insert(build_request_id.clone(), build_state);
|
|
}
|
|
|
|
// Create orchestrator and emit build request received event
|
|
let requested_partitions: Vec<PartitionRef> = request.partitions.iter()
|
|
.map(|p| PartitionRef { str: p.clone() })
|
|
.collect();
|
|
|
|
let orchestrator = BuildOrchestrator::new(
|
|
service.event_log.clone(),
|
|
build_request_id.clone(),
|
|
requested_partitions,
|
|
);
|
|
|
|
if let Err(e) = orchestrator.start_build().await {
|
|
error!("Failed to log build request received event: {}", e);
|
|
return Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to log build request: {}", e),
|
|
}),
|
|
));
|
|
}
|
|
|
|
// Start build execution in background
|
|
let service_clone = service.clone();
|
|
let build_request_id_clone = build_request_id.clone();
|
|
let partitions_clone = request.partitions.clone();
|
|
|
|
tokio::spawn(async move {
|
|
if let Err(e) = execute_build_request(
|
|
service_clone,
|
|
build_request_id_clone,
|
|
partitions_clone,
|
|
).await {
|
|
error!("Build request execution failed: {}", e);
|
|
}
|
|
});
|
|
|
|
Ok(Json(BuildRequestResponse { build_request_id }))
|
|
}
|
|
|
|
#[derive(Deserialize, JsonSchema)]
|
|
pub struct BuildStatusRequest {
|
|
pub build_request_id: String,
|
|
}
|
|
|
|
pub async fn get_build_status(
|
|
State(service): State<ServiceState>,
|
|
Path(BuildStatusRequest { build_request_id }): Path<BuildStatusRequest>,
|
|
) -> Result<Json<BuildStatusResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
// Get events for this build request from the event log (source of truth)
|
|
let events = match service.event_log.get_build_request_events(&build_request_id, None).await {
|
|
Ok(events) => events,
|
|
Err(e) => {
|
|
error!("Failed to get build request events: {}", e);
|
|
return Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to query build request events: {}", e),
|
|
}),
|
|
));
|
|
}
|
|
};
|
|
|
|
info!("Build request {}: Found {} events", build_request_id, events.len());
|
|
|
|
// Check if build request exists by looking for any events
|
|
if events.is_empty() {
|
|
return Err((
|
|
StatusCode::NOT_FOUND,
|
|
Json(ErrorResponse {
|
|
error: "Build request not found".to_string(),
|
|
}),
|
|
));
|
|
}
|
|
|
|
// Reconstruct build state from events - fail if no valid build request events found
|
|
let mut status: Option<BuildRequestStatus> = None;
|
|
let mut requested_partitions = Vec::new();
|
|
let mut created_at = 0i64;
|
|
let mut updated_at = 0i64;
|
|
let mut partitions_set = false;
|
|
|
|
// Sort events by timestamp to process in chronological order
|
|
let mut sorted_events = events.clone();
|
|
sorted_events.sort_by_key(|e| e.timestamp);
|
|
|
|
for event in &sorted_events {
|
|
if event.timestamp > updated_at {
|
|
updated_at = event.timestamp;
|
|
}
|
|
if created_at == 0 || event.timestamp < created_at {
|
|
created_at = event.timestamp;
|
|
}
|
|
|
|
// Extract information from build request events
|
|
if let Some(crate::build_event::EventType::BuildRequestEvent(req_event)) = &event.event_type {
|
|
info!("Processing BuildRequestEvent: status={}, message='{}'", req_event.status_code, req_event.message);
|
|
|
|
// Update status with the latest event - convert from i32 to enum
|
|
status = Some(match req_event.status_code {
|
|
0 => BuildRequestStatus::BuildRequestUnknown, // Default protobuf value - should not happen in production
|
|
1 => BuildRequestStatus::BuildRequestReceived,
|
|
2 => BuildRequestStatus::BuildRequestPlanning,
|
|
7 => BuildRequestStatus::BuildRequestAnalysisCompleted,
|
|
3 => BuildRequestStatus::BuildRequestExecuting,
|
|
4 => BuildRequestStatus::BuildRequestCompleted,
|
|
5 => BuildRequestStatus::BuildRequestFailed,
|
|
6 => BuildRequestStatus::BuildRequestCancelled,
|
|
unknown_status => {
|
|
error!("Invalid BuildRequestStatus value: {} in build request {}", unknown_status, build_request_id);
|
|
return Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Invalid build request status value: {}", unknown_status),
|
|
}),
|
|
));
|
|
},
|
|
});
|
|
|
|
// Use partitions from the first event that has them (typically the "received" event)
|
|
if !partitions_set && !req_event.requested_partitions.is_empty() {
|
|
info!("Setting requested partitions from event: {:?}", req_event.requested_partitions);
|
|
requested_partitions = req_event.requested_partitions.iter()
|
|
.map(|p| p.str.clone())
|
|
.collect();
|
|
partitions_set = true;
|
|
}
|
|
} else {
|
|
info!("Event is not a BuildRequestEvent: {:?}", event.event_type.as_ref().map(|t| std::mem::discriminant(t)));
|
|
}
|
|
}
|
|
|
|
// Ensure we found at least one valid BuildRequestEvent
|
|
let final_status = status.ok_or_else(|| {
|
|
error!("No valid BuildRequestEvent found in {} events for build request {}", events.len(), build_request_id);
|
|
(
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("No valid build request events found - data corruption detected"),
|
|
}),
|
|
)
|
|
})?;
|
|
|
|
// Clone events for later use in mermaid generation
|
|
let events_for_mermaid = events.clone();
|
|
|
|
// Convert events to summary format for response
|
|
let event_summaries: Vec<BuildEventSummary> = events.into_iter().map(|e| {
|
|
let (job_label, partition_ref, delegated_build_id) = extract_navigation_data(&e.event_type);
|
|
BuildEventSummary {
|
|
event_id: e.event_id,
|
|
timestamp: e.timestamp,
|
|
event_type: event_type_to_string(&e.event_type),
|
|
message: event_to_message(&e.event_type),
|
|
build_request_id: e.build_request_id,
|
|
job_label,
|
|
partition_ref,
|
|
delegated_build_id,
|
|
}
|
|
}).collect();
|
|
|
|
let final_status_string = BuildGraphService::status_to_string(final_status);
|
|
info!("Build request {}: Final status={}, partitions={:?}", build_request_id, final_status_string, requested_partitions);
|
|
|
|
// Extract the job graph from events (find the most recent JobGraphEvent)
|
|
let (job_graph_json, mermaid_diagram) = {
|
|
let mut job_graph: Option<JobGraph> = None;
|
|
|
|
// Find the most recent JobGraphEvent in the events
|
|
for event in &events_for_mermaid {
|
|
if let Some(crate::build_event::EventType::JobGraphEvent(graph_event)) = &event.event_type {
|
|
if let Some(ref graph) = graph_event.job_graph {
|
|
job_graph = Some(graph.clone());
|
|
}
|
|
}
|
|
}
|
|
|
|
if let Some(ref graph) = job_graph {
|
|
// Convert job graph to JSON
|
|
let graph_json = match serde_json::to_value(graph) {
|
|
Ok(json) => Some(json),
|
|
Err(e) => {
|
|
error!("Failed to serialize job graph: {}", e);
|
|
None
|
|
}
|
|
};
|
|
|
|
// Generate mermaid diagram with current status
|
|
let mermaid = mermaid_utils::generate_mermaid_with_status(graph, &events_for_mermaid);
|
|
|
|
(graph_json, Some(mermaid))
|
|
} else {
|
|
(None, None)
|
|
}
|
|
};
|
|
|
|
Ok(Json(BuildStatusResponse {
|
|
build_request_id,
|
|
status: final_status_string,
|
|
requested_partitions,
|
|
created_at: created_at,
|
|
updated_at: updated_at,
|
|
events: event_summaries,
|
|
job_graph: job_graph_json,
|
|
mermaid_diagram,
|
|
}))
|
|
}
|
|
|
|
#[derive(Deserialize, JsonSchema)]
|
|
pub struct CancelBuildRequest {
|
|
pub build_request_id: String,
|
|
}
|
|
|
|
pub async fn cancel_build_request(
|
|
State(service): State<ServiceState>,
|
|
Path(CancelBuildRequest { build_request_id }): Path<CancelBuildRequest>,
|
|
) -> Result<Json<BuildCancelResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
// Update build request state
|
|
{
|
|
let mut active_builds = service.active_builds.write().await;
|
|
if let Some(build_state) = active_builds.get_mut(&build_request_id) {
|
|
build_state.status = BuildRequestStatus::BuildRequestCancelled;
|
|
build_state.updated_at = current_timestamp_nanos();
|
|
} else {
|
|
return Err((
|
|
StatusCode::NOT_FOUND,
|
|
Json(ErrorResponse {
|
|
error: "Build request not found".to_string(),
|
|
}),
|
|
));
|
|
}
|
|
}
|
|
|
|
// Log cancellation event
|
|
let event = create_build_event(
|
|
build_request_id.clone(),
|
|
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
|
|
status_code: BuildRequestStatus::BuildRequestCancelled as i32,
|
|
status_name: BuildRequestStatus::BuildRequestCancelled.to_display_string(),
|
|
requested_partitions: vec![],
|
|
message: "Build request cancelled".to_string(),
|
|
}),
|
|
);
|
|
|
|
if let Err(e) = service.event_log.append_event(event).await {
|
|
error!("Failed to log build request cancelled event: {}", e);
|
|
}
|
|
|
|
info!("Build request {} cancelled", build_request_id);
|
|
|
|
Ok(Json(BuildCancelResponse {
|
|
cancelled: true,
|
|
build_request_id,
|
|
}))
|
|
}
|
|
|
|
#[derive(Deserialize, JsonSchema)]
|
|
pub struct PartitionStatusRequest {
|
|
pub partition_ref: String,
|
|
}
|
|
|
|
pub async fn get_partition_status(
|
|
State(service): State<ServiceState>,
|
|
Path(PartitionStatusRequest { partition_ref }): Path<PartitionStatusRequest>,
|
|
) -> Result<Json<PartitionStatusResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
// Get latest partition status
|
|
let (status, last_updated) = match service.event_log.get_latest_partition_status(&partition_ref).await {
|
|
Ok(Some((status, timestamp))) => (status, Some(timestamp)),
|
|
Ok(None) => {
|
|
// No partition events found - this is a legitimate 404
|
|
return Err((
|
|
StatusCode::NOT_FOUND,
|
|
Json(ErrorResponse {
|
|
error: format!("Partition not found: {}", partition_ref),
|
|
}),
|
|
));
|
|
},
|
|
Err(e) => {
|
|
error!("Failed to get partition status: {}", e);
|
|
return Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to get partition status: {}", e),
|
|
}),
|
|
));
|
|
}
|
|
};
|
|
|
|
// Get active builds for this partition
|
|
let build_requests = match service.event_log.get_active_builds_for_partition(&partition_ref).await {
|
|
Ok(builds) => builds,
|
|
Err(e) => {
|
|
error!("Failed to get active builds for partition: {}", e);
|
|
return Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to get active builds for partition: {}", e),
|
|
}),
|
|
));
|
|
}
|
|
};
|
|
|
|
Ok(Json(PartitionStatusResponse {
|
|
partition_ref,
|
|
status: BuildGraphService::partition_status_to_string(status),
|
|
last_updated,
|
|
build_requests,
|
|
}))
|
|
}
|
|
|
|
#[derive(Deserialize, JsonSchema)]
|
|
pub struct PartitionEventsRequest {
|
|
pub partition_ref: String,
|
|
}
|
|
|
|
pub async fn get_partition_events(
|
|
State(service): State<ServiceState>,
|
|
Path(PartitionEventsRequest { partition_ref }): Path<PartitionEventsRequest>,
|
|
) -> Result<Json<PartitionEventsResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
let events = match service.event_log.get_partition_events(&partition_ref, None).await {
|
|
Ok(events) => events.into_iter().map(|e| {
|
|
let (job_label, partition_ref, delegated_build_id) = extract_navigation_data(&e.event_type);
|
|
BuildEventSummary {
|
|
event_id: e.event_id,
|
|
timestamp: e.timestamp,
|
|
event_type: event_type_to_string(&e.event_type),
|
|
message: event_to_message(&e.event_type),
|
|
build_request_id: e.build_request_id,
|
|
job_label,
|
|
partition_ref,
|
|
delegated_build_id,
|
|
}
|
|
}).collect(),
|
|
Err(e) => {
|
|
error!("Failed to get partition events: {}", e);
|
|
return Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to get partition events: {}", e),
|
|
}),
|
|
));
|
|
}
|
|
};
|
|
|
|
Ok(Json(PartitionEventsResponse {
|
|
partition_ref,
|
|
events,
|
|
}))
|
|
}
|
|
|
|
pub async fn analyze_build_graph(
|
|
State(service): State<ServiceState>,
|
|
Json(request): Json<AnalyzeRequest>,
|
|
) -> Result<Json<AnalyzeResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
// Call the analyze command (use temporary ID for analyze-only requests)
|
|
let temp_build_request_id = BuildGraphService::generate_build_request_id();
|
|
let analyze_result = run_analyze_command(&service, &temp_build_request_id, &request.partitions).await;
|
|
|
|
match analyze_result {
|
|
Ok(job_graph) => {
|
|
let job_graph_json = match serde_json::to_value(&job_graph) {
|
|
Ok(json) => json,
|
|
Err(e) => {
|
|
error!("Failed to serialize job graph: {}", e);
|
|
return Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to serialize job graph: {}", e),
|
|
}),
|
|
));
|
|
}
|
|
};
|
|
|
|
Ok(Json(AnalyzeResponse {
|
|
job_graph: job_graph_json,
|
|
}))
|
|
}
|
|
Err(e) => {
|
|
error!("Failed to analyze build graph: {}", e);
|
|
Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to analyze build graph: {}", e),
|
|
}),
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn execute_build_request(
|
|
service: ServiceState,
|
|
build_request_id: String,
|
|
partitions: Vec<String>,
|
|
) -> Result<(), String> {
|
|
info!("Starting build execution for request {}", build_request_id);
|
|
|
|
// Create orchestrator for this build request
|
|
let requested_partitions: Vec<PartitionRef> = partitions.iter()
|
|
.map(|p| PartitionRef { str: p.clone() })
|
|
.collect();
|
|
|
|
let orchestrator = BuildOrchestrator::new(
|
|
service.event_log.clone(),
|
|
build_request_id.clone(),
|
|
requested_partitions,
|
|
);
|
|
|
|
// Update status to planning
|
|
update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestPlanning).await;
|
|
|
|
// Log planning event
|
|
if let Err(e) = orchestrator.start_planning().await {
|
|
error!("Failed to log planning event: {}", e);
|
|
}
|
|
|
|
// Analyze the build graph
|
|
let job_graph = match run_analyze_command(&service, &build_request_id, &partitions).await {
|
|
Ok(graph) => graph,
|
|
Err(e) => {
|
|
error!("Failed to analyze build graph: {}", e);
|
|
update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestFailed).await;
|
|
|
|
// Log failure event
|
|
if let Err(log_err) = orchestrator.complete_build(BuildResult::Failed { jobs_completed: 0, jobs_failed: 1 }).await {
|
|
error!("Failed to log failure event: {}", log_err);
|
|
}
|
|
|
|
return Err(e);
|
|
}
|
|
};
|
|
|
|
|
|
// Update status to executing
|
|
update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestExecuting).await;
|
|
|
|
// Log executing event
|
|
if let Err(e) = orchestrator.start_execution().await {
|
|
error!("Failed to log executing event: {}", e);
|
|
}
|
|
|
|
// Execute the build graph
|
|
match run_execute_command(&service, &build_request_id, &job_graph).await {
|
|
Ok(_) => {
|
|
info!("Build request {} completed successfully", build_request_id);
|
|
update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestCompleted).await;
|
|
|
|
// Log completion event
|
|
if let Err(e) = orchestrator.complete_build(BuildResult::Success { jobs_completed: 0 }).await {
|
|
error!("Failed to log completion event: {}", e);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
Err(e) => {
|
|
error!("Build request {} failed: {}", build_request_id, e);
|
|
update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestFailed).await;
|
|
|
|
// Log failure event
|
|
if let Err(log_err) = orchestrator.complete_build(BuildResult::Failed { jobs_completed: 0, jobs_failed: 1 }).await {
|
|
error!("Failed to log failure event: {}", log_err);
|
|
}
|
|
|
|
Err(e)
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn update_build_request_status(
|
|
service: &ServiceState,
|
|
build_request_id: &str,
|
|
status: BuildRequestStatus,
|
|
) {
|
|
let mut active_builds = service.active_builds.write().await;
|
|
if let Some(build_state) = active_builds.get_mut(build_request_id) {
|
|
build_state.status = status;
|
|
build_state.updated_at = current_timestamp_nanos();
|
|
}
|
|
}
|
|
|
|
async fn run_analyze_command(
|
|
service: &ServiceState,
|
|
build_request_id: &str,
|
|
partitions: &[String],
|
|
) -> Result<JobGraph, String> {
|
|
// Run analyze command
|
|
let analyze_binary = env::var("DATABUILD_ANALYZE_BINARY")
|
|
.unwrap_or_else(|_| "databuild_analyze".to_string());
|
|
|
|
let output = Command::new(&analyze_binary)
|
|
.args(partitions)
|
|
.env("DATABUILD_JOB_LOOKUP_PATH", &service.job_lookup_path)
|
|
.env("DATABUILD_CANDIDATE_JOBS", serde_json::to_string(&service.candidate_jobs).unwrap())
|
|
.env("DATABUILD_BUILD_EVENT_LOG", &service.event_log_uri)
|
|
.env("DATABUILD_BUILD_REQUEST_ID", build_request_id)
|
|
.output()
|
|
.map_err(|e| format!("Failed to execute analyze command: {}", e))?;
|
|
|
|
if !output.status.success() {
|
|
let stderr = String::from_utf8_lossy(&output.stderr);
|
|
return Err(format!("Analyze command failed: {}", stderr));
|
|
}
|
|
|
|
let stdout = String::from_utf8_lossy(&output.stdout);
|
|
let job_graph: JobGraph = serde_json::from_str(&stdout)
|
|
.map_err(|e| format!("Failed to parse analyze result: {}", e))?;
|
|
|
|
Ok(job_graph)
|
|
}
|
|
|
|
async fn run_execute_command(
|
|
service: &ServiceState,
|
|
build_request_id: &str,
|
|
job_graph: &JobGraph,
|
|
) -> Result<(), String> {
|
|
// Serialize job graph
|
|
let job_graph_json = serde_json::to_string(job_graph)
|
|
.map_err(|e| format!("Failed to serialize job graph: {}", e))?;
|
|
|
|
// Run execute command
|
|
let execute_binary = env::var("DATABUILD_EXECUTE_BINARY")
|
|
.unwrap_or_else(|_| "databuild_execute".to_string());
|
|
|
|
let mut child = Command::new(&execute_binary)
|
|
.env("DATABUILD_JOB_LOOKUP_PATH", &service.job_lookup_path)
|
|
.env("DATABUILD_CANDIDATE_JOBS", serde_json::to_string(&service.candidate_jobs).unwrap())
|
|
.env("DATABUILD_BUILD_EVENT_LOG", &service.event_log_uri)
|
|
.env("DATABUILD_BUILD_REQUEST_ID", build_request_id)
|
|
.stdin(std::process::Stdio::piped())
|
|
.stdout(std::process::Stdio::piped())
|
|
.stderr(std::process::Stdio::piped())
|
|
.spawn()
|
|
.map_err(|e| format!("Failed to spawn execute command: {}", e))?;
|
|
|
|
// Write job graph to stdin
|
|
if let Some(stdin) = child.stdin.take() {
|
|
use std::io::Write;
|
|
let mut stdin = stdin;
|
|
stdin.write_all(job_graph_json.as_bytes())
|
|
.map_err(|e| format!("Failed to write job graph to stdin: {}", e))?;
|
|
}
|
|
|
|
// Wait for completion
|
|
let output = child.wait_with_output()
|
|
.map_err(|e| format!("Failed to wait for execute command: {}", e))?;
|
|
|
|
if !output.status.success() {
|
|
let stderr = String::from_utf8_lossy(&output.stderr);
|
|
return Err(format!("Execute command failed: {}", stderr));
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn event_type_to_string(event_type: &Option<crate::build_event::EventType>) -> String {
|
|
match event_type {
|
|
Some(crate::build_event::EventType::BuildRequestEvent(_)) => "build_request".to_string(),
|
|
Some(crate::build_event::EventType::PartitionEvent(_)) => "partition".to_string(),
|
|
Some(crate::build_event::EventType::JobEvent(_)) => "job".to_string(),
|
|
Some(crate::build_event::EventType::DelegationEvent(_)) => "delegation".to_string(),
|
|
Some(crate::build_event::EventType::JobGraphEvent(_)) => "job_graph".to_string(),
|
|
Some(crate::build_event::EventType::PartitionInvalidationEvent(_)) => "partition_invalidation".to_string(),
|
|
Some(crate::build_event::EventType::TaskCancelEvent(_)) => "task_cancel".to_string(),
|
|
Some(crate::build_event::EventType::BuildCancelEvent(_)) => "build_cancel".to_string(),
|
|
None => "INVALID_EVENT_TYPE".to_string(), // Make this obvious rather than hiding it
|
|
}
|
|
}
|
|
|
|
fn event_to_message(event_type: &Option<crate::build_event::EventType>) -> String {
|
|
match event_type {
|
|
Some(crate::build_event::EventType::BuildRequestEvent(event)) => event.message.clone(),
|
|
Some(crate::build_event::EventType::PartitionEvent(event)) => event.message.clone(),
|
|
Some(crate::build_event::EventType::JobEvent(event)) => event.message.clone(),
|
|
Some(crate::build_event::EventType::DelegationEvent(event)) => event.message.clone(),
|
|
Some(crate::build_event::EventType::JobGraphEvent(event)) => event.message.clone(),
|
|
Some(crate::build_event::EventType::PartitionInvalidationEvent(event)) => event.reason.clone(),
|
|
Some(crate::build_event::EventType::TaskCancelEvent(event)) => event.reason.clone(),
|
|
Some(crate::build_event::EventType::BuildCancelEvent(event)) => event.reason.clone(),
|
|
None => "INVALID_EVENT_NO_MESSAGE".to_string(), // Make this obvious
|
|
}
|
|
}
|
|
|
|
fn extract_navigation_data(event_type: &Option<crate::build_event::EventType>) -> (Option<String>, Option<String>, Option<String>) {
|
|
match event_type {
|
|
Some(crate::build_event::EventType::JobEvent(event)) => {
|
|
let job_label = event.job_label.as_ref().map(|l| l.label.clone());
|
|
(job_label, None, None)
|
|
},
|
|
Some(crate::build_event::EventType::PartitionEvent(event)) => {
|
|
let partition_ref = event.partition_ref.as_ref().map(|r| r.str.clone());
|
|
(None, partition_ref, None)
|
|
},
|
|
Some(crate::build_event::EventType::DelegationEvent(event)) => {
|
|
let delegated_build_id = Some(event.delegated_to_build_request_id.clone());
|
|
(None, None, delegated_build_id)
|
|
},
|
|
Some(crate::build_event::EventType::BuildRequestEvent(_)) => {
|
|
// Build request events don't need navigation links (self-referential)
|
|
(None, None, None)
|
|
},
|
|
Some(crate::build_event::EventType::JobGraphEvent(_)) => {
|
|
// Job graph events don't need navigation links
|
|
(None, None, None)
|
|
},
|
|
Some(crate::build_event::EventType::PartitionInvalidationEvent(event)) => {
|
|
let partition_ref = event.partition_ref.as_ref().map(|r| r.str.clone());
|
|
(None, partition_ref, None)
|
|
},
|
|
Some(crate::build_event::EventType::TaskCancelEvent(_event)) => {
|
|
// Task cancel events reference job run IDs, which we could potentially navigate to
|
|
(None, None, None)
|
|
},
|
|
Some(crate::build_event::EventType::BuildCancelEvent(_)) => {
|
|
// Build cancel events don't need navigation links
|
|
(None, None, None)
|
|
},
|
|
None => (None, None, None),
|
|
}
|
|
}
|
|
|
|
// New handlers for list endpoints
|
|
use axum::extract::Query;
|
|
use std::collections::HashMap;
|
|
|
|
pub async fn list_build_requests(
|
|
State(service): State<ServiceState>,
|
|
Query(params): Query<HashMap<String, String>>,
|
|
) -> Result<Json<BuildsListResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
let limit = params.get("limit")
|
|
.and_then(|s| s.parse::<u32>().ok())
|
|
.unwrap_or(20)
|
|
.min(100); // Cap at 100
|
|
|
|
let offset = params.get("offset")
|
|
.and_then(|s| s.parse::<u32>().ok())
|
|
.unwrap_or(0);
|
|
|
|
let status_filter = params.get("status")
|
|
.and_then(|s| match s.as_str() {
|
|
"received" => Some(BuildRequestStatus::BuildRequestReceived),
|
|
"planning" => Some(BuildRequestStatus::BuildRequestPlanning),
|
|
"executing" => Some(BuildRequestStatus::BuildRequestExecuting),
|
|
"completed" => Some(BuildRequestStatus::BuildRequestCompleted),
|
|
"failed" => Some(BuildRequestStatus::BuildRequestFailed),
|
|
"cancelled" => Some(BuildRequestStatus::BuildRequestCancelled),
|
|
_ => None,
|
|
});
|
|
|
|
match service.event_log.list_build_requests(limit, offset, status_filter).await {
|
|
Ok((summaries, total_count)) => {
|
|
let builds: Vec<BuildSummary> = summaries.into_iter().map(|s| BuildSummary {
|
|
build_request_id: s.build_request_id,
|
|
status: format!("{:?}", s.status),
|
|
requested_partitions: s.requested_partitions,
|
|
created_at: s.created_at,
|
|
updated_at: s.updated_at,
|
|
}).collect();
|
|
|
|
let has_more = (offset + limit) < total_count;
|
|
|
|
Ok(Json(BuildsListResponse {
|
|
builds,
|
|
total_count,
|
|
has_more,
|
|
}))
|
|
}
|
|
Err(e) => {
|
|
error!("Failed to list build requests: {}", e);
|
|
Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to list build requests: {}", e),
|
|
}),
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
pub async fn list_partitions(
|
|
State(service): State<ServiceState>,
|
|
Query(params): Query<HashMap<String, String>>,
|
|
) -> Result<Json<PartitionsListResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
let limit = params.get("limit")
|
|
.and_then(|s| s.parse::<u32>().ok())
|
|
.unwrap_or(20)
|
|
.min(100); // Cap at 100
|
|
|
|
let offset = params.get("offset")
|
|
.and_then(|s| s.parse::<u32>().ok())
|
|
.unwrap_or(0);
|
|
|
|
let status_filter = params.get("status")
|
|
.and_then(|s| match s.as_str() {
|
|
"requested" => Some(PartitionStatus::PartitionRequested),
|
|
"analyzed" => Some(PartitionStatus::PartitionAnalyzed),
|
|
"building" => Some(PartitionStatus::PartitionBuilding),
|
|
"available" => Some(PartitionStatus::PartitionAvailable),
|
|
"failed" => Some(PartitionStatus::PartitionFailed),
|
|
"delegated" => Some(PartitionStatus::PartitionDelegated),
|
|
_ => None,
|
|
});
|
|
|
|
match service.event_log.list_recent_partitions(limit, offset, status_filter).await {
|
|
Ok((summaries, total_count)) => {
|
|
let partitions: Vec<PartitionSummary> = summaries.into_iter().map(|s| PartitionSummary {
|
|
partition_ref: s.partition_ref,
|
|
status: format!("{:?}", s.status),
|
|
updated_at: s.updated_at,
|
|
build_request_id: s.build_request_id,
|
|
}).collect();
|
|
|
|
let has_more = (offset + limit) < total_count;
|
|
|
|
Ok(Json(PartitionsListResponse {
|
|
partitions,
|
|
total_count,
|
|
has_more,
|
|
}))
|
|
}
|
|
Err(e) => {
|
|
error!("Failed to list partitions: {}", e);
|
|
Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to list partitions: {}", e),
|
|
}),
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
// New unified protobuf-based handler for future migration
|
|
pub async fn list_partitions_unified(
|
|
State(service): State<ServiceState>,
|
|
Query(params): Query<HashMap<String, String>>,
|
|
) -> Result<Json<crate::PartitionsListResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
let limit = params.get("limit")
|
|
.and_then(|s| s.parse::<u32>().ok())
|
|
.unwrap_or(20)
|
|
.min(100); // Cap at 100
|
|
|
|
let offset = params.get("offset")
|
|
.and_then(|s| s.parse::<u32>().ok())
|
|
.unwrap_or(0);
|
|
|
|
let status_filter = params.get("status")
|
|
.and_then(|s| crate::PartitionStatus::from_display_string(s));
|
|
|
|
// Use repository with protobuf response format
|
|
let repository = crate::repositories::partitions::PartitionsRepository::new(service.event_log.clone());
|
|
|
|
let request = crate::PartitionsListRequest {
|
|
limit: Some(limit),
|
|
offset: Some(offset),
|
|
status_filter: status_filter.map(|s| s.to_display_string()),
|
|
};
|
|
|
|
match repository.list_protobuf(request).await {
|
|
Ok(response) => {
|
|
Ok(Json(response))
|
|
}
|
|
Err(e) => {
|
|
error!("Failed to list partitions: {}", e);
|
|
Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to list partitions: {}", e),
|
|
}),
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
pub async fn get_activity_summary(
|
|
State(service): State<ServiceState>,
|
|
) -> Result<Json<ActivityResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
match service.event_log.get_activity_summary().await {
|
|
Ok(summary) => {
|
|
let recent_builds: Vec<BuildSummary> = summary.recent_builds.into_iter().map(|s| BuildSummary {
|
|
build_request_id: s.build_request_id,
|
|
status: format!("{:?}", s.status),
|
|
requested_partitions: s.requested_partitions,
|
|
created_at: s.created_at,
|
|
updated_at: s.updated_at,
|
|
}).collect();
|
|
|
|
let recent_partitions: Vec<PartitionSummary> = summary.recent_partitions.into_iter().map(|s| PartitionSummary {
|
|
partition_ref: s.partition_ref,
|
|
status: format!("{:?}", s.status),
|
|
updated_at: s.updated_at,
|
|
build_request_id: s.build_request_id,
|
|
}).collect();
|
|
|
|
// Simple system status logic
|
|
let system_status = if summary.active_builds_count > 10 {
|
|
"degraded".to_string()
|
|
} else {
|
|
"healthy".to_string()
|
|
};
|
|
|
|
Ok(Json(ActivityResponse {
|
|
active_builds_count: summary.active_builds_count,
|
|
recent_builds,
|
|
recent_partitions,
|
|
total_partitions_count: summary.total_partitions_count,
|
|
system_status,
|
|
graph_name: service.graph_label.clone(),
|
|
}))
|
|
}
|
|
Err(e) => {
|
|
error!("Failed to get activity summary: {}", e);
|
|
Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to get activity summary: {}", e),
|
|
}),
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Deserialize, JsonSchema)]
|
|
pub struct JobMetricsRequest {
|
|
pub label: String,
|
|
}
|
|
|
|
pub async fn list_jobs(
|
|
State(service): State<ServiceState>,
|
|
Query(params): Query<HashMap<String, String>>,
|
|
) -> Result<Json<JobsListResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
let search_term = params.get("search").map(|s| s.to_lowercase());
|
|
|
|
// Debug: Let's see what's actually in the database
|
|
let debug_query = "
|
|
SELECT
|
|
je.job_label,
|
|
je.status,
|
|
COUNT(*) as count_for_this_status
|
|
FROM job_events je
|
|
JOIN build_events be ON je.event_id = be.event_id
|
|
WHERE je.job_label != ''
|
|
GROUP BY je.job_label, je.status
|
|
ORDER BY je.job_label, je.status";
|
|
|
|
// Log the debug results first
|
|
if let Ok(debug_result) = service.event_log.execute_query(debug_query).await {
|
|
for row in &debug_result.rows {
|
|
if row.len() >= 3 {
|
|
log::info!("Debug: job_label={}, status={}, count={}", row[0], row[1], row[2]);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Original query but let's see all statuses
|
|
let query = "
|
|
WITH job_durations AS (
|
|
SELECT
|
|
je.job_label,
|
|
be.build_request_id,
|
|
(MAX(be.timestamp) - MIN(be.timestamp)) / 1000000 as duration_ms
|
|
FROM job_events je
|
|
JOIN build_events be ON je.event_id = be.event_id
|
|
GROUP BY je.job_label, be.build_request_id
|
|
HAVING MAX(CASE WHEN je.status IN ('3', '4', '5', '6') THEN 1 ELSE 0 END) = 1
|
|
)
|
|
SELECT
|
|
je.job_label,
|
|
COUNT(CASE WHEN je.status IN ('3', '6') THEN 1 END) as completed_count,
|
|
COUNT(CASE WHEN je.status IN ('4', '5') THEN 1 END) as failed_count,
|
|
COUNT(CASE WHEN je.status IN ('3', '4', '5', '6') THEN 1 END) as total_count,
|
|
COALESCE(AVG(jd.duration_ms), 0) as avg_duration_ms,
|
|
MAX(be.timestamp) as last_run,
|
|
GROUP_CONCAT(DISTINCT je.status) as all_statuses
|
|
FROM job_events je
|
|
JOIN build_events be ON je.event_id = be.event_id
|
|
LEFT JOIN job_durations jd ON je.job_label = jd.job_label
|
|
WHERE je.job_label != ''
|
|
GROUP BY je.job_label
|
|
ORDER BY last_run DESC";
|
|
|
|
match service.event_log.execute_query(query).await {
|
|
Ok(result) => {
|
|
let mut jobs = Vec::new();
|
|
|
|
for row in result.rows {
|
|
if row.len() >= 7 {
|
|
let job_label = &row[0];
|
|
|
|
// Apply search filter if provided
|
|
if let Some(ref search) = search_term {
|
|
if !job_label.to_lowercase().contains(search) {
|
|
continue;
|
|
}
|
|
}
|
|
|
|
let completed_count: u32 = row[1].parse().unwrap_or(0);
|
|
let failed_count: u32 = row[2].parse().unwrap_or(0);
|
|
let total_count: u32 = row[3].parse().unwrap_or(0);
|
|
let avg_duration_ms: Option<i64> = row[4].parse::<f64>().ok().map(|f| f as i64);
|
|
let last_run: Option<i64> = row[5].parse().ok();
|
|
let all_statuses = &row[6];
|
|
|
|
// Log additional debug info
|
|
log::info!("Job: {}, completed: {}, failed: {}, total: {}, statuses: {}",
|
|
job_label, completed_count, failed_count, total_count, all_statuses);
|
|
|
|
let success_rate = if total_count > 0 {
|
|
completed_count as f64 / total_count as f64
|
|
} else {
|
|
0.0
|
|
};
|
|
|
|
jobs.push(JobSummary {
|
|
job_label: job_label.clone(),
|
|
success_rate,
|
|
avg_duration_ms,
|
|
recent_runs: total_count.min(50), // Limit to recent runs
|
|
last_run,
|
|
});
|
|
}
|
|
}
|
|
|
|
let total_count = jobs.len() as u32;
|
|
|
|
Ok(Json(JobsListResponse {
|
|
jobs,
|
|
total_count,
|
|
}))
|
|
}
|
|
Err(e) => {
|
|
error!("Failed to list jobs: {}", e);
|
|
Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to list jobs: {}", e),
|
|
}),
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
pub async fn get_job_metrics(
|
|
State(service): State<ServiceState>,
|
|
Path(JobMetricsRequest { label }): Path<JobMetricsRequest>,
|
|
) -> Result<Json<JobMetricsResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
// Decode the base64-encoded job label
|
|
let decoded_label = match base64_url_decode(&label) {
|
|
Ok(decoded) => decoded,
|
|
Err(_) => {
|
|
return Err((
|
|
StatusCode::BAD_REQUEST,
|
|
Json(ErrorResponse {
|
|
error: "Invalid job label encoding".to_string(),
|
|
}),
|
|
));
|
|
}
|
|
};
|
|
|
|
log::info!("get_job_metrics: encoded='{}', decoded='{}'", label, decoded_label);
|
|
|
|
// Get overall job metrics
|
|
let metrics_query = "
|
|
WITH job_run_durations AS (
|
|
SELECT
|
|
be.build_request_id,
|
|
(MAX(be.timestamp) - MIN(be.timestamp)) / 1000000 as duration_ms
|
|
FROM job_events je
|
|
JOIN build_events be ON je.event_id = be.event_id
|
|
WHERE je.job_label = ?
|
|
GROUP BY be.build_request_id
|
|
HAVING MAX(CASE WHEN je.status IN ('3', '4', '5', '6') THEN 1 ELSE 0 END) = 1
|
|
)
|
|
SELECT
|
|
COUNT(CASE WHEN je.status IN ('3', '6') THEN 1 END) as completed_count,
|
|
COUNT(CASE WHEN je.status IN ('3', '4', '5', '6') THEN 1 END) as total_count,
|
|
COALESCE(AVG(jrd.duration_ms), 0) as avg_duration_ms
|
|
FROM job_events je
|
|
JOIN build_events be ON je.event_id = be.event_id
|
|
LEFT JOIN job_run_durations jrd ON be.build_request_id = jrd.build_request_id
|
|
WHERE je.job_label = ?";
|
|
|
|
let (success_rate, total_runs, avg_duration_ms) = match service.event_log.execute_query(&metrics_query.replace("?", &format!("'{}'", decoded_label)).replace("?", &format!("'{}'", decoded_label))).await {
|
|
Ok(result) if !result.rows.is_empty() => {
|
|
let row = &result.rows[0];
|
|
let completed_count: u32 = row[0].parse().unwrap_or(0);
|
|
let total_count: u32 = row[1].parse().unwrap_or(0);
|
|
let avg_duration: Option<i64> = row[2].parse::<f64>().ok().map(|f| f as i64);
|
|
|
|
let success_rate = if total_count > 0 {
|
|
completed_count as f64 / total_count as f64
|
|
} else {
|
|
0.0
|
|
};
|
|
|
|
(success_rate, total_count, avg_duration)
|
|
}
|
|
_ => (0.0, 0, None),
|
|
};
|
|
|
|
// Get recent runs - consolidated by build request to show final status per job run
|
|
let recent_runs_query = "
|
|
SELECT
|
|
be.build_request_id,
|
|
je.target_partitions,
|
|
je.status,
|
|
MIN(be.timestamp) as started_at,
|
|
MAX(be.timestamp) as completed_at
|
|
FROM job_events je
|
|
JOIN build_events be ON je.event_id = be.event_id
|
|
WHERE je.job_label = ?
|
|
GROUP BY be.build_request_id, je.target_partitions
|
|
HAVING je.status = (
|
|
SELECT je2.status
|
|
FROM job_events je2
|
|
JOIN build_events be2 ON je2.event_id = be2.event_id
|
|
WHERE je2.job_label = ?
|
|
AND be2.build_request_id = be.build_request_id
|
|
ORDER BY be2.timestamp DESC
|
|
LIMIT 1
|
|
)
|
|
ORDER BY started_at DESC
|
|
LIMIT 50";
|
|
|
|
let recent_runs = match service.event_log.execute_query(&recent_runs_query.replace("?", &format!("'{}'", decoded_label)).replace("?", &format!("'{}'", decoded_label))).await {
|
|
Ok(result) => {
|
|
result.rows.into_iter().map(|row| {
|
|
let build_request_id = row[0].clone();
|
|
let partitions_json: String = row[1].clone();
|
|
let status_code: String = row[2].clone();
|
|
let started_at: i64 = row[3].parse().unwrap_or(0);
|
|
let completed_at: i64 = row[4].parse().unwrap_or(started_at);
|
|
let duration_ms: Option<i64> = if completed_at > started_at {
|
|
Some(completed_at - started_at)
|
|
} else {
|
|
None
|
|
};
|
|
|
|
let partitions: Vec<String> = serde_json::from_str::<Vec<serde_json::Value>>(&partitions_json)
|
|
.unwrap_or_default()
|
|
.into_iter()
|
|
.filter_map(|v| {
|
|
v.get("str").and_then(|s| s.as_str()).map(|s| s.to_string())
|
|
})
|
|
.collect();
|
|
|
|
let status = match status_code.as_str() {
|
|
"1" => "scheduled",
|
|
"2" => "running",
|
|
"3" => "completed",
|
|
"4" => "failed",
|
|
"5" => "cancelled",
|
|
"6" => "skipped",
|
|
_ => "unknown",
|
|
};
|
|
|
|
JobRunSummary {
|
|
build_request_id,
|
|
partitions,
|
|
status: status.to_string(),
|
|
duration_ms,
|
|
started_at,
|
|
}
|
|
}).collect()
|
|
}
|
|
Err(_) => Vec::new(),
|
|
};
|
|
|
|
// Get daily stats (simplified - just recent days)
|
|
let daily_stats_query = "
|
|
WITH daily_job_durations AS (
|
|
SELECT
|
|
date(be.timestamp/1000000000, 'unixepoch') as date,
|
|
be.build_request_id,
|
|
(MAX(be.timestamp) - MIN(be.timestamp)) / 1000000 as duration_ms
|
|
FROM job_events je
|
|
JOIN build_events be ON je.event_id = be.event_id
|
|
WHERE je.job_label = ?
|
|
AND be.timestamp > (strftime('%s', 'now', '-30 days') * 1000000000)
|
|
GROUP BY date(be.timestamp/1000000000, 'unixepoch'), be.build_request_id
|
|
HAVING MAX(CASE WHEN je.status IN ('3', '4', '5', '6') THEN 1 ELSE 0 END) = 1
|
|
)
|
|
SELECT
|
|
date(be.timestamp/1000000000, 'unixepoch') as date,
|
|
COUNT(CASE WHEN je.status IN ('3', '6') THEN 1 END) as completed_count,
|
|
COUNT(CASE WHEN je.status IN ('3', '4', '5', '6') THEN 1 END) as total_count,
|
|
COALESCE(AVG(djd.duration_ms), 0) as avg_duration_ms
|
|
FROM job_events je
|
|
JOIN build_events be ON je.event_id = be.event_id
|
|
LEFT JOIN daily_job_durations djd ON date(be.timestamp/1000000000, 'unixepoch') = djd.date
|
|
WHERE je.job_label = ?
|
|
AND be.timestamp > (strftime('%s', 'now', '-30 days') * 1000000000)
|
|
GROUP BY date(be.timestamp/1000000000, 'unixepoch')
|
|
ORDER BY date DESC";
|
|
|
|
let daily_stats = match service.event_log.execute_query(&daily_stats_query.replace("?", &format!("'{}'", decoded_label)).replace("?", &format!("'{}'", decoded_label))).await {
|
|
Ok(result) => {
|
|
result.rows.into_iter().map(|row| {
|
|
let date = row[0].clone();
|
|
let completed_count: u32 = row[1].parse().unwrap_or(0);
|
|
let total_count: u32 = row[2].parse().unwrap_or(0);
|
|
let avg_duration: Option<i64> = row[3].parse::<f64>().ok().map(|f| f as i64);
|
|
|
|
let success_rate = if total_count > 0 {
|
|
completed_count as f64 / total_count as f64
|
|
} else {
|
|
0.0
|
|
};
|
|
|
|
JobDailyStats {
|
|
date,
|
|
success_rate,
|
|
avg_duration_ms: avg_duration,
|
|
total_runs: total_count,
|
|
}
|
|
}).collect()
|
|
}
|
|
Err(_) => Vec::new(),
|
|
};
|
|
|
|
Ok(Json(JobMetricsResponse {
|
|
job_label: decoded_label,
|
|
success_rate,
|
|
avg_duration_ms,
|
|
total_runs,
|
|
recent_runs,
|
|
daily_stats,
|
|
}))
|
|
}
|
|
|
|
// Repository-based handlers for the new shared core functionality
|
|
use crate::repositories::{
|
|
partitions::PartitionsRepository,
|
|
jobs::JobsRepository,
|
|
tasks::TasksRepository,
|
|
builds::BuildsRepository,
|
|
};
|
|
|
|
/// Request for partition detail endpoint
|
|
#[derive(Deserialize, JsonSchema)]
|
|
pub struct PartitionDetailRequest {
|
|
pub partition_ref: String,
|
|
}
|
|
|
|
/// Get detailed partition information with timeline
|
|
pub async fn get_partition_detail(
|
|
State(service): State<ServiceState>,
|
|
Path(PartitionDetailRequest { partition_ref }): Path<PartitionDetailRequest>,
|
|
) -> Result<Json<PartitionDetailResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
let repository = PartitionsRepository::new(service.event_log.clone());
|
|
|
|
match repository.show_protobuf(&partition_ref).await {
|
|
Ok(Some(protobuf_response)) => {
|
|
let timeline_events: Vec<PartitionTimelineEvent> = protobuf_response.timeline.into_iter().map(|event| {
|
|
PartitionTimelineEvent {
|
|
timestamp: event.timestamp,
|
|
status_code: event.status_code,
|
|
status_name: event.status_name,
|
|
message: event.message,
|
|
build_request_id: event.build_request_id,
|
|
job_run_id: event.job_run_id,
|
|
}
|
|
}).collect();
|
|
|
|
Ok(Json(PartitionDetailResponse {
|
|
partition_ref: protobuf_response.partition_ref,
|
|
status_code: protobuf_response.status_code,
|
|
status_name: protobuf_response.status_name,
|
|
last_updated: protobuf_response.last_updated,
|
|
builds_count: protobuf_response.builds_count,
|
|
last_successful_build: protobuf_response.last_successful_build,
|
|
invalidation_count: protobuf_response.invalidation_count,
|
|
timeline: timeline_events,
|
|
}))
|
|
}
|
|
Ok(None) => Err((
|
|
StatusCode::NOT_FOUND,
|
|
Json(ErrorResponse {
|
|
error: format!("Partition '{}' not found", partition_ref),
|
|
}),
|
|
)),
|
|
Err(e) => {
|
|
error!("Failed to get partition detail: {}", e);
|
|
Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to get partition detail: {}", e),
|
|
}),
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Invalidate a partition
|
|
#[derive(Deserialize, JsonSchema)]
|
|
pub struct InvalidatePartitionRequest {
|
|
pub reason: String,
|
|
pub build_request_id: String,
|
|
}
|
|
|
|
/// Request for partition invalidation endpoint path
|
|
#[derive(Deserialize, JsonSchema)]
|
|
pub struct PartitionInvalidatePathRequest {
|
|
pub partition_ref: String,
|
|
}
|
|
|
|
pub async fn invalidate_partition(
|
|
State(service): State<ServiceState>,
|
|
Path(PartitionInvalidatePathRequest { partition_ref }): Path<PartitionInvalidatePathRequest>,
|
|
Json(request): Json<InvalidatePartitionRequest>,
|
|
) -> Result<Json<PartitionInvalidateResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
let repository = PartitionsRepository::new(service.event_log.clone());
|
|
|
|
match repository.invalidate(&partition_ref, request.reason.clone(), request.build_request_id).await {
|
|
Ok(()) => Ok(Json(PartitionInvalidateResponse {
|
|
invalidated: true,
|
|
partition_ref,
|
|
reason: request.reason,
|
|
})),
|
|
Err(e) => {
|
|
error!("Failed to invalidate partition: {}", e);
|
|
Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to invalidate partition: {}", e),
|
|
}),
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// List jobs using repository
|
|
pub async fn list_jobs_repository(
|
|
State(service): State<ServiceState>,
|
|
Query(params): Query<HashMap<String, String>>,
|
|
) -> Result<Json<JobsRepositoryListResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
let repository = JobsRepository::new(service.event_log.clone());
|
|
let limit = params.get("limit").and_then(|s| s.parse().ok());
|
|
|
|
match repository.list(limit).await {
|
|
Ok(jobs) => {
|
|
let job_summaries: Vec<JobRepositorySummary> = jobs.into_iter().map(|job| {
|
|
JobRepositorySummary {
|
|
job_label: job.job_label,
|
|
total_runs: job.total_runs,
|
|
successful_runs: job.successful_runs,
|
|
failed_runs: job.failed_runs,
|
|
cancelled_runs: job.cancelled_runs,
|
|
average_partitions_per_run: job.average_partitions_per_run,
|
|
last_run_timestamp: job.last_run_timestamp,
|
|
last_run_status: format!("{:?}", job.last_run_status),
|
|
recent_builds: job.recent_builds,
|
|
}
|
|
}).collect();
|
|
|
|
let total_count = job_summaries.len() as u32;
|
|
Ok(Json(JobsRepositoryListResponse {
|
|
jobs: job_summaries,
|
|
total_count,
|
|
}))
|
|
}
|
|
Err(e) => {
|
|
error!("Failed to list jobs: {}", e);
|
|
Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to list jobs: {}", e),
|
|
}),
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Request for job detail endpoint
|
|
#[derive(Deserialize, JsonSchema)]
|
|
pub struct JobDetailRequest {
|
|
pub label: String,
|
|
}
|
|
|
|
/// Get detailed job information
|
|
pub async fn get_job_detail(
|
|
State(service): State<ServiceState>,
|
|
Path(JobDetailRequest { label }): Path<JobDetailRequest>,
|
|
) -> Result<Json<JobDetailResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
let job_label = label;
|
|
let repository = JobsRepository::new(service.event_log.clone());
|
|
|
|
match repository.show_protobuf(&job_label).await {
|
|
Ok(Some(protobuf_response)) => {
|
|
let run_summaries: Vec<JobRunDetail> = protobuf_response.runs.into_iter().map(|run| {
|
|
JobRunDetail {
|
|
job_run_id: run.job_run_id,
|
|
build_request_id: run.build_request_id,
|
|
target_partitions: run.target_partitions,
|
|
status_code: run.status_code,
|
|
status_name: run.status_name,
|
|
started_at: run.started_at,
|
|
completed_at: run.completed_at,
|
|
duration_ms: run.duration_ms,
|
|
message: run.message,
|
|
}
|
|
}).collect();
|
|
|
|
Ok(Json(JobDetailResponse {
|
|
job_label: protobuf_response.job_label,
|
|
total_runs: protobuf_response.total_runs,
|
|
successful_runs: protobuf_response.successful_runs,
|
|
failed_runs: protobuf_response.failed_runs,
|
|
cancelled_runs: protobuf_response.cancelled_runs,
|
|
average_partitions_per_run: protobuf_response.average_partitions_per_run,
|
|
last_run_timestamp: protobuf_response.last_run_timestamp,
|
|
last_run_status_code: protobuf_response.last_run_status_code,
|
|
last_run_status_name: protobuf_response.last_run_status_name,
|
|
recent_builds: protobuf_response.recent_builds,
|
|
runs: run_summaries,
|
|
}))
|
|
}
|
|
Ok(None) => Err((
|
|
StatusCode::NOT_FOUND,
|
|
Json(ErrorResponse {
|
|
error: format!("Job '{}' not found", job_label),
|
|
}),
|
|
)),
|
|
Err(e) => {
|
|
error!("Failed to get job detail: {}", e);
|
|
Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to get job detail: {}", e),
|
|
}),
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// List tasks using repository
|
|
pub async fn list_tasks(
|
|
State(service): State<ServiceState>,
|
|
Query(params): Query<HashMap<String, String>>,
|
|
) -> Result<Json<TasksListResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
let repository = TasksRepository::new(service.event_log.clone());
|
|
let limit = params.get("limit").and_then(|s| s.parse().ok());
|
|
|
|
match repository.list(limit).await {
|
|
Ok(tasks) => {
|
|
let task_summaries: Vec<TaskSummary> = tasks.into_iter().map(|task| {
|
|
TaskSummary {
|
|
job_run_id: task.job_run_id,
|
|
job_label: task.job_label,
|
|
build_request_id: task.build_request_id,
|
|
status: format!("{:?}", task.status),
|
|
target_partitions: task.target_partitions.into_iter().map(|p| p.str).collect(),
|
|
scheduled_at: task.scheduled_at,
|
|
started_at: task.started_at,
|
|
completed_at: task.completed_at,
|
|
duration_ms: task.duration_ms,
|
|
cancelled: task.cancelled,
|
|
message: task.message,
|
|
}
|
|
}).collect();
|
|
|
|
let total_count = task_summaries.len() as u32;
|
|
Ok(Json(TasksListResponse {
|
|
tasks: task_summaries,
|
|
total_count,
|
|
}))
|
|
}
|
|
Err(e) => {
|
|
error!("Failed to list tasks: {}", e);
|
|
Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to list tasks: {}", e),
|
|
}),
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Request for task detail endpoint
|
|
#[derive(Deserialize, JsonSchema)]
|
|
pub struct TaskDetailRequest {
|
|
pub job_run_id: String,
|
|
}
|
|
|
|
/// Get detailed task information
|
|
pub async fn get_task_detail(
|
|
State(service): State<ServiceState>,
|
|
Path(TaskDetailRequest { job_run_id }): Path<TaskDetailRequest>,
|
|
) -> Result<Json<TaskDetailResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
let repository = TasksRepository::new(service.event_log.clone());
|
|
|
|
match repository.show_protobuf(&job_run_id).await {
|
|
Ok(Some(protobuf_response)) => {
|
|
let timeline_events: Vec<TaskTimelineEvent> = protobuf_response.timeline.into_iter().map(|event| {
|
|
TaskTimelineEvent {
|
|
timestamp: event.timestamp,
|
|
status_code: event.status_code,
|
|
status_name: event.status_name,
|
|
message: event.message,
|
|
event_type: event.event_type,
|
|
cancel_reason: event.cancel_reason,
|
|
}
|
|
}).collect();
|
|
|
|
Ok(Json(TaskDetailResponse {
|
|
job_run_id: protobuf_response.job_run_id,
|
|
job_label: protobuf_response.job_label,
|
|
build_request_id: protobuf_response.build_request_id,
|
|
status_code: protobuf_response.status_code,
|
|
status_name: protobuf_response.status_name,
|
|
target_partitions: protobuf_response.target_partitions,
|
|
scheduled_at: protobuf_response.scheduled_at,
|
|
started_at: protobuf_response.started_at,
|
|
completed_at: protobuf_response.completed_at,
|
|
duration_ms: protobuf_response.duration_ms,
|
|
cancelled: protobuf_response.cancelled,
|
|
cancel_reason: protobuf_response.cancel_reason,
|
|
message: protobuf_response.message,
|
|
timeline: timeline_events,
|
|
}))
|
|
}
|
|
Ok(None) => Err((
|
|
StatusCode::NOT_FOUND,
|
|
Json(ErrorResponse {
|
|
error: format!("Task '{}' not found", job_run_id),
|
|
}),
|
|
)),
|
|
Err(e) => {
|
|
error!("Failed to get task detail: {}", e);
|
|
Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to get task detail: {}", e),
|
|
}),
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Cancel a task
|
|
#[derive(Deserialize, JsonSchema)]
|
|
pub struct CancelTaskRequest {
|
|
pub reason: String,
|
|
pub build_request_id: String,
|
|
}
|
|
|
|
/// Request for task cancel endpoint path
|
|
#[derive(Deserialize, JsonSchema)]
|
|
pub struct TaskCancelPathRequest {
|
|
pub job_run_id: String,
|
|
}
|
|
|
|
pub async fn cancel_task(
|
|
State(service): State<ServiceState>,
|
|
Path(TaskCancelPathRequest { job_run_id }): Path<TaskCancelPathRequest>,
|
|
Json(request): Json<CancelTaskRequest>,
|
|
) -> Result<Json<TaskCancelResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
let repository = TasksRepository::new(service.event_log.clone());
|
|
|
|
match repository.cancel(&job_run_id, request.reason.clone(), request.build_request_id).await {
|
|
Ok(()) => Ok(Json(TaskCancelResponse {
|
|
cancelled: true,
|
|
job_run_id,
|
|
reason: request.reason,
|
|
})),
|
|
Err(e) => {
|
|
error!("Failed to cancel task: {}", e);
|
|
Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to cancel task: {}", e),
|
|
}),
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// List builds using repository
|
|
pub async fn list_builds_repository(
|
|
State(service): State<ServiceState>,
|
|
Query(params): Query<HashMap<String, String>>,
|
|
) -> Result<Json<BuildsRepositoryListResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
let repository = BuildsRepository::new(service.event_log.clone());
|
|
let limit = params.get("limit").and_then(|s| s.parse().ok());
|
|
|
|
match repository.list(limit).await {
|
|
Ok(builds) => {
|
|
let build_summaries: Vec<BuildRepositorySummary> = builds.into_iter().map(|build| {
|
|
BuildRepositorySummary {
|
|
build_request_id: build.build_request_id,
|
|
status: format!("{:?}", build.status),
|
|
requested_partitions: build.requested_partitions.into_iter().map(|p| p.str).collect(),
|
|
total_jobs: build.total_jobs,
|
|
completed_jobs: build.completed_jobs,
|
|
failed_jobs: build.failed_jobs,
|
|
cancelled_jobs: build.cancelled_jobs,
|
|
requested_at: build.requested_at,
|
|
started_at: build.started_at,
|
|
completed_at: build.completed_at,
|
|
duration_ms: build.duration_ms,
|
|
cancelled: build.cancelled,
|
|
}
|
|
}).collect();
|
|
|
|
let total_count = build_summaries.len() as u32;
|
|
Ok(Json(BuildsRepositoryListResponse {
|
|
builds: build_summaries,
|
|
total_count,
|
|
}))
|
|
}
|
|
Err(e) => {
|
|
error!("Failed to list builds: {}", e);
|
|
Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to list builds: {}", e),
|
|
}),
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Request for build detail endpoint
|
|
#[derive(Deserialize, JsonSchema)]
|
|
pub struct BuildDetailRequest {
|
|
pub build_request_id: String,
|
|
}
|
|
|
|
/// Get detailed build information
|
|
pub async fn get_build_detail(
|
|
State(service): State<ServiceState>,
|
|
Path(BuildDetailRequest { build_request_id }): Path<BuildDetailRequest>,
|
|
) -> Result<Json<BuildDetailResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
let repository = BuildsRepository::new(service.event_log.clone());
|
|
|
|
match repository.show_protobuf(&build_request_id).await {
|
|
Ok(Some(protobuf_response)) => {
|
|
// Convert protobuf response to service response (with dual status fields)
|
|
let timeline_events: Vec<BuildTimelineEvent> = protobuf_response.timeline.into_iter().map(|event| {
|
|
BuildTimelineEvent {
|
|
timestamp: event.timestamp,
|
|
status_code: event.status_code,
|
|
status_name: event.status_name,
|
|
message: event.message,
|
|
event_type: event.event_type,
|
|
cancel_reason: event.cancel_reason,
|
|
}
|
|
}).collect();
|
|
|
|
Ok(Json(BuildDetailResponse {
|
|
build_request_id: protobuf_response.build_request_id,
|
|
status_code: protobuf_response.status_code,
|
|
status_name: protobuf_response.status_name,
|
|
requested_partitions: protobuf_response.requested_partitions,
|
|
total_jobs: protobuf_response.total_jobs,
|
|
completed_jobs: protobuf_response.completed_jobs,
|
|
failed_jobs: protobuf_response.failed_jobs,
|
|
cancelled_jobs: protobuf_response.cancelled_jobs,
|
|
requested_at: protobuf_response.requested_at,
|
|
started_at: protobuf_response.started_at,
|
|
completed_at: protobuf_response.completed_at,
|
|
duration_ms: protobuf_response.duration_ms,
|
|
cancelled: protobuf_response.cancelled,
|
|
cancel_reason: protobuf_response.cancel_reason,
|
|
timeline: timeline_events,
|
|
}))
|
|
}
|
|
Ok(None) => Err((
|
|
StatusCode::NOT_FOUND,
|
|
Json(ErrorResponse {
|
|
error: format!("Build '{}' not found", build_request_id),
|
|
}),
|
|
)),
|
|
Err(e) => {
|
|
error!("Failed to get build detail: {}", e);
|
|
Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to get build detail: {}", e),
|
|
}),
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Request for build cancel endpoint path
|
|
#[derive(Deserialize, JsonSchema)]
|
|
pub struct BuildCancelPathRequest {
|
|
pub build_request_id: String,
|
|
}
|
|
|
|
/// Cancel a build using repository
|
|
pub async fn cancel_build_repository(
|
|
State(service): State<ServiceState>,
|
|
Path(BuildCancelPathRequest { build_request_id }): Path<BuildCancelPathRequest>,
|
|
Json(request): Json<CancelBuildRepositoryRequest>,
|
|
) -> Result<Json<BuildCancelRepositoryResponse>, (StatusCode, Json<ErrorResponse>)> {
|
|
let repository = BuildsRepository::new(service.event_log.clone());
|
|
|
|
match repository.cancel(&build_request_id, request.reason.clone()).await {
|
|
Ok(()) => Ok(Json(BuildCancelRepositoryResponse {
|
|
cancelled: true,
|
|
build_request_id,
|
|
})),
|
|
Err(e) => {
|
|
error!("Failed to cancel build: {}", e);
|
|
Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
Json(ErrorResponse {
|
|
error: format!("Failed to cancel build: {}", e),
|
|
}),
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Deserialize, JsonSchema)]
|
|
pub struct CancelBuildRepositoryRequest {
|
|
pub reason: String,
|
|
} |