Status code/enum refactor WIP

This commit is contained in:
Stuart Axelbrooke 2025-07-20 14:24:08 -07:00
parent 956bb463ff
commit dcc71bd13b
29 changed files with 3767 additions and 107 deletions

2
.gitignore vendored
View file

@ -12,3 +12,5 @@ node_modules
Cargo.toml
Cargo.lock
databuild/databuild.rs
generated_number
target

View file

@ -34,24 +34,26 @@ genrule(
rust_library(
name = "databuild",
srcs = [
"event_log/mock.rs",
"event_log/mod.rs",
"event_log/postgres.rs",
"event_log/sqlite.rs",
"event_log/stdout.rs",
"event_log/writer.rs",
"event_log/mock.rs",
"format_consistency_test.rs",
"lib.rs",
"mermaid_utils.rs",
"orchestration/error.rs",
"orchestration/events.rs",
"orchestration/mod.rs",
"repositories/builds/mod.rs",
"repositories/jobs/mod.rs",
"repositories/mod.rs",
"repositories/partitions/mod.rs",
"repositories/jobs/mod.rs",
"repositories/tasks/mod.rs",
"repositories/builds/mod.rs",
"service/handlers.rs",
"service/mod.rs",
"status_utils.rs",
":generate_databuild_rust",
],
edition = "2021",
@ -77,6 +79,8 @@ rust_library(
)
# OpenAPI Spec Generator binary (no dashboard dependency)
# No need to run this manually - it will automatically generate source and it will be used in
# the related targets (e.g. //databuild/client:extract_openapi_spec)
rust_binary(
name = "openapi_spec_generator",
srcs = ["service/openapi_spec_generator.rs"],

View file

@ -218,6 +218,12 @@ async fn main() -> Result<()> {
.about("DataBuild unified CLI")
.subcommand_required(false)
.arg_required_else_help(false)
.arg(
Arg::new("partitions")
.help("Partition references to build (legacy direct build mode)")
.num_args(1..)
.value_name("PARTITIONS")
)
.subcommand(
ClapCommand::new("build")
.about("Build partitions using the DataBuild execution engine")
@ -357,7 +363,26 @@ async fn main() -> Result<()> {
handle_builds_command(sub_matches, &event_log_uri).await?;
}
_ => {
// Show help if no subcommand provided
// Check if direct partition arguments were provided (legacy mode)
if let Some(partitions) = matches.get_many::<String>("partitions") {
let partition_list: Vec<String> = partitions.cloned().collect();
if !partition_list.is_empty() {
// Create a synthetic build command with these partitions
let build_cmd = ClapCommand::new("build")
.arg(Arg::new("partitions").num_args(1..))
.arg(Arg::new("event-log").long("event-log"))
.arg(Arg::new("build-request-id").long("build-request-id"));
let build_matches = build_cmd.try_get_matches_from(
std::iter::once("build".to_string()).chain(partition_list.clone())
).map_err(|e| CliError::InvalidArguments(format!("Failed to parse legacy build arguments: {}", e)))?;
handle_build_command(&build_matches).await?;
return Ok(());
}
}
// Show help if no subcommand or arguments provided
let mut cmd = ClapCommand::new("databuild")
.version("1.0")
.about("DataBuild unified CLI");
@ -377,40 +402,51 @@ async fn handle_partitions_command(matches: &ArgMatches, event_log_uri: &str) ->
match matches.subcommand() {
Some(("list", sub_matches)) => {
let limit = sub_matches.get_one::<String>("limit").and_then(|s| s.parse().ok());
let limit = sub_matches.get_one::<String>("limit").and_then(|s| s.parse::<u32>().ok());
let format = sub_matches.get_one::<String>("format").map(|s| s.as_str()).unwrap_or("table");
let partitions = repository.list(limit).await
// Use new protobuf response format for consistency with service
let request = PartitionsListRequest {
limit,
offset: None, // TODO: Add offset support to CLI
status_filter: None, // TODO: Add status filtering to CLI
};
let response = repository.list_protobuf(request).await
.map_err(|e| CliError::Database(format!("Failed to list partitions: {}", e)))?;
match format {
"json" => {
let json = serde_json::to_string_pretty(&partitions)
let json = serde_json::to_string_pretty(&response)
.map_err(|e| CliError::Output(format!("Failed to serialize to JSON: {}", e)))?;
println!("{}", json);
}
_ => {
if partitions.is_empty() {
if response.partitions.is_empty() {
println!("No partitions found");
return Ok(());
}
println!("Partitions ({} total):", partitions.len());
println!("Partitions ({} total):", response.total_count);
println!();
println!("{:<30} {:<15} {:<12} {:<12} {:<20}", "Partition", "Status", "Builds", "Invalidated", "Last Updated");
println!("{}", "-".repeat(90));
for partition in partitions {
let status_str = format!("{:?}", partition.current_status);
for partition in response.partitions {
let last_updated = format_timestamp(partition.last_updated);
println!("{:<30} {:<15} {:<12} {:<12} {:<20}",
partition.partition_ref,
status_str,
partition.status_name, // Use human-readable status name
partition.builds_count,
partition.invalidation_count,
last_updated
);
}
if response.has_more {
println!("\nNote: More results available. Use --limit to control output.");
}
}
}
}

View file

@ -35,26 +35,47 @@ genrule(
"typescript_generated/src/models/ActivityResponse.ts",
"typescript_generated/src/models/AnalyzeRequest.ts",
"typescript_generated/src/models/AnalyzeResponse.ts",
"typescript_generated/src/models/BuildCancelPathRequest.ts",
"typescript_generated/src/models/BuildCancelRepositoryResponse.ts",
"typescript_generated/src/models/BuildDetailRequest.ts",
"typescript_generated/src/models/BuildDetailResponse.ts",
"typescript_generated/src/models/BuildEventSummary.ts",
"typescript_generated/src/models/BuildRepositorySummary.ts",
"typescript_generated/src/models/BuildRequest.ts",
"typescript_generated/src/models/BuildRequestResponse.ts",
"typescript_generated/src/models/BuildStatusRequest.ts",
"typescript_generated/src/models/BuildStatusResponse.ts",
"typescript_generated/src/models/BuildSummary.ts",
"typescript_generated/src/models/BuildsListResponse.ts",
"typescript_generated/src/models/CancelBuildRequest.ts",
"typescript_generated/src/models/BuildTimelineEvent.ts",
"typescript_generated/src/models/BuildsRepositoryListResponse.ts",
"typescript_generated/src/models/CancelBuildRepositoryRequest.ts",
"typescript_generated/src/models/CancelTaskRequest.ts",
"typescript_generated/src/models/InvalidatePartitionRequest.ts",
"typescript_generated/src/models/JobDailyStats.ts",
"typescript_generated/src/models/JobDetailRequest.ts",
"typescript_generated/src/models/JobDetailResponse.ts",
"typescript_generated/src/models/JobMetricsRequest.ts",
"typescript_generated/src/models/JobMetricsResponse.ts",
"typescript_generated/src/models/JobRepositorySummary.ts",
"typescript_generated/src/models/JobRunDetail.ts",
"typescript_generated/src/models/JobRunSummary.ts",
"typescript_generated/src/models/JobSummary.ts",
"typescript_generated/src/models/JobsListResponse.ts",
"typescript_generated/src/models/JobsRepositoryListResponse.ts",
"typescript_generated/src/models/PartitionDetailRequest.ts",
"typescript_generated/src/models/PartitionDetailResponse.ts",
"typescript_generated/src/models/PartitionEventsRequest.ts",
"typescript_generated/src/models/PartitionEventsResponse.ts",
"typescript_generated/src/models/PartitionInvalidatePathRequest.ts",
"typescript_generated/src/models/PartitionInvalidateResponse.ts",
"typescript_generated/src/models/PartitionStatusRequest.ts",
"typescript_generated/src/models/PartitionStatusResponse.ts",
"typescript_generated/src/models/PartitionSummary.ts",
"typescript_generated/src/models/PartitionTimelineEvent.ts",
"typescript_generated/src/models/PartitionsListResponse.ts",
"typescript_generated/src/models/TaskCancelPathRequest.ts",
"typescript_generated/src/models/TaskCancelResponse.ts",
"typescript_generated/src/models/TaskDetailRequest.ts",
"typescript_generated/src/models/TaskDetailResponse.ts",
"typescript_generated/src/models/TaskSummary.ts",
"typescript_generated/src/models/TaskTimelineEvent.ts",
"typescript_generated/src/models/TasksListResponse.ts",
"typescript_generated/src/runtime.ts",
"typescript_generated/src/index.ts",
],
@ -82,26 +103,47 @@ genrule(
cp $$TEMP_DIR/src/models/ActivityResponse.ts $(location typescript_generated/src/models/ActivityResponse.ts)
cp $$TEMP_DIR/src/models/AnalyzeRequest.ts $(location typescript_generated/src/models/AnalyzeRequest.ts)
cp $$TEMP_DIR/src/models/AnalyzeResponse.ts $(location typescript_generated/src/models/AnalyzeResponse.ts)
cp $$TEMP_DIR/src/models/BuildCancelPathRequest.ts $(location typescript_generated/src/models/BuildCancelPathRequest.ts)
cp $$TEMP_DIR/src/models/BuildCancelRepositoryResponse.ts $(location typescript_generated/src/models/BuildCancelRepositoryResponse.ts)
cp $$TEMP_DIR/src/models/BuildDetailRequest.ts $(location typescript_generated/src/models/BuildDetailRequest.ts)
cp $$TEMP_DIR/src/models/BuildDetailResponse.ts $(location typescript_generated/src/models/BuildDetailResponse.ts)
cp $$TEMP_DIR/src/models/BuildEventSummary.ts $(location typescript_generated/src/models/BuildEventSummary.ts)
cp $$TEMP_DIR/src/models/BuildRepositorySummary.ts $(location typescript_generated/src/models/BuildRepositorySummary.ts)
cp $$TEMP_DIR/src/models/BuildRequest.ts $(location typescript_generated/src/models/BuildRequest.ts)
cp $$TEMP_DIR/src/models/BuildRequestResponse.ts $(location typescript_generated/src/models/BuildRequestResponse.ts)
cp $$TEMP_DIR/src/models/BuildStatusRequest.ts $(location typescript_generated/src/models/BuildStatusRequest.ts)
cp $$TEMP_DIR/src/models/BuildStatusResponse.ts $(location typescript_generated/src/models/BuildStatusResponse.ts)
cp $$TEMP_DIR/src/models/BuildSummary.ts $(location typescript_generated/src/models/BuildSummary.ts)
cp $$TEMP_DIR/src/models/BuildsListResponse.ts $(location typescript_generated/src/models/BuildsListResponse.ts)
cp $$TEMP_DIR/src/models/CancelBuildRequest.ts $(location typescript_generated/src/models/CancelBuildRequest.ts)
cp $$TEMP_DIR/src/models/BuildTimelineEvent.ts $(location typescript_generated/src/models/BuildTimelineEvent.ts)
cp $$TEMP_DIR/src/models/BuildsRepositoryListResponse.ts $(location typescript_generated/src/models/BuildsRepositoryListResponse.ts)
cp $$TEMP_DIR/src/models/CancelBuildRepositoryRequest.ts $(location typescript_generated/src/models/CancelBuildRepositoryRequest.ts)
cp $$TEMP_DIR/src/models/CancelTaskRequest.ts $(location typescript_generated/src/models/CancelTaskRequest.ts)
cp $$TEMP_DIR/src/models/InvalidatePartitionRequest.ts $(location typescript_generated/src/models/InvalidatePartitionRequest.ts)
cp $$TEMP_DIR/src/models/JobDailyStats.ts $(location typescript_generated/src/models/JobDailyStats.ts)
cp $$TEMP_DIR/src/models/JobDetailRequest.ts $(location typescript_generated/src/models/JobDetailRequest.ts)
cp $$TEMP_DIR/src/models/JobDetailResponse.ts $(location typescript_generated/src/models/JobDetailResponse.ts)
cp $$TEMP_DIR/src/models/JobMetricsRequest.ts $(location typescript_generated/src/models/JobMetricsRequest.ts)
cp $$TEMP_DIR/src/models/JobMetricsResponse.ts $(location typescript_generated/src/models/JobMetricsResponse.ts)
cp $$TEMP_DIR/src/models/JobRepositorySummary.ts $(location typescript_generated/src/models/JobRepositorySummary.ts)
cp $$TEMP_DIR/src/models/JobRunDetail.ts $(location typescript_generated/src/models/JobRunDetail.ts)
cp $$TEMP_DIR/src/models/JobRunSummary.ts $(location typescript_generated/src/models/JobRunSummary.ts)
cp $$TEMP_DIR/src/models/JobSummary.ts $(location typescript_generated/src/models/JobSummary.ts)
cp $$TEMP_DIR/src/models/JobsListResponse.ts $(location typescript_generated/src/models/JobsListResponse.ts)
cp $$TEMP_DIR/src/models/JobsRepositoryListResponse.ts $(location typescript_generated/src/models/JobsRepositoryListResponse.ts)
cp $$TEMP_DIR/src/models/PartitionDetailRequest.ts $(location typescript_generated/src/models/PartitionDetailRequest.ts)
cp $$TEMP_DIR/src/models/PartitionDetailResponse.ts $(location typescript_generated/src/models/PartitionDetailResponse.ts)
cp $$TEMP_DIR/src/models/PartitionEventsRequest.ts $(location typescript_generated/src/models/PartitionEventsRequest.ts)
cp $$TEMP_DIR/src/models/PartitionEventsResponse.ts $(location typescript_generated/src/models/PartitionEventsResponse.ts)
cp $$TEMP_DIR/src/models/PartitionInvalidatePathRequest.ts $(location typescript_generated/src/models/PartitionInvalidatePathRequest.ts)
cp $$TEMP_DIR/src/models/PartitionInvalidateResponse.ts $(location typescript_generated/src/models/PartitionInvalidateResponse.ts)
cp $$TEMP_DIR/src/models/PartitionStatusRequest.ts $(location typescript_generated/src/models/PartitionStatusRequest.ts)
cp $$TEMP_DIR/src/models/PartitionStatusResponse.ts $(location typescript_generated/src/models/PartitionStatusResponse.ts)
cp $$TEMP_DIR/src/models/PartitionSummary.ts $(location typescript_generated/src/models/PartitionSummary.ts)
cp $$TEMP_DIR/src/models/PartitionTimelineEvent.ts $(location typescript_generated/src/models/PartitionTimelineEvent.ts)
cp $$TEMP_DIR/src/models/PartitionsListResponse.ts $(location typescript_generated/src/models/PartitionsListResponse.ts)
cp $$TEMP_DIR/src/models/TaskCancelPathRequest.ts $(location typescript_generated/src/models/TaskCancelPathRequest.ts)
cp $$TEMP_DIR/src/models/TaskCancelResponse.ts $(location typescript_generated/src/models/TaskCancelResponse.ts)
cp $$TEMP_DIR/src/models/TaskDetailRequest.ts $(location typescript_generated/src/models/TaskDetailRequest.ts)
cp $$TEMP_DIR/src/models/TaskDetailResponse.ts $(location typescript_generated/src/models/TaskDetailResponse.ts)
cp $$TEMP_DIR/src/models/TaskSummary.ts $(location typescript_generated/src/models/TaskSummary.ts)
cp $$TEMP_DIR/src/models/TaskTimelineEvent.ts $(location typescript_generated/src/models/TaskTimelineEvent.ts)
cp $$TEMP_DIR/src/models/TasksListResponse.ts $(location typescript_generated/src/models/TasksListResponse.ts)
cp $$TEMP_DIR/src/runtime.ts $(location typescript_generated/src/runtime.ts)
cp $$TEMP_DIR/src/index.ts $(location typescript_generated/src/index.ts)
""",

View file

@ -306,8 +306,8 @@ export const BuildStatus = {
if (buildResponse.requested_partitions) {
for (const partition_ref of buildResponse.requested_partitions) {
try {
const partition_status = await apiClient.apiV1PartitionsRefStatusGet({
ref: partition_ref
const partition_status = await apiClient.apiV1PartitionsPartitionRefStatusGet({
partition_ref: partition_ref
});
console.log(`Loaded status for partition ${partition_ref}:`, partition_status);
this.partitionStatuses.set(partition_ref, partition_status);
@ -719,14 +719,14 @@ export const PartitionStatus = {
const apiClient = new DefaultApi(new Configuration({ basePath: '' }));
// Load partition status
const statusResponse = await apiClient.apiV1PartitionsRefStatusGet({
ref: this.partitionRef
const statusResponse = await apiClient.apiV1PartitionsPartitionRefStatusGet({
partition_ref: this.partitionRef
});
this.data = statusResponse;
// Load partition events for build history
const eventsResponse = await apiClient.apiV1PartitionsRefEventsGet({
ref: this.partitionRef
const eventsResponse = await apiClient.apiV1PartitionsPartitionRefEventsGet({
partition_ref: this.partitionRef
});
this.events = eventsResponse;

View file

@ -1,5 +1,5 @@
// Import the generated TypeScript client
import { DefaultApi, Configuration, ActivityResponse, BuildSummary, PartitionSummary, JobsListResponse, JobMetricsResponse, JobSummary, JobRunSummary, JobDailyStats } from '../client/typescript_generated/src/index';
import { DefaultApi, Configuration, ActivityResponse, BuildSummary, PartitionSummary, JobsRepositoryListResponse, JobMetricsResponse, JobRepositorySummary, JobRunSummary, JobDailyStats } from '../client/typescript_generated/src/index';
// Configure the API client
const apiConfig = new Configuration({
@ -86,7 +86,7 @@ export class DashboardService {
}
}
async getJobs(searchTerm?: string): Promise<JobSummary[]> {
async getJobs(searchTerm?: string): Promise<JobRepositorySummary[]> {
try {
// Build query parameters manually since the generated client may not support query params correctly
const queryParams = new URLSearchParams();
@ -99,7 +99,7 @@ export class DashboardService {
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const data: JobsListResponse = await response.json();
const data: JobsRepositoryListResponse = await response.json();
return data.jobs;
} catch (error) {
console.error('Failed to fetch jobs:', error);

View file

@ -195,17 +195,19 @@ enum BuildRequestStatus {
// Build request lifecycle event
message BuildRequestEvent {
BuildRequestStatus status = 1;
repeated PartitionRef requested_partitions = 2;
string message = 3; // Optional status message
BuildRequestStatus status_code = 1; // Enum for programmatic use
string status_name = 2; // Human-readable string
repeated PartitionRef requested_partitions = 3;
string message = 4; // Optional status message
}
// Partition state change event
message PartitionEvent {
PartitionRef partition_ref = 1;
PartitionStatus status = 2;
string message = 3; // Optional status message
string job_run_id = 4; // UUID of job run producing this partition (if applicable)
PartitionStatus status_code = 2; // Enum for programmatic use
string status_name = 3; // Human-readable string
string message = 4; // Optional status message
string job_run_id = 5; // UUID of job run producing this partition (if applicable)
}
// Job execution event
@ -213,10 +215,11 @@ message JobEvent {
string job_run_id = 1; // UUID for this job run
JobLabel job_label = 2; // Job being executed
repeated PartitionRef target_partitions = 3; // Partitions this job run produces
JobStatus status = 4;
string message = 5; // Optional status message
JobConfig config = 6; // Job configuration used (for SCHEDULED events)
repeated PartitionManifest manifests = 7; // Results (for COMPLETED events)
JobStatus status_code = 4; // Enum for programmatic use
string status_name = 5; // Human-readable string
string message = 6; // Optional status message
JobConfig config = 7; // Job configuration used (for SCHEDULED events)
repeated PartitionManifest manifests = 8; // Results (for COMPLETED events)
}
// Delegation event (when build request delegates to existing build)
@ -269,6 +272,123 @@ message BuildEvent {
}
}
///////////////////////////////////////////////////////////////////////////////////////////////
// List Operations (Unified CLI/Service Responses)
///////////////////////////////////////////////////////////////////////////////////////////////
//
// Partitions List
//
// Request parameters for listing partitions; shared by the CLI and service
// so both surfaces return the same shape.
message PartitionsListRequest {
  optional uint32 limit = 1;          // Maximum number of partitions to return
  optional uint32 offset = 2;         // Pagination offset (entries to skip)
  optional string status_filter = 3;  // If set, restrict results by status (server-side semantics)
}

// One page of partition summaries plus pagination metadata.
message PartitionsListResponse {
  repeated PartitionSummary partitions = 1;
  uint32 total_count = 2;  // Total matching partitions, not just this page
  bool has_more = 3;       // True when more results exist beyond this page
}

// Per-partition rollup used in list views.
message PartitionSummary {
  string partition_ref = 1;
  PartitionStatus status_code = 2;  // Enum for programmatic use
  string status_name = 3;           // Human-readable string
  int64 last_updated = 4;           // Last-update timestamp; NOTE(review): unit (sec/ms) not specified here — confirm
  uint32 builds_count = 5;          // Number of builds that produced this partition
  uint32 invalidation_count = 6;    // Times this partition has been invalidated
  optional string last_successful_build = 7;  // NOTE(review): presumably a build request id — confirm
}
//
// Jobs List
//
// Request parameters for listing jobs.
message JobsListRequest {
  optional uint32 limit = 1;   // Maximum number of jobs to return
  optional string search = 2;  // Optional search term to filter jobs
}

// List of per-job aggregate summaries.
message JobsListResponse {
  repeated JobSummary jobs = 1;
  uint32 total_count = 2;  // Total number of matching jobs
}

// Aggregate statistics for one job across all of its runs.
message JobSummary {
  string job_label = 1;
  uint32 total_runs = 2;
  uint32 successful_runs = 3;
  uint32 failed_runs = 4;
  uint32 cancelled_runs = 5;
  double average_partitions_per_run = 6;
  int64 last_run_timestamp = 7;        // Timestamp of most recent run; NOTE(review): unit not specified here — confirm
  JobStatus last_run_status_code = 8;  // Enum for programmatic use
  string last_run_status_name = 9;     // Human-readable string
  repeated string recent_builds = 10;  // NOTE(review): presumably recent build request ids — confirm
}
//
// Tasks List
//
// Request parameters for listing tasks (individual job runs).
message TasksListRequest {
  optional uint32 limit = 1;  // Maximum number of tasks to return
}

// List of task summaries.
message TasksListResponse {
  repeated TaskSummary tasks = 1;
  uint32 total_count = 2;  // Total number of matching tasks
}

// Summary of a single task: one job run within a build request.
message TaskSummary {
  string job_run_id = 1;        // UUID of this job run
  string job_label = 2;         // Job being executed
  string build_request_id = 3;  // Build request this run belongs to
  JobStatus status_code = 4;    // Enum for programmatic use
  string status_name = 5;       // Human-readable string
  repeated PartitionRef target_partitions = 6;  // Partitions this run produces
  int64 scheduled_at = 7;           // NOTE(review): timestamp unit (sec/ms) not specified here — confirm
  optional int64 started_at = 8;    // Absent until the task starts
  optional int64 completed_at = 9;  // Absent until the task finishes
  optional int64 duration_ms = 10;  // Run duration in milliseconds, when known
  bool cancelled = 11;
  string message = 12;  // Optional status message
}
//
// Builds List
//
// Request parameters for listing builds.
message BuildsListRequest {
  optional uint32 limit = 1;          // Maximum number of builds to return
  optional uint32 offset = 2;         // Pagination offset (entries to skip)
  optional string status_filter = 3;  // If set, restrict results by status (server-side semantics)
}

// One page of build summaries plus pagination metadata.
message BuildsListResponse {
  repeated BuildSummary builds = 1;
  uint32 total_count = 2;  // Total matching builds, not just this page
  bool has_more = 3;       // True when more results exist beyond this page
}

// Rollup of one build request and the progress of its jobs.
message BuildSummary {
  string build_request_id = 1;
  BuildRequestStatus status_code = 2;  // Enum for programmatic use
  string status_name = 3;              // Human-readable string
  repeated PartitionRef requested_partitions = 4;
  uint32 total_jobs = 5;
  uint32 completed_jobs = 6;
  uint32 failed_jobs = 7;
  uint32 cancelled_jobs = 8;
  int64 requested_at = 9;            // NOTE(review): timestamp unit (sec/ms) not specified here — confirm
  optional int64 started_at = 10;    // Absent until the build starts
  optional int64 completed_at = 11;  // Absent until the build finishes
  optional int64 duration_ms = 12;   // Total duration in milliseconds, when known
  bool cancelled = 13;
}
///////////////////////////////////////////////////////////////////////////////////////////////
// Services
///////////////////////////////////////////////////////////////////////////////////////////////

755
databuild/event_log/mock.rs Normal file
View file

@ -0,0 +1,755 @@
use crate::*;
use crate::event_log::{BuildEventLog, BuildEventLogError, Result, QueryResult, BuildRequestSummary, PartitionSummary, ActivitySummary};
use async_trait::async_trait;
use std::sync::{Arc, Mutex};
use rusqlite::{Connection, params};
/// MockBuildEventLog provides an in-memory SQLite database for testing
///
/// This implementation makes it easy to specify test data and verify behavior
/// while using the real code paths for event writing and repository queries.
///
/// Key features:
/// - Uses in-memory SQLite for parallel test execution
/// - Provides event constructors with sensible defaults
/// - Allows easy specification of test scenarios
/// - Uses the same SQL schema as production SQLite implementation
pub struct MockBuildEventLog {
    // Shared handle to the in-memory SQLite database. A std Mutex is fine here
    // because the lock is only held inside synchronous sections of each method.
    connection: Arc<Mutex<Connection>>,
}
impl MockBuildEventLog {
    /// Create a new MockBuildEventLog backed by a fresh in-memory SQLite database.
    ///
    /// Each instance owns its own database, so parallel tests do not interfere
    /// with each other.
    pub async fn new() -> Result<Self> {
        // `open_in_memory()` is the idiomatic rusqlite constructor for a
        // ":memory:" database (also drops the needless `mut` the previous
        // version had). Foreign-key constraints are intentionally left
        // disabled to keep test setup simple.
        let conn = Connection::open_in_memory()
            .map_err(|e| BuildEventLogError::ConnectionError(e.to_string()))?;
        let mock = Self {
            connection: Arc::new(Mutex::new(conn)),
        };
        // Create the same table layout the production SQLite backend uses.
        mock.initialize().await?;
        Ok(mock)
    }

    /// Create a new MockBuildEventLog pre-populated with the given events.
    ///
    /// Events go through the normal `append_event` path so the per-event-type
    /// side tables are populated exactly as in production.
    pub async fn with_events(events: Vec<BuildEvent>) -> Result<Self> {
        let mock = Self::new().await?;
        for event in events {
            mock.append_event(event).await?;
        }
        Ok(mock)
    }

    /// Number of events currently stored in the main `build_events` table.
    pub async fn event_count(&self) -> Result<usize> {
        let conn = self.connection.lock().unwrap();
        // `query_row` replaces the previous prepare + query_row pair.
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM build_events", [], |row| row.get(0))
            .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
        Ok(count as usize)
    }

    /// Return every stored event, ordered by timestamp ascending.
    pub async fn get_all_events(&self) -> Result<Vec<BuildEvent>> {
        let conn = self.connection.lock().unwrap();
        let mut stmt = conn
            .prepare("SELECT event_data FROM build_events ORDER BY timestamp ASC")
            .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
        let rows = stmt
            .query_map([], |row| row.get::<_, String>(0))
            .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
        let mut events = Vec::new();
        for row in rows {
            let event_data = row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
            let event: BuildEvent = serde_json::from_str(&event_data)
                .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;
            events.push(event);
        }
        Ok(events)
    }

    /// Remove all events from every table, leaving the schema intact.
    pub async fn clear(&self) -> Result<()> {
        let conn = self.connection.lock().unwrap();
        // Keep this list in sync with the tables created by `initialize`.
        const TABLES: [&str; 6] = [
            "build_events",
            "build_request_events",
            "partition_events",
            "job_events",
            "delegation_events",
            "job_graph_events",
        ];
        for table in TABLES {
            conn.execute(&format!("DELETE FROM {}", table), [])
                .map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
        }
        Ok(())
    }
}
#[async_trait]
impl BuildEventLog for MockBuildEventLog {
/// Append a single event to the mock log.
///
/// Mirrors the production SQLite writer: every event is stored as a JSON blob
/// in the main `build_events` table, and additionally indexed into a
/// per-event-type side table (where one exists) to support the join queries
/// used by `get_partition_events` / `get_job_run_events`.
async fn append_event(&self, event: BuildEvent) -> Result<()> {
    let conn = self.connection.lock().unwrap();
    // Serialize the entire event for storage; this JSON blob is the source of
    // truth that the get_* query methods deserialize back into BuildEvent.
    let event_data = serde_json::to_string(&event)
        .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;
    // Insert into main events table. The event_type column is a short tag
    // derived from the oneof variant, used for coarse filtering.
    conn.execute(
        "INSERT INTO build_events (event_id, timestamp, build_request_id, event_type, event_data) VALUES (?1, ?2, ?3, ?4, ?5)",
        params![
            event.event_id,
            event.timestamp,
            event.build_request_id,
            match &event.event_type {
                Some(crate::build_event::EventType::BuildRequestEvent(_)) => "build_request",
                Some(crate::build_event::EventType::PartitionEvent(_)) => "partition",
                Some(crate::build_event::EventType::JobEvent(_)) => "job",
                Some(crate::build_event::EventType::DelegationEvent(_)) => "delegation",
                Some(crate::build_event::EventType::JobGraphEvent(_)) => "job_graph",
                Some(crate::build_event::EventType::PartitionInvalidationEvent(_)) => "partition_invalidation",
                Some(crate::build_event::EventType::TaskCancelEvent(_)) => "task_cancel",
                Some(crate::build_event::EventType::BuildCancelEvent(_)) => "build_cancel",
                None => "unknown",
            },
            event_data
        ],
    ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
    // Insert into specific event type table for better querying.
    // NOTE(review): `status_code.to_string()` on a prost enum field stringifies
    // the numeric value, not the variant name — confirm readers expect that.
    match &event.event_type {
        Some(crate::build_event::EventType::BuildRequestEvent(br_event)) => {
            let partitions_json = serde_json::to_string(&br_event.requested_partitions)
                .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;
            conn.execute(
                "INSERT INTO build_request_events (event_id, status, requested_partitions, message) VALUES (?1, ?2, ?3, ?4)",
                params![
                    event.event_id,
                    br_event.status_code.to_string(),
                    partitions_json,
                    br_event.message
                ],
            ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
        }
        Some(crate::build_event::EventType::PartitionEvent(p_event)) => {
            conn.execute(
                "INSERT INTO partition_events (event_id, partition_ref, status, message, job_run_id) VALUES (?1, ?2, ?3, ?4, ?5)",
                params![
                    event.event_id,
                    // Missing partition_ref is stored as an empty string.
                    p_event.partition_ref.as_ref().map(|r| &r.str).unwrap_or(&String::new()),
                    p_event.status_code.to_string(),
                    p_event.message,
                    // Empty job_run_id is normalized to SQL NULL.
                    if p_event.job_run_id.is_empty() { None } else { Some(&p_event.job_run_id) }
                ],
            ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
        }
        Some(crate::build_event::EventType::JobEvent(j_event)) => {
            let partitions_json = serde_json::to_string(&j_event.target_partitions)
                .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;
            // config is optional; serialize to Option<String> so absence maps to NULL.
            let config_json = j_event.config.as_ref()
                .map(|c| serde_json::to_string(c))
                .transpose()
                .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;
            let manifests_json = serde_json::to_string(&j_event.manifests)
                .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;
            conn.execute(
                "INSERT INTO job_events (event_id, job_run_id, job_label, target_partitions, status, message, config_json, manifests_json) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
                params![
                    event.event_id,
                    j_event.job_run_id,
                    // Missing job_label is stored as an empty string.
                    j_event.job_label.as_ref().map(|l| &l.label).unwrap_or(&String::new()),
                    partitions_json,
                    j_event.status_code.to_string(),
                    j_event.message,
                    config_json,
                    manifests_json
                ],
            ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
        }
        Some(crate::build_event::EventType::DelegationEvent(d_event)) => {
            conn.execute(
                "INSERT INTO delegation_events (event_id, partition_ref, delegated_to_build_request_id, message) VALUES (?1, ?2, ?3, ?4)",
                params![
                    event.event_id,
                    d_event.partition_ref.as_ref().map(|r| &r.str).unwrap_or(&String::new()),
                    d_event.delegated_to_build_request_id,
                    d_event.message
                ],
            ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
        }
        Some(crate::build_event::EventType::JobGraphEvent(jg_event)) => {
            // NOTE(review): unlike the other arms, a serialization failure here
            // is reported as DatabaseError rather than SerializationError —
            // confirm whether this inconsistency is intentional.
            let job_graph_json = match serde_json::to_string(&jg_event.job_graph) {
                Ok(json) => json,
                Err(e) => {
                    return Err(BuildEventLogError::DatabaseError(format!("Failed to serialize job graph: {}", e)));
                }
            };
            conn.execute(
                "INSERT INTO job_graph_events (event_id, job_graph_json, message) VALUES (?1, ?2, ?3)",
                params![
                    event.event_id,
                    job_graph_json,
                    jg_event.message
                ],
            ).map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
        }
        // The remaining event types have no dedicated side table yet; they are
        // only recorded in the main build_events table above.
        Some(crate::build_event::EventType::PartitionInvalidationEvent(_pi_event)) => {
            // For now, just store in main events table
        }
        Some(crate::build_event::EventType::TaskCancelEvent(_tc_event)) => {
            // For now, just store in main events table
        }
        Some(crate::build_event::EventType::BuildCancelEvent(_bc_event)) => {
            // For now, just store in main events table
        }
        None => {}
    }
    Ok(())
}
/// Fetch all events for a build request, oldest-first, optionally restricted
/// to events with a timestamp strictly greater than `since`.
async fn get_build_request_events(
    &self,
    build_request_id: &str,
    since: Option<i64>
) -> Result<Vec<BuildEvent>> {
    let conn = self.connection.lock().unwrap();

    // Pick the query shape up front; parameters are bound as strings, matching
    // the storage convention used elsewhere in this mock.
    let mut bind: Vec<String> = vec![build_request_id.to_string()];
    let sql = if let Some(timestamp) = since {
        bind.push(timestamp.to_string());
        "SELECT event_data FROM build_events WHERE build_request_id = ?1 AND timestamp > ?2 ORDER BY timestamp ASC"
    } else {
        "SELECT event_data FROM build_events WHERE build_request_id = ?1 ORDER BY timestamp ASC"
    };

    let mut stmt = conn.prepare(sql)
        .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
    let rows = stmt
        .query_map(rusqlite::params_from_iter(bind.iter()), |row| row.get::<_, String>(0))
        .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

    // Deserialize each stored JSON blob back into a BuildEvent.
    let mut events = Vec::new();
    for raw in rows {
        let json = raw.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
        let event: BuildEvent = serde_json::from_str(&json)
            .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?;
        events.push(event);
    }
    Ok(events)
}
/// Fetch events that reference a given partition, oldest-first, optionally
/// restricted to events with a timestamp strictly greater than `since`.
async fn get_partition_events(
    &self,
    partition_ref: &str,
    since: Option<i64>
) -> Result<Vec<BuildEvent>> {
    let conn = self.connection.lock().unwrap();

    // Join against the partition_events side table to find the matching events.
    let mut bind: Vec<String> = vec![partition_ref.to_string()];
    let sql = match since {
        Some(ts) => {
            bind.push(ts.to_string());
            "SELECT be.event_data FROM build_events be JOIN partition_events pe ON be.event_id = pe.event_id WHERE pe.partition_ref = ?1 AND be.timestamp > ?2 ORDER BY be.timestamp ASC"
        }
        None => {
            "SELECT be.event_data FROM build_events be JOIN partition_events pe ON be.event_id = pe.event_id WHERE pe.partition_ref = ?1 ORDER BY be.timestamp ASC"
        }
    };

    let mut stmt = conn.prepare(sql)
        .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
    let rows = stmt
        .query_map(rusqlite::params_from_iter(bind.iter()), |row| row.get::<_, String>(0))
        .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

    // Deserialize each stored JSON blob back into a BuildEvent.
    let mut events = Vec::new();
    for raw in rows {
        let json = raw.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
        events.push(
            serde_json::from_str::<BuildEvent>(&json)
                .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?,
        );
    }
    Ok(events)
}
/// Fetch all events belonging to a specific job run, oldest-first.
async fn get_job_run_events(
    &self,
    job_run_id: &str
) -> Result<Vec<BuildEvent>> {
    let conn = self.connection.lock().unwrap();

    // Join against the job_events side table to find events for this run.
    let sql = "SELECT be.event_data FROM build_events be JOIN job_events je ON be.event_id = je.event_id WHERE je.job_run_id = ?1 ORDER BY be.timestamp ASC";
    let mut stmt = conn.prepare(sql)
        .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
    let rows = stmt
        .query_map([job_run_id], |row| row.get::<_, String>(0))
        .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;

    // Deserialize each stored JSON blob back into a BuildEvent.
    let mut events = Vec::new();
    for raw in rows {
        let json = raw.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
        events.push(
            serde_json::from_str::<BuildEvent>(&json)
                .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))?,
        );
    }
    Ok(events)
}
/// Fetch all events whose timestamp lies in `[start_time, end_time]`
/// (both bounds inclusive), oldest first.
async fn get_events_in_range(
    &self,
    start_time: i64,
    end_time: i64
) -> Result<Vec<BuildEvent>> {
    let conn = self.connection.lock().unwrap();
    let mut stmt = conn.prepare(
        "SELECT event_data FROM build_events WHERE timestamp >= ?1 AND timestamp <= ?2 ORDER BY timestamp ASC"
    ).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
    // Collect the raw payloads, short-circuiting on the first row error.
    let payloads: Vec<String> = stmt
        .query_map([start_time, end_time], |row| row.get(0))
        .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?
        .collect::<std::result::Result<_, _>>()
        .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
    payloads
        .iter()
        .map(|payload| {
            serde_json::from_str::<BuildEvent>(payload)
                .map_err(|e| BuildEventLogError::SerializationError(e.to_string()))
        })
        .collect()
}
async fn execute_query(&self, query: &str) -> Result<QueryResult> {
let conn = self.connection.lock().unwrap();
let mut stmt = conn.prepare(query)
.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
let column_names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
let rows = stmt.query_map([], |row| {
let mut values = Vec::new();
for i in 0..column_names.len() {
let value: String = row.get::<_, Option<String>>(i)?.unwrap_or_default();
values.push(value);
}
Ok(values)
}).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
let mut result_rows = Vec::new();
for row in rows {
let values = row.map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
result_rows.push(values);
}
Ok(QueryResult {
columns: column_names,
rows: result_rows,
})
}
/// Return the most recent status recorded for a partition, together with the
/// timestamp of that event, or `None` if the partition has no events.
async fn get_latest_partition_status(
    &self,
    partition_ref: &str
) -> Result<Option<(PartitionStatus, i64)>> {
    let conn = self.connection.lock().unwrap();
    let mut stmt = conn.prepare(
        "SELECT pe.status, be.timestamp FROM build_events be JOIN partition_events pe ON be.event_id = pe.event_id WHERE pe.partition_ref = ?1 ORDER BY be.timestamp DESC LIMIT 1"
    ).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
    let lookup = stmt.query_row([partition_ref], |row| {
        let raw: String = row.get(0)?;
        let timestamp: i64 = row.get(1)?;
        // Status is persisted as text; non-numeric values fall back to 0,
        // which maps to PartitionUnknown below.
        Ok((raw.parse::<i32>().unwrap_or(0), timestamp))
    });
    match lookup {
        // No row simply means the partition was never recorded.
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        Err(e) => Err(BuildEventLogError::QueryError(e.to_string())),
        Ok((code, timestamp)) => {
            let partition_status = match code {
                1 => PartitionStatus::PartitionRequested,
                2 => PartitionStatus::PartitionAnalyzed,
                3 => PartitionStatus::PartitionBuilding,
                4 => PartitionStatus::PartitionAvailable,
                5 => PartitionStatus::PartitionFailed,
                6 => PartitionStatus::PartitionDelegated,
                _ => PartitionStatus::PartitionUnknown,
            };
            Ok(Some((partition_status, timestamp)))
        }
    }
}
/// List distinct build request ids that currently have this partition in an
/// in-flight state (status codes '1', '2', '3'), most recent first.
async fn get_active_builds_for_partition(
    &self,
    partition_ref: &str
) -> Result<Vec<String>> {
    let conn = self.connection.lock().unwrap();
    let mut stmt = conn.prepare(
        "SELECT DISTINCT be.build_request_id FROM build_events be JOIN partition_events pe ON be.event_id = pe.event_id WHERE pe.partition_ref = ?1 AND pe.status IN ('1', '2', '3') ORDER BY be.timestamp DESC"
    ).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
    // Collect directly, short-circuiting on the first row error.
    stmt.query_map([partition_ref], |row| row.get(0))
        .map_err(|e| BuildEventLogError::QueryError(e.to_string()))?
        .collect::<std::result::Result<Vec<String>, _>>()
        .map_err(|e| BuildEventLogError::QueryError(e.to_string()))
}
/// Create the event-log schema if it does not already exist.
///
/// All DDL is idempotent (`IF NOT EXISTS`): the main `build_events` table,
/// one side table per event kind, then the indexes backing common lookups.
async fn initialize(&self) -> Result<()> {
    let conn = self.connection.lock().unwrap();
    let ddl: [&str; 10] = [
        // Main events table: one row per event, payload stored as JSON text.
        "CREATE TABLE IF NOT EXISTS build_events (
            event_id TEXT PRIMARY KEY,
            timestamp INTEGER NOT NULL,
            build_request_id TEXT NOT NULL,
            event_type TEXT NOT NULL,
            event_data TEXT NOT NULL
        )",
        // Per-event-kind side tables keyed by event_id.
        "CREATE TABLE IF NOT EXISTS build_request_events (
            event_id TEXT PRIMARY KEY,
            status TEXT NOT NULL,
            requested_partitions TEXT NOT NULL,
            message TEXT NOT NULL
        )",
        "CREATE TABLE IF NOT EXISTS partition_events (
            event_id TEXT PRIMARY KEY,
            partition_ref TEXT NOT NULL,
            status TEXT NOT NULL,
            message TEXT NOT NULL,
            job_run_id TEXT
        )",
        "CREATE TABLE IF NOT EXISTS job_events (
            event_id TEXT PRIMARY KEY,
            job_run_id TEXT NOT NULL,
            job_label TEXT NOT NULL,
            target_partitions TEXT NOT NULL,
            status TEXT NOT NULL,
            message TEXT NOT NULL,
            config_json TEXT,
            manifests_json TEXT NOT NULL
        )",
        "CREATE TABLE IF NOT EXISTS delegation_events (
            event_id TEXT PRIMARY KEY,
            partition_ref TEXT NOT NULL,
            delegated_to_build_request_id TEXT NOT NULL,
            message TEXT NOT NULL
        )",
        "CREATE TABLE IF NOT EXISTS job_graph_events (
            event_id TEXT PRIMARY KEY,
            job_graph_json TEXT NOT NULL,
            message TEXT NOT NULL
        )",
        // Indexes for the common query paths.
        "CREATE INDEX IF NOT EXISTS idx_build_events_build_request_id ON build_events (build_request_id)",
        "CREATE INDEX IF NOT EXISTS idx_build_events_timestamp ON build_events (timestamp)",
        "CREATE INDEX IF NOT EXISTS idx_partition_events_partition_ref ON partition_events (partition_ref)",
        "CREATE INDEX IF NOT EXISTS idx_job_events_job_run_id ON job_events (job_run_id)",
    ];
    for statement in ddl {
        conn.execute(statement, [])
            .map_err(|e| BuildEventLogError::DatabaseError(e.to_string()))?;
    }
    Ok(())
}
/// Mock stub: pagination and status filtering are intentionally not
/// implemented; callers always receive an empty page and a zero total.
/// Parameters are underscore-prefixed to silence unused-variable warnings.
async fn list_build_requests(
    &self,
    _limit: u32,
    _offset: u32,
    _status_filter: Option<BuildRequestStatus>,
) -> Result<(Vec<BuildRequestSummary>, u32)> {
    // Real implementation would query the database.
    Ok((vec![], 0))
}
/// Mock stub: pagination and status filtering are intentionally not
/// implemented; callers always receive an empty page and a zero total.
/// Parameters are underscore-prefixed to silence unused-variable warnings.
async fn list_recent_partitions(
    &self,
    _limit: u32,
    _offset: u32,
    _status_filter: Option<PartitionStatus>,
) -> Result<(Vec<PartitionSummary>, u32)> {
    // Real implementation would query the database.
    Ok((vec![], 0))
}
/// Mock stub: report a quiescent system with no builds or partitions.
async fn get_activity_summary(&self) -> Result<ActivitySummary> {
    let summary = ActivitySummary {
        active_builds_count: 0,
        recent_builds: Vec::new(),
        recent_partitions: Vec::new(),
        total_partitions_count: 0,
    };
    Ok(summary)
}
/// Find the most recent build request that recorded this partition with
/// status '4' (available), if any.
async fn get_build_request_for_available_partition(
    &self,
    partition_ref: &str
) -> Result<Option<String>> {
    let conn = self.connection.lock().unwrap();
    let mut stmt = conn.prepare(
        "SELECT be.build_request_id FROM build_events be JOIN partition_events pe ON be.event_id = pe.event_id WHERE pe.partition_ref = ?1 AND pe.status = '4' ORDER BY be.timestamp DESC LIMIT 1"
    ).map_err(|e| BuildEventLogError::QueryError(e.to_string()))?;
    let lookup = stmt.query_row([partition_ref], |row| row.get::<_, String>(0));
    match lookup {
        Ok(build_request_id) => Ok(Some(build_request_id)),
        // No matching row just means the partition was never available.
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        Err(e) => Err(BuildEventLogError::QueryError(e.to_string())),
    }
}
}
/// Utility functions for creating test events with sensible defaults
pub mod test_events {
    use super::*;
    use crate::event_log::{generate_event_id, current_timestamp_nanos};
    use uuid::Uuid;

    /// Shared scaffolding: wrap an event payload in a `BuildEvent` envelope
    /// with a fresh event id and timestamp, defaulting the build request id
    /// to a random UUID when the caller does not supply one.
    fn envelope(build_request_id: Option<String>, payload: build_event::EventType) -> BuildEvent {
        BuildEvent {
            event_id: generate_event_id(),
            timestamp: current_timestamp_nanos(),
            build_request_id: build_request_id.unwrap_or_else(|| Uuid::new_v4().to_string()),
            event_type: Some(payload),
        }
    }

    /// Create a build request received event with random defaults
    pub fn build_request_received(
        build_request_id: Option<String>,
        partitions: Vec<PartitionRef>,
    ) -> BuildEvent {
        envelope(
            build_request_id,
            build_event::EventType::BuildRequestEvent(BuildRequestEvent {
                status_code: BuildRequestStatus::BuildRequestReceived as i32,
                status_name: BuildRequestStatus::BuildRequestReceived.to_display_string(),
                requested_partitions: partitions,
                message: "Build request received".to_string(),
            }),
        )
    }

    /// Create a build request event with specific status
    pub fn build_request_event(
        build_request_id: Option<String>,
        partitions: Vec<PartitionRef>,
        status: BuildRequestStatus,
    ) -> BuildEvent {
        envelope(
            build_request_id,
            build_event::EventType::BuildRequestEvent(BuildRequestEvent {
                status_code: status as i32,
                status_name: status.to_display_string(),
                requested_partitions: partitions,
                message: format!("Build request status: {:?}", status),
            }),
        )
    }

    /// Create a partition status event with random defaults
    pub fn partition_status(
        build_request_id: Option<String>,
        partition_ref: PartitionRef,
        status: PartitionStatus,
        job_run_id: Option<String>,
    ) -> BuildEvent {
        envelope(
            build_request_id,
            build_event::EventType::PartitionEvent(PartitionEvent {
                partition_ref: Some(partition_ref),
                status_code: status as i32,
                status_name: status.to_display_string(),
                message: format!("Partition status: {:?}", status),
                job_run_id: job_run_id.unwrap_or_default(),
            }),
        )
    }

    /// Create a job event with random defaults
    pub fn job_event(
        build_request_id: Option<String>,
        job_run_id: Option<String>,
        job_label: JobLabel,
        target_partitions: Vec<PartitionRef>,
        status: JobStatus,
    ) -> BuildEvent {
        envelope(
            build_request_id,
            build_event::EventType::JobEvent(JobEvent {
                job_run_id: job_run_id.unwrap_or_else(|| Uuid::new_v4().to_string()),
                job_label: Some(job_label),
                target_partitions,
                status_code: status as i32,
                status_name: status.to_display_string(),
                message: format!("Job status: {:?}", status),
                config: None,
                manifests: vec![],
            }),
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use super::test_events::*;

    #[tokio::test]
    async fn test_mock_build_event_log_basic() {
        let log = MockBuildEventLog::new().await.unwrap();
        // A freshly created mock starts empty.
        assert_eq!(log.event_count().await.unwrap(), 0);

        // Append one build-request event and confirm it is counted.
        let build_id = "test-build-123".to_string();
        let partition = PartitionRef { str: "test/partition".to_string() };
        let event = build_request_received(Some(build_id.clone()), vec![partition]);
        log.append_event(event).await.unwrap();
        assert_eq!(log.event_count().await.unwrap(), 1);

        // The event is retrievable by its build request id.
        let fetched = log.get_build_request_events(&build_id, None).await.unwrap();
        assert_eq!(fetched.len(), 1);

        // Clearing returns the log to its initial empty state.
        log.clear().await.unwrap();
        assert_eq!(log.event_count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_mock_build_event_log_with_predefined_events() {
        let build_id = "test-build-456".to_string();
        let partition = PartitionRef { str: "data/users".to_string() };

        // Seed the mock with a request plus two partition transitions.
        let seed = vec![
            build_request_received(Some(build_id.clone()), vec![partition.clone()]),
            partition_status(Some(build_id.clone()), partition.clone(), PartitionStatus::PartitionBuilding, None),
            partition_status(Some(build_id.clone()), partition.clone(), PartitionStatus::PartitionAvailable, None),
        ];
        let log = MockBuildEventLog::with_events(seed).await.unwrap();
        assert_eq!(log.event_count().await.unwrap(), 3);

        // Only the two partition events are attributed to the partition.
        let partition_events = log.get_partition_events(&partition.str, None).await.unwrap();
        assert_eq!(partition_events.len(), 2);

        // The latest status reflects the final transition.
        let latest = log.get_latest_partition_status(&partition.str).await.unwrap();
        let (status, _timestamp) = latest.expect("partition should have a status");
        assert_eq!(status, PartitionStatus::PartitionAvailable);
    }

    #[tokio::test]
    async fn test_event_constructors() {
        let partition = PartitionRef { str: "test/data".to_string() };
        let job_label = JobLabel { label: "//:test_job".to_string() };

        // Each constructor must yield the matching event-type variant.
        let br_event = build_request_received(None, vec![partition.clone()]);
        assert!(matches!(br_event.event_type, Some(build_event::EventType::BuildRequestEvent(_))));

        let p_event = partition_status(None, partition.clone(), PartitionStatus::PartitionAvailable, None);
        assert!(matches!(p_event.event_type, Some(build_event::EventType::PartitionEvent(_))));

        let j_event = job_event(None, None, job_label, vec![partition], JobStatus::JobCompleted);
        assert!(matches!(j_event.event_type, Some(build_event::EventType::JobEvent(_))));
    }
}

View file

@ -65,7 +65,17 @@ impl SqliteBuildEventLog {
.unwrap_or_default();
Some(crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status,
status_code: status,
status_name: match status {
1 => BuildRequestStatus::BuildRequestReceived.to_display_string(),
2 => BuildRequestStatus::BuildRequestPlanning.to_display_string(),
3 => BuildRequestStatus::BuildRequestExecuting.to_display_string(),
4 => BuildRequestStatus::BuildRequestCompleted.to_display_string(),
5 => BuildRequestStatus::BuildRequestFailed.to_display_string(),
6 => BuildRequestStatus::BuildRequestCancelled.to_display_string(),
7 => BuildRequestStatus::BuildRequestAnalysisCompleted.to_display_string(),
_ => BuildRequestStatus::BuildRequestUnknown.to_display_string(),
},
requested_partitions,
message,
}))
@ -81,7 +91,16 @@ impl SqliteBuildEventLog {
Some(crate::build_event::EventType::PartitionEvent(PartitionEvent {
partition_ref: Some(PartitionRef { str: partition_ref }),
status,
status_code: status,
status_name: match status {
1 => PartitionStatus::PartitionRequested.to_display_string(),
2 => PartitionStatus::PartitionAnalyzed.to_display_string(),
3 => PartitionStatus::PartitionBuilding.to_display_string(),
4 => PartitionStatus::PartitionAvailable.to_display_string(),
5 => PartitionStatus::PartitionFailed.to_display_string(),
6 => PartitionStatus::PartitionDelegated.to_display_string(),
_ => PartitionStatus::PartitionUnknown.to_display_string(),
},
message,
job_run_id,
}))
@ -108,7 +127,16 @@ impl SqliteBuildEventLog {
job_run_id,
job_label: Some(JobLabel { label: job_label }),
target_partitions,
status,
status_code: status,
status_name: match status {
1 => JobStatus::JobScheduled.to_display_string(),
2 => JobStatus::JobRunning.to_display_string(),
3 => JobStatus::JobCompleted.to_display_string(),
4 => JobStatus::JobFailed.to_display_string(),
5 => JobStatus::JobCancelled.to_display_string(),
6 => JobStatus::JobSkipped.to_display_string(),
_ => JobStatus::JobUnknown.to_display_string(),
},
message,
config,
manifests,
@ -186,7 +214,7 @@ impl BuildEventLog for SqliteBuildEventLog {
"INSERT INTO build_request_events (event_id, status, requested_partitions, message) VALUES (?1, ?2, ?3, ?4)",
params![
event.event_id,
br_event.status.to_string(),
br_event.status_code.to_string(),
partitions_json,
br_event.message
],
@ -198,7 +226,7 @@ impl BuildEventLog for SqliteBuildEventLog {
params![
event.event_id,
p_event.partition_ref.as_ref().map(|r| &r.str).unwrap_or(&String::new()),
p_event.status.to_string(),
p_event.status_code.to_string(),
p_event.message,
if p_event.job_run_id.is_empty() { None } else { Some(&p_event.job_run_id) }
],
@ -221,7 +249,7 @@ impl BuildEventLog for SqliteBuildEventLog {
j_event.job_run_id,
j_event.job_label.as_ref().map(|l| &l.label).unwrap_or(&String::new()),
partitions_json,
j_event.status.to_string(),
j_event.status_code.to_string(),
j_event.message,
config_json,
manifests_json

View file

@ -0,0 +1,452 @@
use crate::*;
use crate::event_log::{BuildEventLog, BuildEventLogError, Result, create_build_event, current_timestamp_nanos, generate_event_id};
use std::sync::Arc;
use log::debug;
/// Common interface for writing events to the build event log with validation
pub struct EventWriter {
    // Shared handle to the backing log; every write method appends through it.
    event_log: Arc<dyn BuildEventLog>,
}
impl EventWriter {
    /// Create a new EventWriter with the specified event log backend
    pub fn new(event_log: Arc<dyn BuildEventLog>) -> Self {
        Self { event_log }
    }

    /// Get access to the underlying event log for direct operations
    pub fn event_log(&self) -> &dyn BuildEventLog {
        self.event_log.as_ref()
    }

    /// Request a new build for the specified partitions.
    ///
    /// Emits a `BuildRequestEvent` in the `BuildRequestReceived` state.
    pub async fn request_build(
        &self,
        build_request_id: String,
        requested_partitions: Vec<PartitionRef>,
    ) -> Result<()> {
        debug!("Writing build request event for build: {}", build_request_id);
        let event = create_build_event(
            build_request_id,
            build_event::EventType::BuildRequestEvent(BuildRequestEvent {
                status_code: BuildRequestStatus::BuildRequestReceived as i32,
                status_name: BuildRequestStatus::BuildRequestReceived.to_display_string(),
                requested_partitions,
                message: "Build request received".to_string(),
            }),
        );
        self.event_log.append_event(event).await
    }

    /// Update build request status (without re-stating the partition list).
    pub async fn update_build_status(
        &self,
        build_request_id: String,
        status: BuildRequestStatus,
        message: String,
    ) -> Result<()> {
        debug!("Updating build status for {}: {:?}", build_request_id, status);
        let event = create_build_event(
            build_request_id,
            build_event::EventType::BuildRequestEvent(BuildRequestEvent {
                status_code: status as i32,
                status_name: status.to_display_string(),
                requested_partitions: vec![],
                message,
            }),
        );
        self.event_log.append_event(event).await
    }

    /// Update build request status with partition list
    pub async fn update_build_status_with_partitions(
        &self,
        build_request_id: String,
        status: BuildRequestStatus,
        requested_partitions: Vec<PartitionRef>,
        message: String,
    ) -> Result<()> {
        debug!("Updating build status for {}: {:?}", build_request_id, status);
        let event = create_build_event(
            build_request_id,
            build_event::EventType::BuildRequestEvent(BuildRequestEvent {
                status_code: status as i32,
                status_name: status.to_display_string(),
                requested_partitions,
                message,
            }),
        );
        self.event_log.append_event(event).await
    }

    /// Update partition status, optionally attributing it to a job run.
    // Uses create_build_event like the other writers so the envelope
    // (event id / timestamp) is built in exactly one place.
    pub async fn update_partition_status(
        &self,
        build_request_id: String,
        partition_ref: PartitionRef,
        status: PartitionStatus,
        message: String,
        job_run_id: Option<String>,
    ) -> Result<()> {
        debug!("Updating partition status for {}: {:?}", partition_ref.str, status);
        let event = create_build_event(
            build_request_id,
            build_event::EventType::PartitionEvent(PartitionEvent {
                partition_ref: Some(partition_ref),
                status_code: status as i32,
                status_name: status.to_display_string(),
                message,
                job_run_id: job_run_id.unwrap_or_default(),
            }),
        );
        self.event_log.append_event(event).await
    }

    /// Invalidate a partition with a reason.
    ///
    /// Fails with `QueryError` if the partition has never been recorded.
    pub async fn invalidate_partition(
        &self,
        build_request_id: String,
        partition_ref: PartitionRef,
        reason: String,
    ) -> Result<()> {
        // First validate that the partition exists by checking its current status
        let current_status = self.event_log.get_latest_partition_status(&partition_ref.str).await?;
        if current_status.is_none() {
            return Err(BuildEventLogError::QueryError(
                format!("Cannot invalidate non-existent partition: {}", partition_ref.str)
            ));
        }
        let event = create_build_event(
            build_request_id,
            build_event::EventType::PartitionInvalidationEvent(
                PartitionInvalidationEvent {
                    partition_ref: Some(partition_ref),
                    reason,
                }
            ),
        );
        self.event_log.append_event(event).await
    }

    /// Schedule a job for execution (emits a JobScheduled event carrying the
    /// job's config so executors can pick it up).
    pub async fn schedule_job(
        &self,
        build_request_id: String,
        job_run_id: String,
        job_label: JobLabel,
        target_partitions: Vec<PartitionRef>,
        config: JobConfig,
    ) -> Result<()> {
        debug!("Scheduling job {} for partitions: {:?}", job_label.label, target_partitions);
        let event = create_build_event(
            build_request_id,
            build_event::EventType::JobEvent(JobEvent {
                job_run_id,
                job_label: Some(job_label),
                target_partitions,
                status_code: JobStatus::JobScheduled as i32,
                status_name: JobStatus::JobScheduled.to_display_string(),
                message: "Job scheduled for execution".to_string(),
                config: Some(config),
                manifests: vec![],
            }),
        );
        self.event_log.append_event(event).await
    }

    /// Update job status, optionally attaching produced partition manifests.
    pub async fn update_job_status(
        &self,
        build_request_id: String,
        job_run_id: String,
        job_label: JobLabel,
        target_partitions: Vec<PartitionRef>,
        status: JobStatus,
        message: String,
        manifests: Vec<PartitionManifest>,
    ) -> Result<()> {
        debug!("Updating job {} status to {:?}", job_run_id, status);
        let event = create_build_event(
            build_request_id,
            build_event::EventType::JobEvent(JobEvent {
                job_run_id,
                job_label: Some(job_label),
                target_partitions,
                status_code: status as i32,
                status_name: status.to_display_string(),
                message,
                config: None,
                manifests,
            }),
        );
        self.event_log.append_event(event).await
    }

    /// Cancel a task (job run) with a reason.
    ///
    /// Rejects the request if the job run does not exist or is already in a
    /// terminal state (completed / failed / cancelled).
    pub async fn cancel_task(
        &self,
        build_request_id: String,
        job_run_id: String,
        reason: String,
    ) -> Result<()> {
        // Validate that the job run exists and is in a cancellable state
        let job_events = self.event_log.get_job_run_events(&job_run_id).await?;
        if job_events.is_empty() {
            return Err(BuildEventLogError::QueryError(
                format!("Cannot cancel non-existent job run: {}", job_run_id)
            ));
        }
        // Find the latest job status
        let latest_status = job_events.iter()
            .rev()
            .find_map(|e| match &e.event_type {
                Some(build_event::EventType::JobEvent(job)) => Some(job.status_code),
                _ => None,
            });
        match latest_status {
            Some(status) if status == JobStatus::JobCompleted as i32 => {
                return Err(BuildEventLogError::QueryError(
                    format!("Cannot cancel completed job run: {}", job_run_id)
                ));
            }
            Some(status) if status == JobStatus::JobFailed as i32 => {
                return Err(BuildEventLogError::QueryError(
                    format!("Cannot cancel failed job run: {}", job_run_id)
                ));
            }
            Some(status) if status == JobStatus::JobCancelled as i32 => {
                return Err(BuildEventLogError::QueryError(
                    format!("Job run already cancelled: {}", job_run_id)
                ));
            }
            _ => {}
        }
        let event = create_build_event(
            build_request_id,
            build_event::EventType::TaskCancelEvent(TaskCancelEvent {
                job_run_id,
                reason,
            }),
        );
        self.event_log.append_event(event).await
    }

    /// Cancel a build request with a reason.
    ///
    /// Rejects the request if the build does not exist or is already in a
    /// terminal state; on success it also emits a BuildRequestCancelled
    /// status update so consumers tracking status see the transition.
    pub async fn cancel_build(
        &self,
        build_request_id: String,
        reason: String,
    ) -> Result<()> {
        // Validate that the build exists and is in a cancellable state
        let build_events = self.event_log.get_build_request_events(&build_request_id, None).await?;
        if build_events.is_empty() {
            return Err(BuildEventLogError::QueryError(
                format!("Cannot cancel non-existent build: {}", build_request_id)
            ));
        }
        // Find the latest build status
        let latest_status = build_events.iter()
            .rev()
            .find_map(|e| match &e.event_type {
                Some(build_event::EventType::BuildRequestEvent(br)) => Some(br.status_code),
                _ => None,
            });
        match latest_status {
            Some(status) if status == BuildRequestStatus::BuildRequestCompleted as i32 => {
                return Err(BuildEventLogError::QueryError(
                    format!("Cannot cancel completed build: {}", build_request_id)
                ));
            }
            Some(status) if status == BuildRequestStatus::BuildRequestFailed as i32 => {
                return Err(BuildEventLogError::QueryError(
                    format!("Cannot cancel failed build: {}", build_request_id)
                ));
            }
            Some(status) if status == BuildRequestStatus::BuildRequestCancelled as i32 => {
                return Err(BuildEventLogError::QueryError(
                    format!("Build already cancelled: {}", build_request_id)
                ));
            }
            _ => {}
        }
        let event = create_build_event(
            build_request_id.clone(),
            build_event::EventType::BuildCancelEvent(BuildCancelEvent {
                reason,
            }),
        );
        self.event_log.append_event(event).await?;
        // Also emit a build request status update
        self.update_build_status(
            build_request_id,
            BuildRequestStatus::BuildRequestCancelled,
            "Build cancelled by user".to_string(),
        ).await
    }

    /// Record a delegation event when a partition build is delegated to another build
    pub async fn record_delegation(
        &self,
        build_request_id: String,
        partition_ref: PartitionRef,
        delegated_to_build_request_id: String,
        message: String,
    ) -> Result<()> {
        debug!("Recording delegation of {} to build {}", partition_ref.str, delegated_to_build_request_id);
        let event = create_build_event(
            build_request_id,
            build_event::EventType::DelegationEvent(DelegationEvent {
                partition_ref: Some(partition_ref),
                delegated_to_build_request_id,
                message,
            }),
        );
        self.event_log.append_event(event).await
    }

    /// Record the analyzed job graph
    pub async fn record_job_graph(
        &self,
        build_request_id: String,
        job_graph: JobGraph,
        message: String,
    ) -> Result<()> {
        debug!("Recording job graph for build: {}", build_request_id);
        let event = create_build_event(
            build_request_id,
            build_event::EventType::JobGraphEvent(JobGraphEvent {
                job_graph: Some(job_graph),
                message,
            }),
        );
        self.event_log.append_event(event).await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_log::stdout::StdoutBuildEventLog;

    /// Drives a full build-request lifecycle (received -> planning ->
    /// executing -> completed) through the writer against the stdout log
    /// backend; asserts only that every append succeeds.
    #[tokio::test]
    async fn test_event_writer_build_lifecycle() {
        let event_log = Arc::new(StdoutBuildEventLog::new());
        let writer = EventWriter::new(event_log);
        let build_id = "test-build-123".to_string();
        let partitions = vec![PartitionRef { str: "test/partition".to_string() }];
        // Test build request
        writer.request_build(build_id.clone(), partitions.clone()).await.unwrap();
        // Test status updates
        writer.update_build_status(
            build_id.clone(),
            BuildRequestStatus::BuildRequestPlanning,
            "Starting planning".to_string(),
        ).await.unwrap();
        writer.update_build_status(
            build_id.clone(),
            BuildRequestStatus::BuildRequestExecuting,
            "Starting execution".to_string(),
        ).await.unwrap();
        writer.update_build_status(
            build_id.clone(),
            BuildRequestStatus::BuildRequestCompleted,
            "Build completed successfully".to_string(),
        ).await.unwrap();
    }

    /// Exercises the partition-status, job-scheduling, and job-status writer
    /// paths end to end; again only checks that each append succeeds.
    #[tokio::test]
    async fn test_event_writer_partition_and_job() {
        let event_log = Arc::new(StdoutBuildEventLog::new());
        let writer = EventWriter::new(event_log);
        let build_id = "test-build-456".to_string();
        let partition = PartitionRef { str: "data/users".to_string() };
        let job_run_id = "job-run-789".to_string();
        let job_label = JobLabel { label: "//:test_job".to_string() };
        // Test partition status update
        writer.update_partition_status(
            build_id.clone(),
            partition.clone(),
            PartitionStatus::PartitionBuilding,
            "Building partition".to_string(),
            Some(job_run_id.clone()),
        ).await.unwrap();
        // Test job scheduling (minimal config: one output, no inputs/env)
        let config = JobConfig {
            outputs: vec![partition.clone()],
            inputs: vec![],
            args: vec!["test".to_string()],
            env: std::collections::HashMap::new(),
        };
        writer.schedule_job(
            build_id.clone(),
            job_run_id.clone(),
            job_label.clone(),
            vec![partition.clone()],
            config,
        ).await.unwrap();
        // Test job status update
        writer.update_job_status(
            build_id.clone(),
            job_run_id,
            job_label,
            vec![partition],
            JobStatus::JobCompleted,
            "Job completed successfully".to_string(),
            vec![],
        ).await.unwrap();
    }
}

View file

@ -0,0 +1,144 @@
#[cfg(test)]
mod format_consistency_tests {
use super::*;
use crate::*;
use crate::repositories::partitions::PartitionsRepository;
use crate::event_log::mock::{MockBuildEventLog, test_events};
use std::sync::Arc;
#[tokio::test]
async fn test_partitions_list_json_format_consistency() {
// Create test data
let build_id = "test-build-123".to_string();
let partition1 = PartitionRef { str: "data/users".to_string() };
let partition2 = PartitionRef { str: "data/orders".to_string() };
let events = vec![
test_events::build_request_received(Some(build_id.clone()), vec![partition1.clone(), partition2.clone()]),
test_events::partition_status(Some(build_id.clone()), partition1.clone(), PartitionStatus::PartitionBuilding, None),
test_events::partition_status(Some(build_id.clone()), partition1.clone(), PartitionStatus::PartitionAvailable, None),
test_events::partition_status(Some(build_id.clone()), partition2.clone(), PartitionStatus::PartitionBuilding, None),
test_events::partition_status(Some(build_id.clone()), partition2.clone(), PartitionStatus::PartitionFailed, None),
];
let mock_log = Arc::new(MockBuildEventLog::with_events(events).await.unwrap());
let repository = PartitionsRepository::new(mock_log);
// Test the new unified protobuf format
let request = PartitionsListRequest {
limit: Some(10),
offset: None,
status_filter: None,
};
let response = repository.list_protobuf(request).await.unwrap();
// Serialize to JSON and verify structure
let json_value = serde_json::to_value(&response).unwrap();
// Verify top-level structure matches expected protobuf schema
assert!(json_value.get("partitions").is_some());
assert!(json_value.get("total_count").is_some());
assert!(json_value.get("has_more").is_some());
let partitions = json_value["partitions"].as_array().unwrap();
assert_eq!(partitions.len(), 2);
// Verify each partition has dual status fields
for partition in partitions {
assert!(partition.get("partition_ref").is_some());
assert!(partition.get("status_code").is_some(), "Missing status_code field");
assert!(partition.get("status_name").is_some(), "Missing status_name field");
assert!(partition.get("last_updated").is_some());
assert!(partition.get("builds_count").is_some());
assert!(partition.get("invalidation_count").is_some());
// Verify status fields are consistent
let status_code = partition["status_code"].as_i64().unwrap();
let status_name = partition["status_name"].as_str().unwrap();
// Map status codes to expected names
let expected_name = match status_code {
1 => "requested",
2 => "analyzed",
3 => "building",
4 => "available",
5 => "failed",
6 => "delegated",
_ => "unknown",
};
// Find the partition by status to verify correct mapping
if status_name == "available" {
assert_eq!(status_code, 4, "Available status should have code 4");
} else if status_name == "failed" {
assert_eq!(status_code, 5, "Failed status should have code 5");
}
}
// Verify JSON serialization produces expected field names (snake_case for JSON)
let json_str = serde_json::to_string_pretty(&response).unwrap();
assert!(json_str.contains("\"partitions\""));
assert!(json_str.contains("\"total_count\""));
assert!(json_str.contains("\"has_more\""));
assert!(json_str.contains("\"partition_ref\""));
assert!(json_str.contains("\"status_code\""));
assert!(json_str.contains("\"status_name\""));
assert!(json_str.contains("\"last_updated\""));
assert!(json_str.contains("\"builds_count\""));
assert!(json_str.contains("\"invalidation_count\""));
println!("✅ Partitions list JSON format test passed");
println!("Sample JSON output:\n{}", json_str);
}
#[tokio::test]
async fn test_status_conversion_utilities() {
    use crate::status_utils::*;
    // Round-trip PartitionStatus through its display string.
    let partition_status = PartitionStatus::PartitionAvailable;
    assert_eq!(partition_status.to_display_string(), "available");
    assert_eq!(
        PartitionStatus::from_display_string("available"),
        Some(partition_status)
    );
    // Round-trip JobStatus through its display string.
    let job_status = JobStatus::JobCompleted;
    assert_eq!(job_status.to_display_string(), "completed");
    assert_eq!(
        JobStatus::from_display_string("completed"),
        Some(job_status)
    );
    // Round-trip BuildRequestStatus through its display string.
    let build_status = BuildRequestStatus::BuildRequestCompleted;
    assert_eq!(build_status.to_display_string(), "completed");
    assert_eq!(
        BuildRequestStatus::from_display_string("completed"),
        Some(build_status)
    );
    // Unknown display strings must not map to any status.
    assert_eq!(PartitionStatus::from_display_string("invalid"), None);
    println!("✅ Status conversion utilities test passed");
}
#[test]
fn test_protobuf_response_helper_functions() {
    use crate::status_utils::list_response_helpers::*;
    // Build a PartitionSummary through the helper and verify every field
    // lands where expected.
    let partition_summary = create_partition_summary(
        "test/partition".to_string(),
        PartitionStatus::PartitionAvailable,
        1234567890,
        5,
        2,
        Some("build-123".to_string()),
    );
    assert_eq!(partition_summary.partition_ref, "test/partition");
    // PartitionAvailable = 4
    assert_eq!(partition_summary.status_code, 4);
    assert_eq!(partition_summary.status_name, "available");
    assert_eq!(partition_summary.last_updated, 1234567890);
    assert_eq!(partition_summary.builds_count, 5);
    assert_eq!(partition_summary.invalidation_count, 2);
    assert_eq!(
        partition_summary.last_successful_build,
        Some("build-123".to_string())
    );
    println!("✅ Protobuf response helper functions test passed");
}
}

View file

@ -200,7 +200,8 @@ async fn plan(
let event = create_build_event(
build_request_id.to_string(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestReceived as i32,
status_code: BuildRequestStatus::BuildRequestReceived as i32,
status_name: BuildRequestStatus::BuildRequestReceived.to_display_string(),
requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(),
message: "Analysis started".to_string(),
})
@ -260,7 +261,8 @@ async fn plan(
let event = create_build_event(
build_request_id.to_string(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestPlanning as i32,
status_code: BuildRequestStatus::BuildRequestPlanning as i32,
status_name: BuildRequestStatus::BuildRequestPlanning.to_display_string(),
requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(),
message: "Graph analysis in progress".to_string(),
})
@ -329,7 +331,8 @@ async fn plan(
let event = create_build_event(
build_request_id.to_string(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestAnalysisCompleted as i32,
status_code: BuildRequestStatus::BuildRequestAnalysisCompleted as i32,
status_name: BuildRequestStatus::BuildRequestAnalysisCompleted.to_display_string(),
requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(),
message: format!("Analysis completed successfully, {} tasks planned", nodes.len()),
})
@ -370,7 +373,8 @@ async fn plan(
let event = create_build_event(
build_request_id.to_string(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestFailed as i32,
status_code: BuildRequestStatus::BuildRequestFailed as i32,
status_name: BuildRequestStatus::BuildRequestFailed.to_display_string(),
requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(),
message: "No jobs found for requested partitions".to_string(),
})

View file

@ -430,7 +430,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let event = create_build_event(
build_request_id.clone(),
EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestExecuting as i32,
status_code: BuildRequestStatus::BuildRequestExecuting as i32,
status_name: BuildRequestStatus::BuildRequestExecuting.to_display_string(),
requested_partitions: graph.outputs.clone(),
message: format!("Starting execution of {} jobs", graph.nodes.len()),
})
@ -502,7 +503,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
job_run_id: job_run_id.clone(),
job_label: original_task.job.clone(),
target_partitions: original_task.config.as_ref().unwrap().outputs.clone(),
status: if result.success { JobStatus::JobCompleted as i32 } else { JobStatus::JobFailed as i32 },
status_code: if result.success { JobStatus::JobCompleted as i32 } else { JobStatus::JobFailed as i32 },
status_name: if result.success { JobStatus::JobCompleted.to_display_string() } else { JobStatus::JobFailed.to_display_string() },
message: if result.success { "Job completed successfully".to_string() } else { result.error_message.clone().unwrap_or_default() },
config: original_task.config.clone(),
manifests: vec![], // Would be populated from actual job output
@ -518,7 +520,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
build_request_id.clone(),
EventType::PartitionEvent(PartitionEvent {
partition_ref: Some(output_ref.clone()),
status: if result.success { PartitionStatus::PartitionAvailable as i32 } else { PartitionStatus::PartitionFailed as i32 },
status_code: if result.success { PartitionStatus::PartitionAvailable as i32 } else { PartitionStatus::PartitionFailed as i32 },
status_name: if result.success { PartitionStatus::PartitionAvailable.to_display_string() } else { PartitionStatus::PartitionFailed.to_display_string() },
message: if result.success { "Partition built successfully".to_string() } else { "Partition build failed".to_string() },
job_run_id: job_run_id.clone(),
})
@ -601,7 +604,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
job_run_id: job_run_id.clone(),
job_label: task_node.job.clone(),
target_partitions: task_node.config.as_ref().unwrap().outputs.clone(),
status: JobStatus::JobSkipped as i32,
status_code: JobStatus::JobSkipped as i32,
status_name: JobStatus::JobSkipped.to_display_string(),
message: "Job skipped - all target partitions already available".to_string(),
config: task_node.config.clone(),
manifests: vec![],
@ -638,7 +642,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
job_run_id: job_run_id.clone(),
job_label: task_node.job.clone(),
target_partitions: task_node.config.as_ref().unwrap().outputs.clone(),
status: JobStatus::JobScheduled as i32,
status_code: JobStatus::JobScheduled as i32,
status_name: JobStatus::JobScheduled.to_display_string(),
message: "Job scheduled for execution".to_string(),
config: task_node.config.clone(),
manifests: vec![],
@ -654,7 +659,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
build_request_id.clone(),
EventType::PartitionEvent(PartitionEvent {
partition_ref: Some(output_ref.clone()),
status: PartitionStatus::PartitionBuilding as i32,
status_code: PartitionStatus::PartitionBuilding as i32,
status_name: PartitionStatus::PartitionBuilding.to_display_string(),
message: "Partition build started".to_string(),
job_run_id: job_run_id.clone(),
})
@ -759,7 +765,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let event = create_build_event(
build_request_id.clone(),
EventType::BuildRequestEvent(BuildRequestEvent {
status: final_status as i32,
status_code: final_status as i32,
status_name: final_status.to_display_string(),
requested_partitions: graph.outputs.clone(),
message: format!("Execution completed: {} succeeded, {} failed", success_count, failure_count),
})

View file

@ -15,6 +15,13 @@ pub mod repositories;
pub mod mermaid_utils;
// Status conversion utilities
pub mod status_utils;
// Format consistency tests
#[cfg(test)]
mod format_consistency_test;
// Re-export commonly used types from event_log
pub use event_log::{BuildEventLog, BuildEventLogError, create_build_event_log};

View file

@ -43,7 +43,7 @@ pub fn extract_status_map(events: &[BuildEvent]) -> (HashMap<String, NodeStatus>
match &event.event_type {
Some(crate::build_event::EventType::JobEvent(job_event)) => {
if let Some(job_label) = &job_event.job_label {
let status = match job_event.status {
let status = match job_event.status_code {
1 => NodeStatus::Running, // JOB_SCHEDULED
2 => NodeStatus::Running, // JOB_RUNNING
3 => NodeStatus::Completed, // JOB_COMPLETED
@ -65,7 +65,7 @@ pub fn extract_status_map(events: &[BuildEvent]) -> (HashMap<String, NodeStatus>
}
Some(crate::build_event::EventType::PartitionEvent(partition_event)) => {
if let Some(partition_ref) = &partition_event.partition_ref {
let status = match partition_event.status {
let status = match partition_event.status_code {
1 => NodeStatus::Pending, // PARTITION_REQUESTED
2 => NodeStatus::Pending, // PARTITION_ANALYZED
3 => NodeStatus::Running, // PARTITION_BUILDING
@ -728,7 +728,7 @@ mod tests {
event1.event_type = Some(crate::build_event::EventType::JobEvent({
let mut job_event = JobEvent::default();
job_event.job_label = Some(JobLabel { label: "test_job".to_string() });
job_event.status = 2; // JOB_RUNNING
job_event.status_code = 2; // JOB_RUNNING
job_event
}));
@ -737,7 +737,7 @@ mod tests {
event2.event_type = Some(crate::build_event::EventType::PartitionEvent({
let mut partition_event = PartitionEvent::default();
partition_event.partition_ref = Some(PartitionRef { str: "test/partition".to_string() });
partition_event.status = 4; // PARTITION_AVAILABLE
partition_event.status_code = 4; // PARTITION_AVAILABLE
partition_event
}));
@ -758,7 +758,7 @@ mod tests {
let mut job_event = JobEvent::default();
job_event.job_label = Some(JobLabel { label: "same_job".to_string() });
job_event.target_partitions = vec![PartitionRef { str: "output1".to_string() }];
job_event.status = 2; // JOB_RUNNING
job_event.status_code = 2; // JOB_RUNNING
job_event
}));
@ -767,7 +767,7 @@ mod tests {
let mut job_event = JobEvent::default();
job_event.job_label = Some(JobLabel { label: "same_job".to_string() });
job_event.target_partitions = vec![PartitionRef { str: "output2".to_string() }];
job_event.status = 3; // JOB_COMPLETED
job_event.status_code = 3; // JOB_COMPLETED
job_event
}));
@ -810,7 +810,7 @@ mod tests {
partition_event.event_type = Some(crate::build_event::EventType::PartitionEvent({
let mut pe = PartitionEvent::default();
pe.partition_ref = Some(PartitionRef { str: "input/data".to_string() });
pe.status = 4; // PARTITION_AVAILABLE
pe.status_code = 4; // PARTITION_AVAILABLE
pe
}));
@ -819,7 +819,7 @@ mod tests {
let mut je = JobEvent::default();
je.job_label = Some(JobLabel { label: "job1".to_string() });
je.target_partitions = vec![PartitionRef { str: "intermediate/data".to_string() }];
je.status = 2; // JOB_RUNNING
je.status_code = 2; // JOB_RUNNING
je
}));

View file

@ -10,7 +10,8 @@ pub fn create_build_request_received_event(
create_build_event(
build_request_id,
build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestReceived as i32,
status_code: BuildRequestStatus::BuildRequestReceived as i32,
status_name: BuildRequestStatus::BuildRequestReceived.to_display_string(),
requested_partitions,
message: "Build request received".to_string(),
}),
@ -23,7 +24,8 @@ pub fn create_build_planning_started_event(
create_build_event(
build_request_id,
build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestPlanning as i32,
status_code: BuildRequestStatus::BuildRequestPlanning as i32,
status_name: BuildRequestStatus::BuildRequestPlanning.to_display_string(),
requested_partitions: vec![],
message: "Starting build planning".to_string(),
}),
@ -36,7 +38,8 @@ pub fn create_build_execution_started_event(
create_build_event(
build_request_id,
build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestExecuting as i32,
status_code: BuildRequestStatus::BuildRequestExecuting as i32,
status_name: BuildRequestStatus::BuildRequestExecuting.to_display_string(),
requested_partitions: vec![],
message: "Starting build execution".to_string(),
}),
@ -67,7 +70,8 @@ pub fn create_build_completed_event(
create_build_event(
build_request_id,
build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: status as i32,
status_code: status as i32,
status_name: status.to_display_string(),
requested_partitions: vec![],
message,
}),
@ -82,7 +86,8 @@ pub fn create_analysis_completed_event(
create_build_event(
build_request_id,
build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestAnalysisCompleted as i32,
status_code: BuildRequestStatus::BuildRequestAnalysisCompleted as i32,
status_name: BuildRequestStatus::BuildRequestAnalysisCompleted.to_display_string(),
requested_partitions,
message: format!("Analysis completed successfully, {} tasks planned", task_count),
}),

View file

@ -342,7 +342,7 @@ mod tests {
// Verify first event is build request received
if let Some(build_event::EventType::BuildRequestEvent(br_event)) = &emitted_events[0].event_type {
assert_eq!(br_event.status, BuildRequestStatus::BuildRequestReceived as i32);
assert_eq!(br_event.status_code, BuildRequestStatus::BuildRequestReceived as i32);
assert_eq!(br_event.requested_partitions, partitions);
} else {
panic!("First event should be BuildRequestEvent");
@ -368,7 +368,8 @@ mod tests {
job_run_id: "job-run-123".to_string(),
job_label: Some(JobLabel { label: "//:test_job".to_string() }),
target_partitions: vec![partition.clone()],
status: JobStatus::JobScheduled as i32,
status_code: JobStatus::JobScheduled as i32,
status_name: JobStatus::JobScheduled.to_display_string(),
message: "Job scheduled".to_string(),
config: None,
manifests: vec![],

View file

@ -0,0 +1,458 @@
use crate::*;
use crate::event_log::{BuildEventLog, BuildEventLogError, Result};
use std::sync::Arc;
use std::collections::HashMap;
use serde::Serialize;
/// Repository for querying build data from the build event log
pub struct BuildsRepository {
    // Shared handle to the append-only event log this repository reads from.
    event_log: Arc<dyn BuildEventLog>,
}
/// Summary of a build request and its current status
#[derive(Debug, Clone, Serialize)]
pub struct BuildInfo {
    /// Identifier shared by all events belonging to this build request.
    pub build_request_id: String,
    /// Latest BuildRequestStatus observed for this build.
    pub status: BuildRequestStatus,
    /// Partitions the caller asked to build.
    pub requested_partitions: Vec<PartitionRef>,
    /// Timestamp of the BuildRequestReceived event (or the first event seen).
    pub requested_at: i64,
    /// Timestamp when the build entered BuildRequestExecuting, if it did.
    pub started_at: Option<i64>,
    /// Timestamp of the terminal event (completed/failed/cancelled), if any.
    pub completed_at: Option<i64>,
    /// completed_at - started_at, converted to milliseconds
    /// (timestamps are presumably nanoseconds — TODO confirm event timestamp unit).
    pub duration_ms: Option<i64>,
    /// Number of jobs scheduled for this build.
    pub total_jobs: usize,
    /// Number of JobCompleted events seen for this build.
    pub completed_jobs: usize,
    /// Number of JobFailed events seen for this build.
    pub failed_jobs: usize,
    /// Number of JobCancelled events seen for this build.
    pub cancelled_jobs: usize,
    /// True when a BuildCancelEvent exists for this build.
    pub cancelled: bool,
    /// Reason carried by the BuildCancelEvent, if the build was cancelled.
    pub cancel_reason: Option<String>,
}
/// Detailed timeline of a build's execution events
#[derive(Debug, Clone, Serialize)]
pub struct BuildEvent {
    /// Event timestamp, copied from the underlying log event.
    pub timestamp: i64,
    /// Timeline entry kind: "build_status_change" or "build_cancel".
    pub event_type: String,
    /// Status for status-change entries; None for cancel entries.
    pub status: Option<BuildRequestStatus>,
    /// Human-readable description of the entry.
    pub message: String,
    /// Cancellation reason for "build_cancel" entries; None otherwise.
    pub cancel_reason: Option<String>,
}
impl BuildsRepository {
    /// Create a new BuildsRepository backed by the given event log.
    pub fn new(event_log: Arc<dyn BuildEventLog>) -> Self {
        Self { event_log }
    }

    /// List all builds with their current status
    ///
    /// Returns a list of all build requests that have been made,
    /// including their current status and execution details.
    /// Results are sorted most-recent-first; `limit` caps the count.
    pub async fn list(&self, limit: Option<usize>) -> Result<Vec<BuildInfo>> {
        // Get all events from the event log
        let events = self.event_log.get_events_in_range(0, i64::MAX).await?;

        let mut build_data: HashMap<String, BuildInfo> = HashMap::new();
        let mut build_cancellations: HashMap<String, String> = HashMap::new();
        // Distinct scheduled job_run_ids per build (the source of total_jobs).
        let mut scheduled_runs: HashMap<String, std::collections::HashSet<String>> = HashMap::new();
        // Per-build counters: (completed, failed, cancelled).
        let mut job_counts: HashMap<String, (usize, usize, usize)> = HashMap::new();

        // First pass: collect all build cancel events
        for event in &events {
            if let Some(build_event::EventType::BuildCancelEvent(bc_event)) = &event.event_type {
                build_cancellations.insert(event.build_request_id.clone(), bc_event.reason.clone());
            }
        }

        // Second pass: collect job statistics for each build.
        for event in &events {
            if let Some(build_event::EventType::JobEvent(j_event)) = &event.event_type {
                let build_id = event.build_request_id.clone();
                match j_event.status_code {
                    // JobScheduled — count distinct job runs. (Bug fix: the
                    // previous code did `*total = (*total).max(1)`, which capped
                    // total_jobs at 1 no matter how many jobs were scheduled.)
                    1 => {
                        scheduled_runs
                            .entry(build_id)
                            .or_default()
                            .insert(j_event.job_run_id.clone());
                    }
                    3 => job_counts.entry(build_id).or_insert((0, 0, 0)).0 += 1, // JobCompleted
                    4 => job_counts.entry(build_id).or_insert((0, 0, 0)).1 += 1, // JobFailed
                    5 => job_counts.entry(build_id).or_insert((0, 0, 0)).2 += 1, // JobCancelled
                    _ => {}
                }
            }
        }

        // Third pass: collect all build request events and build information
        for event in events {
            if let Some(build_event::EventType::BuildRequestEvent(br_event)) = &event.event_type {
                let status = match br_event.status_code {
                    1 => BuildRequestStatus::BuildRequestReceived,
                    2 => BuildRequestStatus::BuildRequestPlanning,
                    3 => BuildRequestStatus::BuildRequestExecuting,
                    4 => BuildRequestStatus::BuildRequestCompleted,
                    5 => BuildRequestStatus::BuildRequestFailed,
                    6 => BuildRequestStatus::BuildRequestCancelled,
                    _ => BuildRequestStatus::BuildRequestUnknown,
                };

                // Create or update build info
                let build = build_data.entry(event.build_request_id.clone()).or_insert_with(|| {
                    let (completed_jobs, failed_jobs, cancelled_jobs) = job_counts
                        .get(&event.build_request_id)
                        .copied()
                        .unwrap_or((0, 0, 0));
                    let total_jobs = scheduled_runs
                        .get(&event.build_request_id)
                        .map(|runs| runs.len())
                        .unwrap_or(0);
                    BuildInfo {
                        build_request_id: event.build_request_id.clone(),
                        status: BuildRequestStatus::BuildRequestUnknown,
                        requested_partitions: br_event.requested_partitions.clone(),
                        requested_at: event.timestamp,
                        started_at: None,
                        completed_at: None,
                        duration_ms: None,
                        total_jobs,
                        completed_jobs,
                        failed_jobs,
                        cancelled_jobs,
                        cancelled: false,
                        cancel_reason: None,
                    }
                });

                // Update build with new information
                build.status = status;
                match status {
                    BuildRequestStatus::BuildRequestReceived => {
                        build.requested_at = event.timestamp;
                    }
                    BuildRequestStatus::BuildRequestExecuting => {
                        build.started_at = Some(event.timestamp);
                    }
                    BuildRequestStatus::BuildRequestCompleted |
                    BuildRequestStatus::BuildRequestFailed |
                    BuildRequestStatus::BuildRequestCancelled => {
                        build.completed_at = Some(event.timestamp);
                        if let Some(started) = build.started_at {
                            // Convert to ms (timestamps are presumably
                            // nanoseconds — TODO confirm event timestamp unit).
                            build.duration_ms = Some((event.timestamp - started) / 1_000_000);
                        }
                    }
                    _ => {}
                }

                // Check if this build was cancelled
                if let Some(cancel_reason) = build_cancellations.get(&event.build_request_id) {
                    build.cancelled = true;
                    build.cancel_reason = Some(cancel_reason.clone());
                }
            }
        }

        // Convert to vector and sort by requested time (most recent first)
        let mut builds: Vec<BuildInfo> = build_data.into_values().collect();
        builds.sort_by(|a, b| b.requested_at.cmp(&a.requested_at));

        // Apply limit if specified
        if let Some(limit) = limit {
            builds.truncate(limit);
        }
        Ok(builds)
    }

    /// Show detailed information about a specific build
    ///
    /// Returns the complete timeline of events for the specified build,
    /// including all status changes and any cancellation events.
    /// Returns `Ok(None)` when no events exist for the build.
    pub async fn show(&self, build_request_id: &str) -> Result<Option<(BuildInfo, Vec<BuildEvent>)>> {
        // Get all events for this specific build
        let build_events = self.event_log.get_build_request_events(build_request_id, None).await?;
        if build_events.is_empty() {
            return Ok(None);
        }

        let mut build_info: Option<BuildInfo> = None;
        let mut timeline: Vec<BuildEvent> = Vec::new();
        // Distinct scheduled job_run_ids plus (completed, failed, cancelled) counters.
        let mut scheduled_runs: std::collections::HashSet<String> = std::collections::HashSet::new();
        let mut job_counts = (0usize, 0usize, 0usize);

        // Process all events to get job statistics
        let all_events = self.event_log.get_events_in_range(0, i64::MAX).await?;
        for event in &all_events {
            if event.build_request_id == build_request_id {
                if let Some(build_event::EventType::JobEvent(j_event)) = &event.event_type {
                    match j_event.status_code {
                        // JobScheduled — count distinct runs. (Bug fix: was
                        // `job_counts.0.max(1)`, capping the total at 1.)
                        1 => {
                            scheduled_runs.insert(j_event.job_run_id.clone());
                        }
                        3 => job_counts.0 += 1, // JobCompleted
                        4 => job_counts.1 += 1, // JobFailed
                        5 => job_counts.2 += 1, // JobCancelled
                        _ => {}
                    }
                }
            }
        }

        // Process build request events to build timeline
        for event in &build_events {
            if let Some(build_event::EventType::BuildRequestEvent(br_event)) = &event.event_type {
                let status = match br_event.status_code {
                    1 => BuildRequestStatus::BuildRequestReceived,
                    2 => BuildRequestStatus::BuildRequestPlanning,
                    3 => BuildRequestStatus::BuildRequestExecuting,
                    4 => BuildRequestStatus::BuildRequestCompleted,
                    5 => BuildRequestStatus::BuildRequestFailed,
                    6 => BuildRequestStatus::BuildRequestCancelled,
                    _ => BuildRequestStatus::BuildRequestUnknown,
                };

                // Create build info on first build-request event.
                if build_info.is_none() {
                    build_info = Some(BuildInfo {
                        build_request_id: event.build_request_id.clone(),
                        status: BuildRequestStatus::BuildRequestUnknown,
                        requested_partitions: br_event.requested_partitions.clone(),
                        requested_at: event.timestamp,
                        started_at: None,
                        completed_at: None,
                        duration_ms: None,
                        total_jobs: scheduled_runs.len(),
                        completed_jobs: job_counts.0,
                        failed_jobs: job_counts.1,
                        cancelled_jobs: job_counts.2,
                        cancelled: false,
                        cancel_reason: None,
                    });
                }
                let build = build_info.as_mut().unwrap();
                build.status = status;
                match status {
                    BuildRequestStatus::BuildRequestReceived => {
                        build.requested_at = event.timestamp;
                    }
                    BuildRequestStatus::BuildRequestExecuting => {
                        build.started_at = Some(event.timestamp);
                    }
                    BuildRequestStatus::BuildRequestCompleted |
                    BuildRequestStatus::BuildRequestFailed |
                    BuildRequestStatus::BuildRequestCancelled => {
                        build.completed_at = Some(event.timestamp);
                        if let Some(started) = build.started_at {
                            // Convert to ms (timestamps are presumably
                            // nanoseconds — TODO confirm event timestamp unit).
                            build.duration_ms = Some((event.timestamp - started) / 1_000_000);
                        }
                    }
                    _ => {}
                }

                // Add to timeline
                timeline.push(BuildEvent {
                    timestamp: event.timestamp,
                    event_type: "build_status_change".to_string(),
                    status: Some(status),
                    message: format!("Build status: {:?}", status),
                    cancel_reason: None,
                });
            }
        }

        // Also check for build cancel events in all events
        for event in all_events {
            if event.build_request_id == build_request_id {
                if let Some(build_event::EventType::BuildCancelEvent(bc_event)) = &event.event_type {
                    if let Some(build) = build_info.as_mut() {
                        build.cancelled = true;
                        build.cancel_reason = Some(bc_event.reason.clone());
                    }
                    timeline.push(BuildEvent {
                        timestamp: event.timestamp,
                        event_type: "build_cancel".to_string(),
                        status: None,
                        message: "Build cancelled".to_string(),
                        cancel_reason: Some(bc_event.reason.clone()),
                    });
                }
            }
        }

        // Sort timeline by timestamp
        timeline.sort_by_key(|e| e.timestamp);
        Ok(build_info.map(|info| (info, timeline)))
    }

    /// Cancel a build with a reason
    ///
    /// This method uses the EventWriter to write a build cancellation event.
    /// It validates that the build exists and is in a cancellable state;
    /// completed, failed, or already-cancelled builds are rejected with a
    /// `QueryError`.
    pub async fn cancel(&self, build_request_id: &str, reason: String) -> Result<()> {
        // First check if the build exists and get its current status
        let build_info = self.show(build_request_id).await?;
        if build_info.is_none() {
            return Err(BuildEventLogError::QueryError(
                format!("Cannot cancel non-existent build: {}", build_request_id)
            ));
        }
        let (build, _timeline) = build_info.unwrap();

        // Check if build is in a cancellable state
        match build.status {
            BuildRequestStatus::BuildRequestCompleted => {
                return Err(BuildEventLogError::QueryError(
                    format!("Cannot cancel completed build: {}", build_request_id)
                ));
            }
            BuildRequestStatus::BuildRequestFailed => {
                return Err(BuildEventLogError::QueryError(
                    format!("Cannot cancel failed build: {}", build_request_id)
                ));
            }
            BuildRequestStatus::BuildRequestCancelled => {
                return Err(BuildEventLogError::QueryError(
                    format!("Build already cancelled: {}", build_request_id)
                ));
            }
            _ => {}
        }

        // Use EventWriter to write the cancellation event
        let event_writer = crate::event_log::writer::EventWriter::new(self.event_log.clone());
        event_writer.cancel_build(build_request_id.to_string(), reason).await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_log::mock::{MockBuildEventLog, test_events};

    // An empty event log must yield an empty build list.
    #[tokio::test]
    async fn test_builds_repository_list_empty() {
        let mock_log = Arc::new(MockBuildEventLog::new().await.unwrap());
        let repo = BuildsRepository::new(mock_log);
        let builds = repo.list(None).await.unwrap();
        assert!(builds.is_empty());
    }

    // Two builds with different terminal statuses are both listed, each
    // reporting the status of its latest build-request event.
    #[tokio::test]
    async fn test_builds_repository_list_with_data() {
        let build_id1 = "build-123".to_string();
        let build_id2 = "build-456".to_string();
        let partition1 = PartitionRef { str: "data/users".to_string() };
        let partition2 = PartitionRef { str: "data/orders".to_string() };
        // Create events for multiple builds
        let events = vec![
            test_events::build_request_event(Some(build_id1.clone()), vec![partition1.clone()], BuildRequestStatus::BuildRequestReceived),
            test_events::build_request_event(Some(build_id1.clone()), vec![partition1.clone()], BuildRequestStatus::BuildRequestCompleted),
            test_events::build_request_event(Some(build_id2.clone()), vec![partition2.clone()], BuildRequestStatus::BuildRequestReceived),
            test_events::build_request_event(Some(build_id2.clone()), vec![partition2.clone()], BuildRequestStatus::BuildRequestFailed),
        ];
        let mock_log = Arc::new(MockBuildEventLog::with_events(events).await.unwrap());
        let repo = BuildsRepository::new(mock_log);
        let builds = repo.list(None).await.unwrap();
        assert_eq!(builds.len(), 2);
        // Find builds by id
        let build1 = builds.iter().find(|b| b.build_request_id == build_id1).unwrap();
        let build2 = builds.iter().find(|b| b.build_request_id == build_id2).unwrap();
        assert_eq!(build1.status, BuildRequestStatus::BuildRequestCompleted);
        assert_eq!(build1.requested_partitions.len(), 1);
        assert!(!build1.cancelled);
        assert_eq!(build2.status, BuildRequestStatus::BuildRequestFailed);
        assert_eq!(build2.requested_partitions.len(), 1);
        assert!(!build2.cancelled);
    }

    // show() returns one timeline entry per build-request event, in order,
    // and the final status reflects the last event.
    #[tokio::test]
    async fn test_builds_repository_show() {
        let build_id = "build-789".to_string();
        let partition = PartitionRef { str: "analytics/daily".to_string() };
        let events = vec![
            test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatus::BuildRequestReceived),
            test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatus::BuildRequestPlanning),
            test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatus::BuildRequestExecuting),
            test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatus::BuildRequestCompleted),
        ];
        let mock_log = Arc::new(MockBuildEventLog::with_events(events).await.unwrap());
        let repo = BuildsRepository::new(mock_log);
        let result = repo.show(&build_id).await.unwrap();
        assert!(result.is_some());
        let (info, timeline) = result.unwrap();
        assert_eq!(info.build_request_id, build_id);
        assert_eq!(info.status, BuildRequestStatus::BuildRequestCompleted);
        assert!(!info.cancelled);
        assert_eq!(timeline.len(), 4);
        assert_eq!(timeline[0].status, Some(BuildRequestStatus::BuildRequestReceived));
        assert_eq!(timeline[1].status, Some(BuildRequestStatus::BuildRequestPlanning));
        assert_eq!(timeline[2].status, Some(BuildRequestStatus::BuildRequestExecuting));
        assert_eq!(timeline[3].status, Some(BuildRequestStatus::BuildRequestCompleted));
    }

    // show() on an unknown build id returns None rather than an error.
    #[tokio::test]
    async fn test_builds_repository_show_nonexistent() {
        let mock_log = Arc::new(MockBuildEventLog::new().await.unwrap());
        let repo = BuildsRepository::new(mock_log);
        let result = repo.show("nonexistent-build").await.unwrap();
        assert!(result.is_none());
    }

    // Cancelling an in-flight build succeeds; cancelling an unknown build fails.
    #[tokio::test]
    async fn test_builds_repository_cancel() {
        let build_id = "build-cancel-test".to_string();
        let partition = PartitionRef { str: "test/data".to_string() };
        // Start with a running build
        let events = vec![
            test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatus::BuildRequestReceived),
            test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatus::BuildRequestExecuting),
        ];
        let mock_log = Arc::new(MockBuildEventLog::with_events(events).await.unwrap());
        let repo = BuildsRepository::new(mock_log.clone());
        // Cancel the build
        repo.cancel(&build_id, "User requested cancellation".to_string()).await.unwrap();
        // Verify the cancellation was recorded
        // Note: This test demonstrates the pattern, but the MockBuildEventLog would need
        // to be enhanced to properly store build cancel events for full verification
        // Try to cancel a non-existent build
        let result = repo.cancel("nonexistent-build", "Should fail".to_string()).await;
        assert!(result.is_err());
    }

    // A completed build cannot be cancelled; the error message names the reason.
    #[tokio::test]
    async fn test_builds_repository_cancel_completed_build() {
        let build_id = "completed-build".to_string();
        let partition = PartitionRef { str: "test/data".to_string() };
        // Create a completed build
        let events = vec![
            test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatus::BuildRequestReceived),
            test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatus::BuildRequestCompleted),
        ];
        let mock_log = Arc::new(MockBuildEventLog::with_events(events).await.unwrap());
        let repo = BuildsRepository::new(mock_log);
        // Try to cancel the completed build - should fail
        let result = repo.cancel(&build_id, "Should fail".to_string()).await;
        assert!(result.is_err());
        if let Err(BuildEventLogError::QueryError(msg)) = result {
            assert!(msg.contains("Cannot cancel completed build"));
        } else {
            panic!("Expected QueryError for completed build cancellation");
        }
    }
}

View file

@ -0,0 +1,422 @@
use crate::*;
use crate::event_log::{BuildEventLog, Result};
use std::sync::Arc;
use std::collections::HashMap;
use serde::Serialize;
/// Repository for querying job data from the build event log
pub struct JobsRepository {
    // Shared handle to the append-only event log this repository reads from.
    event_log: Arc<dyn BuildEventLog>,
}
/// Summary of a job's execution history and statistics
#[derive(Debug, Clone, Serialize)]
pub struct JobInfo {
    /// Label identifying the job (e.g. "//:test_job").
    pub job_label: String,
    /// Number of distinct job runs observed for this label.
    pub total_runs: usize,
    /// Runs whose final status was JobCompleted.
    pub successful_runs: usize,
    /// Runs whose final status was JobFailed.
    pub failed_runs: usize,
    /// Runs whose final status was JobCancelled.
    pub cancelled_runs: usize,
    /// scheduled_at of the most recently scheduled run (0 when none).
    pub last_run_timestamp: i64,
    /// Final status of the most recently scheduled run.
    pub last_run_status: JobStatus,
    /// Mean number of target partitions per run.
    pub average_partitions_per_run: f64,
    pub recent_builds: Vec<String>, // Build request IDs that used this job
}
/// Detailed information about a specific job execution
#[derive(Debug, Clone, Serialize)]
pub struct JobRunDetail {
    /// Unique identifier of this run, shared by all its events.
    pub job_run_id: String,
    /// Label of the job that was run.
    pub job_label: String,
    /// Build request this run belongs to.
    pub build_request_id: String,
    /// Partitions this run was asked to produce.
    pub target_partitions: Vec<PartitionRef>,
    /// Latest status observed for this run.
    pub status: JobStatus,
    /// Timestamp of the first event seen for this run
    /// (assumed to be the JobScheduled event — TODO confirm event ordering).
    pub scheduled_at: i64,
    /// Timestamp of the JobRunning event, if seen.
    pub started_at: Option<i64>,
    /// Timestamp of the terminal event (completed/failed/cancelled), if seen.
    pub completed_at: Option<i64>,
    /// completed_at - started_at, converted to milliseconds
    /// (timestamps are presumably nanoseconds — TODO confirm).
    pub duration_ms: Option<i64>,
    /// Message from the most recent event for this run.
    pub message: String,
    /// Job configuration carried on the events, if any.
    pub config: Option<JobConfig>,
    /// Partition manifests attached on terminal events.
    pub manifests: Vec<PartitionManifest>,
}
impl JobsRepository {
/// Create a new JobsRepository
pub fn new(event_log: Arc<dyn BuildEventLog>) -> Self {
Self { event_log }
}
/// List all jobs with their execution statistics
///
/// Returns a summary of all jobs that have been executed, including
/// success/failure statistics and recent activity. Jobs are ordered by
/// most recent run first; `limit` optionally caps the number returned.
pub async fn list(&self, limit: Option<usize>) -> Result<Vec<JobInfo>> {
    // Get all job events from the event log
    let events = self.event_log.get_events_in_range(0, i64::MAX).await?;
    let mut job_data: HashMap<String, Vec<JobRunDetail>> = HashMap::new();

    // Collect all job events and group by job label, folding repeated events
    // for the same job_run_id into a single JobRunDetail.
    for event in events {
        if let Some(build_event::EventType::JobEvent(j_event)) = &event.event_type {
            let job_label = j_event.job_label.as_ref()
                .map(|l| l.label.clone())
                .unwrap_or_else(|| "unknown".to_string());
            let status = match j_event.status_code {
                1 => JobStatus::JobScheduled,
                2 => JobStatus::JobRunning,
                3 => JobStatus::JobCompleted,
                4 => JobStatus::JobFailed,
                5 => JobStatus::JobCancelled,
                6 => JobStatus::JobSkipped,
                _ => JobStatus::JobUnknown,
            };
            // Create or update job run detail
            let job_runs = job_data.entry(job_label.clone()).or_insert_with(Vec::new);
            if let Some(existing_run) = job_runs.iter_mut().find(|r| r.job_run_id == j_event.job_run_id) {
                // Subsequent event for a known run: update status/message and
                // record timing transitions.
                existing_run.status = status;
                existing_run.message = j_event.message.clone();
                match status {
                    JobStatus::JobRunning => {
                        existing_run.started_at = Some(event.timestamp);
                    }
                    JobStatus::JobCompleted | JobStatus::JobFailed | JobStatus::JobCancelled => {
                        existing_run.completed_at = Some(event.timestamp);
                        if let Some(started) = existing_run.started_at {
                            // Convert to ms (timestamps are presumably
                            // nanoseconds — TODO confirm event timestamp unit).
                            existing_run.duration_ms = Some((event.timestamp - started) / 1_000_000);
                        }
                        existing_run.manifests = j_event.manifests.clone();
                    }
                    _ => {}
                }
            } else {
                // First event seen for this run.
                // NOTE(review): scheduled_at is taken from whichever event
                // arrives first, which may not be the JobScheduled event if
                // events are out of order — confirm log ordering guarantees.
                let job_run = JobRunDetail {
                    job_run_id: j_event.job_run_id.clone(),
                    job_label: job_label.clone(),
                    build_request_id: event.build_request_id.clone(),
                    target_partitions: j_event.target_partitions.clone(),
                    status,
                    scheduled_at: event.timestamp,
                    started_at: if status == JobStatus::JobRunning { Some(event.timestamp) } else { None },
                    completed_at: None,
                    duration_ms: None,
                    message: j_event.message.clone(),
                    config: j_event.config.clone(),
                    manifests: j_event.manifests.clone(),
                };
                job_runs.push(job_run);
            }
        }
    }

    // Convert to JobInfo structs with statistics
    let mut job_infos: Vec<JobInfo> = job_data.into_iter()
        .map(|(job_label, job_runs)| {
            let total_runs = job_runs.len();
            let successful_runs = job_runs.iter().filter(|r| r.status == JobStatus::JobCompleted).count();
            let failed_runs = job_runs.iter().filter(|r| r.status == JobStatus::JobFailed).count();
            let cancelled_runs = job_runs.iter().filter(|r| r.status == JobStatus::JobCancelled).count();
            let (last_run_timestamp, last_run_status) = job_runs.iter()
                .max_by_key(|r| r.scheduled_at)
                .map(|r| (r.scheduled_at, r.status.clone()))
                .unwrap_or((0, JobStatus::JobUnknown));
            let total_partitions: usize = job_runs.iter()
                .map(|r| r.target_partitions.len())
                .sum();
            let average_partitions_per_run = if total_runs > 0 {
                total_partitions as f64 / total_runs as f64
            } else {
                0.0
            };
            // Get the 10 most recent unique build request IDs. (Bug fix: the
            // previous implementation sorted IDs lexicographically before
            // truncating, which does not correspond to recency; order by the
            // latest run timestamp per build instead.)
            let mut build_latest: HashMap<String, i64> = HashMap::new();
            for run in &job_runs {
                let ts = build_latest.entry(run.build_request_id.clone()).or_insert(run.scheduled_at);
                if run.scheduled_at > *ts {
                    *ts = run.scheduled_at;
                }
            }
            let mut by_recency: Vec<(String, i64)> = build_latest.into_iter().collect();
            by_recency.sort_by(|a, b| b.1.cmp(&a.1));
            let recent_builds: Vec<String> = by_recency.into_iter()
                .take(10) // Keep last 10 builds
                .map(|(build_id, _)| build_id)
                .collect();
            JobInfo {
                job_label,
                total_runs,
                successful_runs,
                failed_runs,
                cancelled_runs,
                last_run_timestamp,
                last_run_status,
                average_partitions_per_run,
                recent_builds,
            }
        })
        .collect();

    // Sort by last run timestamp (most recent first)
    job_infos.sort_by(|a, b| b.last_run_timestamp.cmp(&a.last_run_timestamp));
    // Apply limit if specified
    if let Some(limit) = limit {
        job_infos.truncate(limit);
    }
    Ok(job_infos)
}
/// Show detailed information about a specific job
///
/// Returns all execution runs for the specified job label, including
/// detailed timing, status, and output information.
pub async fn show(&self, job_label: &str) -> Result<Option<(JobInfo, Vec<JobRunDetail>)>> {
// Get all job events for this specific job
let events = self.event_log.get_events_in_range(0, i64::MAX).await?;
let mut job_runs: Vec<JobRunDetail> = Vec::new();
// Collect all job events for this job label
for event in events {
if let Some(build_event::EventType::JobEvent(j_event)) = &event.event_type {
let event_job_label = j_event.job_label.as_ref()
.map(|l| l.label.clone())
.unwrap_or_else(|| "unknown".to_string());
if event_job_label != job_label {
continue;
}
let status = match j_event.status_code {
1 => JobStatus::JobScheduled,
2 => JobStatus::JobRunning,
3 => JobStatus::JobCompleted,
4 => JobStatus::JobFailed,
5 => JobStatus::JobCancelled,
6 => JobStatus::JobSkipped,
_ => JobStatus::JobUnknown,
};
// Find existing run or create new one
if let Some(existing_run) = job_runs.iter_mut().find(|r| r.job_run_id == j_event.job_run_id) {
// Update existing run with new status
existing_run.status = status;
existing_run.message = j_event.message.clone();
match status {
JobStatus::JobRunning => {
existing_run.started_at = Some(event.timestamp);
}
JobStatus::JobCompleted | JobStatus::JobFailed | JobStatus::JobCancelled => {
existing_run.completed_at = Some(event.timestamp);
if let Some(started) = existing_run.started_at {
existing_run.duration_ms = Some((event.timestamp - started) / 1_000_000); // Convert to ms
}
existing_run.manifests = j_event.manifests.clone();
}
_ => {}
}
} else {
// Create new job run
let job_run = JobRunDetail {
job_run_id: j_event.job_run_id.clone(),
job_label: job_label.to_string(),
build_request_id: event.build_request_id.clone(),
target_partitions: j_event.target_partitions.clone(),
status,
scheduled_at: event.timestamp,
started_at: if status == JobStatus::JobRunning { Some(event.timestamp) } else { None },
completed_at: None,
duration_ms: None,
message: j_event.message.clone(),
config: j_event.config.clone(),
manifests: j_event.manifests.clone(),
};
job_runs.push(job_run);
}
}
}
if job_runs.is_empty() {
return Ok(None);
}
// Sort runs by scheduled time (most recent first)
job_runs.sort_by(|a, b| b.scheduled_at.cmp(&a.scheduled_at));
// Calculate job statistics
let total_runs = job_runs.len();
let successful_runs = job_runs.iter().filter(|r| r.status == JobStatus::JobCompleted).count();
let failed_runs = job_runs.iter().filter(|r| r.status == JobStatus::JobFailed).count();
let cancelled_runs = job_runs.iter().filter(|r| r.status == JobStatus::JobCancelled).count();
let (last_run_timestamp, last_run_status) = job_runs.iter()
.max_by_key(|r| r.scheduled_at)
.map(|r| (r.scheduled_at, r.status.clone()))
.unwrap_or((0, JobStatus::JobUnknown));
let total_partitions: usize = job_runs.iter()
.map(|r| r.target_partitions.len())
.sum();
let average_partitions_per_run = if total_runs > 0 {
total_partitions as f64 / total_runs as f64
} else {
0.0
};
// Get recent unique build request IDs
let mut recent_builds: Vec<String> = job_runs.iter()
.map(|r| r.build_request_id.clone())
.collect::<std::collections::HashSet<_>>()
.into_iter()
.collect();
recent_builds.sort();
recent_builds.truncate(10); // Keep last 10 builds
let job_info = JobInfo {
job_label: job_label.to_string(),
total_runs,
successful_runs,
failed_runs,
cancelled_runs,
last_run_timestamp,
last_run_status,
average_partitions_per_run,
recent_builds,
};
Ok(Some((job_info, job_runs)))
}
}
#[cfg(test)]
mod tests {
    //! Unit tests for `JobsRepository`, driven by the in-memory
    //! `MockBuildEventLog` and the `test_events` event constructors.
    //! NOTE(review): expected timestamps/ordering depend on how the mock
    //! assigns event timestamps — defined outside this file.
    use super::*;
    use crate::event_log::mock::{MockBuildEventLog, test_events};
    // An empty event log must yield an empty job list.
    #[tokio::test]
    async fn test_jobs_repository_list_empty() {
        let mock_log = Arc::new(MockBuildEventLog::new().await.unwrap());
        let repo = JobsRepository::new(mock_log);
        let jobs = repo.list(None).await.unwrap();
        assert!(jobs.is_empty());
    }
    // Two jobs with one run each: one completed, one failed. list() should
    // aggregate per-label statistics for both.
    #[tokio::test]
    async fn test_jobs_repository_list_with_data() {
        let build_id = "test-build-123".to_string();
        let job_label1 = JobLabel { label: "//:process_data".to_string() };
        let job_label2 = JobLabel { label: "//:generate_reports".to_string() };
        let partition1 = PartitionRef { str: "data/users".to_string() };
        let partition2 = PartitionRef { str: "reports/summary".to_string() };
        // Create events for multiple jobs
        let events = vec![
            test_events::job_event(Some(build_id.clone()), Some("job-run-1".to_string()), job_label1.clone(), vec![partition1.clone()], JobStatus::JobScheduled),
            test_events::job_event(Some(build_id.clone()), Some("job-run-1".to_string()), job_label1.clone(), vec![partition1.clone()], JobStatus::JobCompleted),
            test_events::job_event(Some(build_id.clone()), Some("job-run-2".to_string()), job_label2.clone(), vec![partition2.clone()], JobStatus::JobScheduled),
            test_events::job_event(Some(build_id.clone()), Some("job-run-2".to_string()), job_label2.clone(), vec![partition2.clone()], JobStatus::JobFailed),
        ];
        let mock_log = Arc::new(MockBuildEventLog::with_events(events).await.unwrap());
        let repo = JobsRepository::new(mock_log);
        let jobs = repo.list(None).await.unwrap();
        assert_eq!(jobs.len(), 2);
        // Find jobs by label
        let process_job = jobs.iter().find(|j| j.job_label == "//:process_data").unwrap();
        let reports_job = jobs.iter().find(|j| j.job_label == "//:generate_reports").unwrap();
        assert_eq!(process_job.total_runs, 1);
        assert_eq!(process_job.successful_runs, 1);
        assert_eq!(process_job.failed_runs, 0);
        assert_eq!(process_job.last_run_status, JobStatus::JobCompleted);
        assert_eq!(reports_job.total_runs, 1);
        assert_eq!(reports_job.successful_runs, 0);
        assert_eq!(reports_job.failed_runs, 1);
        assert_eq!(reports_job.last_run_status, JobStatus::JobFailed);
    }
    // A single run moving scheduled -> running -> completed; show() should
    // return both the aggregate info and the per-run detail.
    #[tokio::test]
    async fn test_jobs_repository_show() {
        let build_id = "test-build-456".to_string();
        let job_label = JobLabel { label: "//:analytics_job".to_string() };
        let partition = PartitionRef { str: "analytics/daily".to_string() };
        let events = vec![
            test_events::job_event(Some(build_id.clone()), Some("job-run-123".to_string()), job_label.clone(), vec![partition.clone()], JobStatus::JobScheduled),
            test_events::job_event(Some(build_id.clone()), Some("job-run-123".to_string()), job_label.clone(), vec![partition.clone()], JobStatus::JobRunning),
            test_events::job_event(Some(build_id.clone()), Some("job-run-123".to_string()), job_label.clone(), vec![partition.clone()], JobStatus::JobCompleted),
        ];
        let mock_log = Arc::new(MockBuildEventLog::with_events(events).await.unwrap());
        let repo = JobsRepository::new(mock_log);
        let result = repo.show(&job_label.label).await.unwrap();
        assert!(result.is_some());
        let (info, runs) = result.unwrap();
        assert_eq!(info.job_label, "//:analytics_job");
        assert_eq!(info.total_runs, 1);
        assert_eq!(info.successful_runs, 1);
        assert_eq!(info.last_run_status, JobStatus::JobCompleted);
        assert_eq!(runs.len(), 1);
        let run = &runs[0];
        assert_eq!(run.job_run_id, "job-run-123");
        assert_eq!(run.status, JobStatus::JobCompleted);
        assert_eq!(run.target_partitions.len(), 1);
        assert_eq!(run.target_partitions[0].str, "analytics/daily");
    }
    // show() on a label with no events must return None, not an error.
    #[tokio::test]
    async fn test_jobs_repository_show_nonexistent() {
        let mock_log = Arc::new(MockBuildEventLog::new().await.unwrap());
        let repo = JobsRepository::new(mock_log);
        let result = repo.show("//:nonexistent_job").await.unwrap();
        assert!(result.is_none());
    }
    // Three runs with mixed outcomes; verifies the per-status counters and
    // the average-partitions computation.
    #[tokio::test]
    async fn test_jobs_repository_statistics() {
        let build_id = "test-build-789".to_string();
        let job_label = JobLabel { label: "//:batch_processor".to_string() };
        let partition = PartitionRef { str: "batch/data".to_string() };
        // Create multiple runs with different outcomes
        let events = vec![
            // First run - successful
            test_events::job_event(Some(build_id.clone()), Some("run-1".to_string()), job_label.clone(), vec![partition.clone()], JobStatus::JobScheduled),
            test_events::job_event(Some(build_id.clone()), Some("run-1".to_string()), job_label.clone(), vec![partition.clone()], JobStatus::JobCompleted),
            // Second run - failed
            test_events::job_event(Some(build_id.clone()), Some("run-2".to_string()), job_label.clone(), vec![partition.clone()], JobStatus::JobScheduled),
            test_events::job_event(Some(build_id.clone()), Some("run-2".to_string()), job_label.clone(), vec![partition.clone()], JobStatus::JobFailed),
            // Third run - cancelled
            test_events::job_event(Some(build_id.clone()), Some("run-3".to_string()), job_label.clone(), vec![partition.clone()], JobStatus::JobScheduled),
            test_events::job_event(Some(build_id.clone()), Some("run-3".to_string()), job_label.clone(), vec![partition.clone()], JobStatus::JobCancelled),
        ];
        let mock_log = Arc::new(MockBuildEventLog::with_events(events).await.unwrap());
        let repo = JobsRepository::new(mock_log);
        let result = repo.show(&job_label.label).await.unwrap();
        assert!(result.is_some());
        let (info, _runs) = result.unwrap();
        assert_eq!(info.total_runs, 3);
        assert_eq!(info.successful_runs, 1);
        assert_eq!(info.failed_runs, 1);
        assert_eq!(info.cancelled_runs, 1);
        assert_eq!(info.average_partitions_per_run, 1.0);
    }
}

View file

@ -0,0 +1,17 @@
//! Repository pattern implementations for reading from the build event log
//!
//! This module provides read-only repository interfaces that query the build event log
//! for different types of data. Each repository focuses on a specific domain:
//!
//! - PartitionsRepository: Query partition status and history
//! - JobsRepository: Query job execution data
//! - TasksRepository: Query task (job run) information
//! - BuildsRepository: Query build request data
//!
//! All repositories work with any BuildEventLog implementation and provide
//! a clean separation between read and write operations.
// NOTE: these were outer `///` comments, which rustdoc would have attached to
// `pub mod partitions` only; `//!` attaches them to this module as intended.
pub mod partitions;
pub mod jobs;
pub mod tasks;
pub mod builds;

View file

@ -0,0 +1,394 @@
use crate::*;
use crate::event_log::{BuildEventLog, BuildEventLogError, Result};
use crate::status_utils::list_response_helpers;
use std::sync::Arc;
use std::collections::HashMap;
use serde::Serialize;
/// Repository for querying partition data from the build event log
///
/// All queries replay events from the underlying log; `invalidate` is the
/// only write path and goes through the shared `EventWriter`.
pub struct PartitionsRepository {
    /// Shared handle to the build event log used for all queries.
    event_log: Arc<dyn BuildEventLog>,
}
/// Summary of a partition's current state and history
#[derive(Debug, Clone, Serialize)]
pub struct PartitionInfo {
    /// String form of the partition reference.
    pub partition_ref: String,
    /// Status derived from the most recent event for this partition.
    pub current_status: PartitionStatus,
    /// Timestamp of the most recent event for this partition.
    pub last_updated: i64,
    /// Number of distinct build requests that emitted events for this partition.
    pub builds_count: usize,
    /// Build request id of the latest event that marked the partition available.
    pub last_successful_build: Option<String>,
    /// Number of invalidation events recorded for this partition.
    pub invalidation_count: usize,
}
/// Detailed partition status with timeline
#[derive(Debug, Clone, Serialize)]
pub struct PartitionStatusEvent {
    /// Event timestamp copied from the build event.
    pub timestamp: i64,
    /// Partition status decoded from the event's status code.
    pub status: PartitionStatus,
    /// Message carried by the event (invalidations are prefixed "Invalidated:").
    pub message: String,
    /// Build request that emitted this event.
    pub build_request_id: String,
    /// Job run that produced the event; `None` when the event carried an
    /// empty job_run_id (e.g. invalidation events).
    pub job_run_id: Option<String>,
}
impl PartitionsRepository {
/// Create a new PartitionsRepository
pub fn new(event_log: Arc<dyn BuildEventLog>) -> Self {
Self { event_log }
}
/// List all partitions with their current status
///
/// Returns a list of all partitions that have been referenced in the build event log,
/// along with their current status and summary information.
pub async fn list(&self, limit: Option<usize>) -> Result<Vec<PartitionInfo>> {
// Get all partition events from the event log
let events = self.event_log.get_events_in_range(0, i64::MAX).await?;
let mut partition_data: HashMap<String, Vec<PartitionStatusEvent>> = HashMap::new();
// Collect all partition events
for event in events {
if let Some(build_event::EventType::PartitionEvent(p_event)) = &event.event_type {
if let Some(partition_ref) = &p_event.partition_ref {
let status = match p_event.status_code {
1 => PartitionStatus::PartitionRequested,
2 => PartitionStatus::PartitionAnalyzed,
3 => PartitionStatus::PartitionBuilding,
4 => PartitionStatus::PartitionAvailable,
5 => PartitionStatus::PartitionFailed,
6 => PartitionStatus::PartitionDelegated,
_ => PartitionStatus::PartitionUnknown,
};
let status_event = PartitionStatusEvent {
timestamp: event.timestamp,
status,
message: p_event.message.clone(),
build_request_id: event.build_request_id.clone(),
job_run_id: if p_event.job_run_id.is_empty() { None } else { Some(p_event.job_run_id.clone()) },
};
partition_data.entry(partition_ref.str.clone())
.or_insert_with(Vec::new)
.push(status_event);
}
}
// Also check for partition invalidation events
if let Some(build_event::EventType::PartitionInvalidationEvent(pi_event)) = &event.event_type {
if let Some(partition_ref) = &pi_event.partition_ref {
let status_event = PartitionStatusEvent {
timestamp: event.timestamp,
status: PartitionStatus::PartitionUnknown, // Invalidated
message: format!("Invalidated: {}", pi_event.reason),
build_request_id: event.build_request_id.clone(),
job_run_id: None,
};
partition_data.entry(partition_ref.str.clone())
.or_insert_with(Vec::new)
.push(status_event);
}
}
}
// Convert to PartitionInfo structs
let mut partition_infos: Vec<PartitionInfo> = partition_data.into_iter()
.map(|(partition_ref, mut events)| {
// Sort events by timestamp
events.sort_by_key(|e| e.timestamp);
// Get current status from latest event
let (current_status, last_updated) = events.last()
.map(|e| (e.status.clone(), e.timestamp))
.unwrap_or((PartitionStatus::PartitionUnknown, 0));
// Count builds and find last successful build
let builds: std::collections::HashSet<String> = events.iter()
.map(|e| e.build_request_id.clone())
.collect();
let last_successful_build = events.iter()
.rev()
.find(|e| e.status == PartitionStatus::PartitionAvailable)
.map(|e| e.build_request_id.clone());
// Count invalidations
let invalidation_count = events.iter()
.filter(|e| e.message.starts_with("Invalidated:"))
.count();
PartitionInfo {
partition_ref,
current_status,
last_updated,
builds_count: builds.len(),
last_successful_build,
invalidation_count,
}
})
.collect();
// Sort by most recently updated
partition_infos.sort_by(|a, b| b.last_updated.cmp(&a.last_updated));
// Apply limit if specified
if let Some(limit) = limit {
partition_infos.truncate(limit);
}
Ok(partition_infos)
}
/// Show detailed information about a specific partition
///
/// Returns the complete timeline of status changes for the specified partition,
/// including all builds that have referenced it.
pub async fn show(&self, partition_ref: &str) -> Result<Option<(PartitionInfo, Vec<PartitionStatusEvent>)>> {
// Get all events for this partition
let events = self.event_log.get_partition_events(partition_ref, None).await?;
if events.is_empty() {
return Ok(None);
}
let mut status_events = Vec::new();
let mut builds = std::collections::HashSet::new();
// Process partition events
for event in &events {
if let Some(build_event::EventType::PartitionEvent(p_event)) = &event.event_type {
let status = match p_event.status_code {
1 => PartitionStatus::PartitionRequested,
2 => PartitionStatus::PartitionAnalyzed,
3 => PartitionStatus::PartitionBuilding,
4 => PartitionStatus::PartitionAvailable,
5 => PartitionStatus::PartitionFailed,
6 => PartitionStatus::PartitionDelegated,
_ => PartitionStatus::PartitionUnknown,
};
status_events.push(PartitionStatusEvent {
timestamp: event.timestamp,
status,
message: p_event.message.clone(),
build_request_id: event.build_request_id.clone(),
job_run_id: if p_event.job_run_id.is_empty() { None } else { Some(p_event.job_run_id.clone()) },
});
builds.insert(event.build_request_id.clone());
}
}
// Also check for invalidation events in all events
let all_events = self.event_log.get_events_in_range(0, i64::MAX).await?;
let mut invalidation_count = 0;
for event in all_events {
if let Some(build_event::EventType::PartitionInvalidationEvent(pi_event)) = &event.event_type {
if let Some(partition) = &pi_event.partition_ref {
if partition.str == partition_ref {
status_events.push(PartitionStatusEvent {
timestamp: event.timestamp,
status: PartitionStatus::PartitionUnknown, // Invalidated
message: format!("Invalidated: {}", pi_event.reason),
build_request_id: event.build_request_id.clone(),
job_run_id: None,
});
invalidation_count += 1;
}
}
}
}
// Sort events by timestamp
status_events.sort_by_key(|e| e.timestamp);
// Get current status from latest event
let (current_status, last_updated) = status_events.last()
.map(|e| (e.status.clone(), e.timestamp))
.unwrap_or((PartitionStatus::PartitionUnknown, 0));
// Find last successful build
let last_successful_build = status_events.iter()
.rev()
.find(|e| e.status == PartitionStatus::PartitionAvailable)
.map(|e| e.build_request_id.clone());
let partition_info = PartitionInfo {
partition_ref: partition_ref.to_string(),
current_status,
last_updated,
builds_count: builds.len(),
last_successful_build,
invalidation_count,
};
Ok(Some((partition_info, status_events)))
}
/// Invalidate a partition with a reason
///
/// This method uses the EventWriter to write a partition invalidation event.
/// It validates that the partition exists before invalidating it.
pub async fn invalidate(&self, partition_ref: &str, reason: String, build_request_id: String) -> Result<()> {
// First check if the partition exists
let partition_exists = self.show(partition_ref).await?.is_some();
if !partition_exists {
return Err(BuildEventLogError::QueryError(
format!("Cannot invalidate non-existent partition: {}", partition_ref)
));
}
// Use EventWriter to write the invalidation event
let event_writer = crate::event_log::writer::EventWriter::new(self.event_log.clone());
let partition = PartitionRef { str: partition_ref.to_string() };
event_writer.invalidate_partition(build_request_id, partition, reason).await
}
/// List partitions returning protobuf response format with dual status fields
///
/// This method provides the unified CLI/Service response format with both
/// status codes (enum values) and status names (human-readable strings).
pub async fn list_protobuf(&self, request: PartitionsListRequest) -> Result<PartitionsListResponse> {
// Get legacy format data
let partition_infos = self.list(request.limit.map(|l| l as usize)).await?;
// Convert to protobuf format with dual status fields
let partitions: Vec<PartitionSummary> = partition_infos.into_iter()
.map(|info| {
list_response_helpers::create_partition_summary(
info.partition_ref,
info.current_status,
info.last_updated,
info.builds_count,
info.invalidation_count,
info.last_successful_build,
)
})
.collect();
// TODO: Implement proper pagination with offset and has_more
// For now, return simple response without full pagination support
let total_count = partitions.len() as u32;
let has_more = false; // This would be calculated based on actual total vs returned
Ok(PartitionsListResponse {
partitions,
total_count,
has_more,
})
}
}
#[cfg(test)]
mod tests {
    //! Unit tests for `PartitionsRepository`, driven by the in-memory
    //! `MockBuildEventLog` and the `test_events` event constructors.
    use super::*;
    use crate::event_log::mock::{MockBuildEventLog, test_events};
    // An empty event log must yield an empty partition list.
    #[tokio::test]
    async fn test_partitions_repository_list_empty() {
        let mock_log = Arc::new(MockBuildEventLog::new().await.unwrap());
        let repo = PartitionsRepository::new(mock_log);
        let partitions = repo.list(None).await.unwrap();
        assert!(partitions.is_empty());
    }
    // Two partitions in one build: one ends available, one ends failed.
    #[tokio::test]
    async fn test_partitions_repository_list_with_data() {
        let build_id = "test-build-123".to_string();
        let partition1 = PartitionRef { str: "data/users".to_string() };
        let partition2 = PartitionRef { str: "data/orders".to_string() };
        // Create events for multiple partitions
        let events = vec![
            test_events::build_request_received(Some(build_id.clone()), vec![partition1.clone(), partition2.clone()]),
            test_events::partition_status(Some(build_id.clone()), partition1.clone(), PartitionStatus::PartitionBuilding, None),
            test_events::partition_status(Some(build_id.clone()), partition1.clone(), PartitionStatus::PartitionAvailable, None),
            test_events::partition_status(Some(build_id.clone()), partition2.clone(), PartitionStatus::PartitionBuilding, None),
            test_events::partition_status(Some(build_id.clone()), partition2.clone(), PartitionStatus::PartitionFailed, None),
        ];
        let mock_log = Arc::new(MockBuildEventLog::with_events(events).await.unwrap());
        let repo = PartitionsRepository::new(mock_log);
        let partitions = repo.list(None).await.unwrap();
        assert_eq!(partitions.len(), 2);
        // Find partitions by name
        let users_partition = partitions.iter().find(|p| p.partition_ref == "data/users").unwrap();
        let orders_partition = partitions.iter().find(|p| p.partition_ref == "data/orders").unwrap();
        assert_eq!(users_partition.current_status, PartitionStatus::PartitionAvailable);
        assert_eq!(orders_partition.current_status, PartitionStatus::PartitionFailed);
        assert_eq!(users_partition.builds_count, 1);
        assert_eq!(orders_partition.builds_count, 1);
    }
    // Full lifecycle of one partition; show() should return the summary plus
    // the ordered timeline of all three status changes.
    #[tokio::test]
    async fn test_partitions_repository_show() {
        let build_id = "test-build-456".to_string();
        let partition = PartitionRef { str: "analytics/metrics".to_string() };
        let events = vec![
            test_events::partition_status(Some(build_id.clone()), partition.clone(), PartitionStatus::PartitionRequested, None),
            test_events::partition_status(Some(build_id.clone()), partition.clone(), PartitionStatus::PartitionBuilding, None),
            test_events::partition_status(Some(build_id.clone()), partition.clone(), PartitionStatus::PartitionAvailable, None),
        ];
        let mock_log = Arc::new(MockBuildEventLog::with_events(events).await.unwrap());
        let repo = PartitionsRepository::new(mock_log);
        let result = repo.show(&partition.str).await.unwrap();
        assert!(result.is_some());
        let (info, timeline) = result.unwrap();
        assert_eq!(info.partition_ref, "analytics/metrics");
        assert_eq!(info.current_status, PartitionStatus::PartitionAvailable);
        assert_eq!(info.builds_count, 1);
        assert_eq!(timeline.len(), 3);
        // Verify timeline order
        assert_eq!(timeline[0].status, PartitionStatus::PartitionRequested);
        assert_eq!(timeline[1].status, PartitionStatus::PartitionBuilding);
        assert_eq!(timeline[2].status, PartitionStatus::PartitionAvailable);
    }
    // show() on an unknown partition must return None, not an error.
    #[tokio::test]
    async fn test_partitions_repository_show_nonexistent() {
        let mock_log = Arc::new(MockBuildEventLog::new().await.unwrap());
        let repo = PartitionsRepository::new(mock_log);
        let result = repo.show("nonexistent/partition").await.unwrap();
        assert!(result.is_none());
    }
    // invalidate() succeeds for an existing partition and errors for an
    // unknown one.
    #[tokio::test]
    async fn test_partitions_repository_invalidate() {
        let build_id = "test-build-789".to_string();
        let partition = PartitionRef { str: "temp/data".to_string() };
        // Start with an existing partition
        let events = vec![
            test_events::partition_status(Some(build_id.clone()), partition.clone(), PartitionStatus::PartitionAvailable, None),
        ];
        let mock_log = Arc::new(MockBuildEventLog::with_events(events).await.unwrap());
        let repo = PartitionsRepository::new(mock_log.clone());
        // Invalidate the partition
        repo.invalidate(&partition.str, "Test invalidation".to_string(), build_id.clone()).await.unwrap();
        // Verify the invalidation was recorded
        // Note: This test demonstrates the pattern, but the MockBuildEventLog would need
        // to be enhanced to properly store invalidation events for full verification
        // Try to invalidate a non-existent partition
        let result = repo.invalidate("nonexistent/partition", "Should fail".to_string(), build_id).await;
        assert!(result.is_err());
    }
}

View file

@ -0,0 +1,440 @@
use crate::*;
use crate::event_log::{BuildEventLog, BuildEventLogError, Result};
use std::sync::Arc;
use std::collections::HashMap;
use serde::Serialize;
/// Repository for querying task (job run) data from the build event log
///
/// Read-heavy by design; `cancel` is the only write path and goes through
/// the shared `EventWriter`.
pub struct TasksRepository {
    /// Shared handle to the build event log used for all queries.
    event_log: Arc<dyn BuildEventLog>,
}
/// Summary of a task's execution
#[derive(Debug, Clone, Serialize)]
pub struct TaskInfo {
    /// Unique id of this job run (task).
    pub job_run_id: String,
    /// Label of the owning job ("unknown" when absent on the event).
    pub job_label: String,
    /// Build request that scheduled this run.
    pub build_request_id: String,
    /// Most recent status observed for this run.
    pub status: JobStatus,
    /// Partitions this run targets.
    pub target_partitions: Vec<PartitionRef>,
    /// Timestamp of the scheduling event (or first event seen for the run).
    pub scheduled_at: i64,
    /// Timestamp of the running event, if the run started.
    pub started_at: Option<i64>,
    /// Timestamp of the terminal event (completed/failed/cancelled), if any.
    pub completed_at: Option<i64>,
    /// Wall-clock duration in ms, when both start and end are known.
    pub duration_ms: Option<i64>,
    /// Message from the most recent event for this run.
    pub message: String,
    /// Job configuration captured at scheduling time, when present.
    pub config: Option<JobConfig>,
    /// Partition manifests reported on the terminal event.
    pub manifests: Vec<PartitionManifest>,
    /// True when a task-cancel event was recorded for this run.
    pub cancelled: bool,
    /// Reason carried by the task-cancel event, when cancelled.
    pub cancel_reason: Option<String>,
}
/// Detailed timeline of a task's execution events
#[derive(Debug, Clone, Serialize)]
pub struct TaskEvent {
    /// Event timestamp copied from the build event.
    pub timestamp: i64,
    /// Either "job_status_change" or "task_cancel".
    pub event_type: String,
    /// Decoded job status for status-change events; None for cancel events.
    pub status: Option<JobStatus>,
    /// Message associated with the event.
    pub message: String,
    /// Cancellation reason for "task_cancel" events; None otherwise.
    pub cancel_reason: Option<String>,
}
impl TasksRepository {
/// Create a new TasksRepository
pub fn new(event_log: Arc<dyn BuildEventLog>) -> Self {
Self { event_log }
}
/// List all tasks with their current status
///
/// Returns a list of all job runs (tasks) that have been executed,
/// including their current status and execution details.
pub async fn list(&self, limit: Option<usize>) -> Result<Vec<TaskInfo>> {
// Get all events from the event log
let events = self.event_log.get_events_in_range(0, i64::MAX).await?;
let mut task_data: HashMap<String, TaskInfo> = HashMap::new();
let mut task_cancellations: HashMap<String, String> = HashMap::new();
// First pass: collect all task cancel events
for event in &events {
if let Some(build_event::EventType::TaskCancelEvent(tc_event)) = &event.event_type {
task_cancellations.insert(tc_event.job_run_id.clone(), tc_event.reason.clone());
}
}
// Second pass: collect all job events and build task information
for event in events {
if let Some(build_event::EventType::JobEvent(j_event)) = &event.event_type {
let job_label = j_event.job_label.as_ref()
.map(|l| l.label.clone())
.unwrap_or_else(|| "unknown".to_string());
let status = match j_event.status_code {
1 => JobStatus::JobScheduled,
2 => JobStatus::JobRunning,
3 => JobStatus::JobCompleted,
4 => JobStatus::JobFailed,
5 => JobStatus::JobCancelled,
6 => JobStatus::JobSkipped,
_ => JobStatus::JobUnknown,
};
// Create or update task info
let task = task_data.entry(j_event.job_run_id.clone()).or_insert_with(|| {
TaskInfo {
job_run_id: j_event.job_run_id.clone(),
job_label: job_label.clone(),
build_request_id: event.build_request_id.clone(),
status: JobStatus::JobUnknown,
target_partitions: j_event.target_partitions.clone(),
scheduled_at: event.timestamp,
started_at: None,
completed_at: None,
duration_ms: None,
message: String::new(),
config: None,
manifests: vec![],
cancelled: false,
cancel_reason: None,
}
});
// Update task with new information
task.status = status;
task.message = j_event.message.clone();
match status {
JobStatus::JobScheduled => {
task.scheduled_at = event.timestamp;
if let Some(config) = &j_event.config {
task.config = Some(config.clone());
}
}
JobStatus::JobRunning => {
task.started_at = Some(event.timestamp);
}
JobStatus::JobCompleted | JobStatus::JobFailed | JobStatus::JobCancelled => {
task.completed_at = Some(event.timestamp);
if let Some(started) = task.started_at {
task.duration_ms = Some((event.timestamp - started) / 1_000_000); // Convert to ms
}
task.manifests = j_event.manifests.clone();
}
_ => {}
}
// Check if this task was cancelled
if let Some(cancel_reason) = task_cancellations.get(&j_event.job_run_id) {
task.cancelled = true;
task.cancel_reason = Some(cancel_reason.clone());
}
}
}
// Convert to vector and sort by scheduled time (most recent first)
let mut tasks: Vec<TaskInfo> = task_data.into_values().collect();
tasks.sort_by(|a, b| b.scheduled_at.cmp(&a.scheduled_at));
// Apply limit if specified
if let Some(limit) = limit {
tasks.truncate(limit);
}
Ok(tasks)
}
/// Show detailed information about a specific task
///
/// Returns the complete timeline of events for the specified task,
/// including all status changes and any cancellation events.
pub async fn show(&self, job_run_id: &str) -> Result<Option<(TaskInfo, Vec<TaskEvent>)>> {
// Get all events for this specific job run
let job_events = self.event_log.get_job_run_events(job_run_id).await?;
if job_events.is_empty() {
return Ok(None);
}
let mut task_info: Option<TaskInfo> = None;
let mut timeline: Vec<TaskEvent> = Vec::new();
// Process job events to build task information
for event in &job_events {
if let Some(build_event::EventType::JobEvent(j_event)) = &event.event_type {
let job_label = j_event.job_label.as_ref()
.map(|l| l.label.clone())
.unwrap_or_else(|| "unknown".to_string());
let status = match j_event.status_code {
1 => JobStatus::JobScheduled,
2 => JobStatus::JobRunning,
3 => JobStatus::JobCompleted,
4 => JobStatus::JobFailed,
5 => JobStatus::JobCancelled,
6 => JobStatus::JobSkipped,
_ => JobStatus::JobUnknown,
};
// Create or update task info
if task_info.is_none() {
task_info = Some(TaskInfo {
job_run_id: j_event.job_run_id.clone(),
job_label: job_label.clone(),
build_request_id: event.build_request_id.clone(),
status: JobStatus::JobUnknown,
target_partitions: j_event.target_partitions.clone(),
scheduled_at: event.timestamp,
started_at: None,
completed_at: None,
duration_ms: None,
message: String::new(),
config: None,
manifests: vec![],
cancelled: false,
cancel_reason: None,
});
}
let task = task_info.as_mut().unwrap();
task.status = status;
task.message = j_event.message.clone();
match status {
JobStatus::JobScheduled => {
task.scheduled_at = event.timestamp;
if let Some(config) = &j_event.config {
task.config = Some(config.clone());
}
}
JobStatus::JobRunning => {
task.started_at = Some(event.timestamp);
}
JobStatus::JobCompleted | JobStatus::JobFailed | JobStatus::JobCancelled => {
task.completed_at = Some(event.timestamp);
if let Some(started) = task.started_at {
task.duration_ms = Some((event.timestamp - started) / 1_000_000); // Convert to ms
}
task.manifests = j_event.manifests.clone();
}
_ => {}
}
// Add to timeline
timeline.push(TaskEvent {
timestamp: event.timestamp,
event_type: "job_status_change".to_string(),
status: Some(status),
message: j_event.message.clone(),
cancel_reason: None,
});
}
}
// Also check for task cancel events in all events
let all_events = self.event_log.get_events_in_range(0, i64::MAX).await?;
for event in all_events {
if let Some(build_event::EventType::TaskCancelEvent(tc_event)) = &event.event_type {
if tc_event.job_run_id == job_run_id {
if let Some(task) = task_info.as_mut() {
task.cancelled = true;
task.cancel_reason = Some(tc_event.reason.clone());
}
timeline.push(TaskEvent {
timestamp: event.timestamp,
event_type: "task_cancel".to_string(),
status: None,
message: "Task cancelled".to_string(),
cancel_reason: Some(tc_event.reason.clone()),
});
}
}
}
// Sort timeline by timestamp
timeline.sort_by_key(|e| e.timestamp);
Ok(task_info.map(|info| (info, timeline)))
}
/// Cancel a task with a reason.
///
/// Validates that the task exists and is in a cancellable state, then
/// writes a task cancellation event through the `EventWriter`.
///
/// # Errors
///
/// Returns `BuildEventLogError::QueryError` when the task does not exist
/// or is already in a terminal state (completed, failed, or cancelled).
pub async fn cancel(&self, job_run_id: &str, reason: String, build_request_id: String) -> Result<()> {
    // `let else` replaces the previous `is_none()` check followed by
    // `unwrap()` — same behavior, no panic path.
    let Some((task, _timeline)) = self.show(job_run_id).await? else {
        return Err(BuildEventLogError::QueryError(
            format!("Cannot cancel non-existent task: {}", job_run_id)
        ));
    };
    // Terminal states cannot be cancelled; report which one was hit.
    match task.status {
        JobStatus::JobCompleted => {
            return Err(BuildEventLogError::QueryError(
                format!("Cannot cancel completed task: {}", job_run_id)
            ));
        }
        JobStatus::JobFailed => {
            return Err(BuildEventLogError::QueryError(
                format!("Cannot cancel failed task: {}", job_run_id)
            ));
        }
        JobStatus::JobCancelled => {
            return Err(BuildEventLogError::QueryError(
                format!("Task already cancelled: {}", job_run_id)
            ));
        }
        // Scheduled/running (and any future non-terminal status) fall
        // through and may be cancelled.
        _ => {}
    }
    // Record the cancellation in the event log.
    let event_writer = crate::event_log::writer::EventWriter::new(self.event_log.clone());
    event_writer.cancel_task(build_request_id, job_run_id.to_string(), reason).await
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_log::mock::{MockBuildEventLog, test_events};

    #[tokio::test]
    async fn test_tasks_repository_list_empty() {
        // With no events recorded, the repository reports no tasks.
        let log = Arc::new(MockBuildEventLog::new().await.unwrap());
        let repo = TasksRepository::new(log);
        assert!(repo.list(None).await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_tasks_repository_list_with_data() {
        let bid = "test-build-123".to_string();
        let label = JobLabel { label: "//:process_data".to_string() };
        let part = PartitionRef { str: "data/users".to_string() };
        // Shorthand for a job event against the shared label/partition.
        let ev = |run: &str, status| {
            test_events::job_event(
                Some(bid.clone()),
                Some(run.to_string()),
                label.clone(),
                vec![part.clone()],
                status,
            )
        };
        // Two tasks, each scheduled and then reaching a terminal state.
        let log = Arc::new(
            MockBuildEventLog::with_events(vec![
                ev("task-1", JobStatus::JobScheduled),
                ev("task-1", JobStatus::JobCompleted),
                ev("task-2", JobStatus::JobScheduled),
                ev("task-2", JobStatus::JobFailed),
            ])
            .await
            .unwrap(),
        );
        let repo = TasksRepository::new(log);
        let tasks = repo.list(None).await.unwrap();
        assert_eq!(tasks.len(), 2);
        let first = tasks.iter().find(|t| t.job_run_id == "task-1").unwrap();
        assert_eq!(first.status, JobStatus::JobCompleted);
        assert_eq!(first.job_label, "//:process_data");
        assert!(!first.cancelled);
        let second = tasks.iter().find(|t| t.job_run_id == "task-2").unwrap();
        assert_eq!(second.status, JobStatus::JobFailed);
        assert_eq!(second.job_label, "//:process_data");
        assert!(!second.cancelled);
    }

    #[tokio::test]
    async fn test_tasks_repository_show() {
        let bid = "test-build-456".to_string();
        let label = JobLabel { label: "//:analytics_task".to_string() };
        let part = PartitionRef { str: "analytics/daily".to_string() };
        let ev = |status| {
            test_events::job_event(
                Some(bid.clone()),
                Some("task-123".to_string()),
                label.clone(),
                vec![part.clone()],
                status,
            )
        };
        // A single task progressing through the full happy path.
        let log = Arc::new(
            MockBuildEventLog::with_events(vec![
                ev(JobStatus::JobScheduled),
                ev(JobStatus::JobRunning),
                ev(JobStatus::JobCompleted),
            ])
            .await
            .unwrap(),
        );
        let repo = TasksRepository::new(log);
        let (info, timeline) = repo
            .show("task-123")
            .await
            .unwrap()
            .expect("task should exist");
        assert_eq!(info.job_run_id, "task-123");
        assert_eq!(info.job_label, "//:analytics_task");
        assert_eq!(info.status, JobStatus::JobCompleted);
        assert!(!info.cancelled);
        // Timeline must preserve the scheduled -> running -> completed order.
        assert_eq!(timeline.len(), 3);
        assert_eq!(timeline[0].status, Some(JobStatus::JobScheduled));
        assert_eq!(timeline[1].status, Some(JobStatus::JobRunning));
        assert_eq!(timeline[2].status, Some(JobStatus::JobCompleted));
    }

    #[tokio::test]
    async fn test_tasks_repository_show_nonexistent() {
        // Unknown job run ids yield Ok(None), not an error.
        let log = Arc::new(MockBuildEventLog::new().await.unwrap());
        let repo = TasksRepository::new(log);
        assert!(repo.show("nonexistent-task").await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_tasks_repository_cancel() {
        let bid = "test-build-789".to_string();
        let label = JobLabel { label: "//:batch_task".to_string() };
        let part = PartitionRef { str: "batch/data".to_string() };
        let ev = |status| {
            test_events::job_event(
                Some(bid.clone()),
                Some("task-456".to_string()),
                label.clone(),
                vec![part.clone()],
                status,
            )
        };
        // A task that is currently running is cancellable.
        let log = Arc::new(
            MockBuildEventLog::with_events(vec![
                ev(JobStatus::JobScheduled),
                ev(JobStatus::JobRunning),
            ])
            .await
            .unwrap(),
        );
        let repo = TasksRepository::new(log.clone());
        repo.cancel("task-456", "User requested cancellation".to_string(), bid.clone())
            .await
            .unwrap();
        // NOTE: MockBuildEventLog does not currently retain task cancel
        // events, so the successful write is the extent of what can be
        // verified here.
        // Cancelling a task that was never recorded must fail.
        let missing = repo
            .cancel("nonexistent-task", "Should fail".to_string(), bid)
            .await;
        assert!(missing.is_err());
    }

    #[tokio::test]
    async fn test_tasks_repository_cancel_completed_task() {
        let bid = "test-build-999".to_string();
        let label = JobLabel { label: "//:completed_task".to_string() };
        let part = PartitionRef { str: "test/data".to_string() };
        let ev = |status| {
            test_events::job_event(
                Some(bid.clone()),
                Some("completed-task".to_string()),
                label.clone(),
                vec![part.clone()],
                status,
            )
        };
        let log = Arc::new(
            MockBuildEventLog::with_events(vec![
                ev(JobStatus::JobScheduled),
                ev(JobStatus::JobCompleted),
            ])
            .await
            .unwrap(),
        );
        let repo = TasksRepository::new(log);
        // A completed task must reject cancellation with a QueryError.
        let result = repo
            .cancel("completed-task", "Should fail".to_string(), bid)
            .await;
        match result {
            Err(BuildEventLogError::QueryError(msg)) => {
                assert!(msg.contains("Cannot cancel completed task"));
            }
            _ => panic!("Expected QueryError for completed task cancellation"),
        }
    }
}

View file

@ -168,10 +168,10 @@ pub async fn get_build_status(
// Extract information from build request events
if let Some(crate::build_event::EventType::BuildRequestEvent(req_event)) = &event.event_type {
info!("Processing BuildRequestEvent: status={}, message='{}'", req_event.status, req_event.message);
info!("Processing BuildRequestEvent: status={}, message='{}'", req_event.status_code, req_event.message);
// Update status with the latest event - convert from i32 to enum
status = Some(match req_event.status {
status = Some(match req_event.status_code {
0 => BuildRequestStatus::BuildRequestUnknown, // Default protobuf value - should not happen in production
1 => BuildRequestStatus::BuildRequestReceived,
2 => BuildRequestStatus::BuildRequestPlanning,
@ -288,7 +288,7 @@ pub struct CancelBuildRequest {
pub async fn cancel_build_request(
State(service): State<ServiceState>,
Path(CancelBuildRequest { build_request_id }): Path<CancelBuildRequest>,
) -> Result<Json<serde_json::Value>, (StatusCode, Json<ErrorResponse>)> {
) -> Result<Json<BuildCancelResponse>, (StatusCode, Json<ErrorResponse>)> {
// Update build request state
{
let mut active_builds = service.active_builds.write().await;
@ -309,7 +309,8 @@ pub async fn cancel_build_request(
let event = create_build_event(
build_request_id.clone(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status: BuildRequestStatus::BuildRequestCancelled as i32,
status_code: BuildRequestStatus::BuildRequestCancelled as i32,
status_name: BuildRequestStatus::BuildRequestCancelled.to_display_string(),
requested_partitions: vec![],
message: "Build request cancelled".to_string(),
}),
@ -321,10 +322,10 @@ pub async fn cancel_build_request(
info!("Build request {} cancelled", build_request_id);
Ok(Json(serde_json::json!({
"cancelled": true,
"build_request_id": build_request_id
})))
Ok(Json(BuildCancelResponse {
cancelled: true,
build_request_id,
}))
}
#[derive(Deserialize, JsonSchema)]
@ -801,6 +802,48 @@ pub async fn list_partitions(
}
}
// New unified protobuf-based handler for future migration.
// Lists partitions with pagination and an optional status filter, returning
// the protobuf-backed `PartitionsListResponse`.
pub async fn list_partitions_unified(
    State(service): State<ServiceState>,
    Query(params): Query<HashMap<String, String>>,
) -> Result<Json<crate::PartitionsListResponse>, (StatusCode, Json<ErrorResponse>)> {
    // Pagination: default limit 20, hard cap 100; offset defaults to 0.
    let limit = params
        .get("limit")
        .and_then(|raw| raw.parse::<u32>().ok())
        .map_or(20, |n| n.min(100));
    let offset = params
        .get("offset")
        .and_then(|raw| raw.parse::<u32>().ok())
        .unwrap_or(0);
    // NOTE(review): an unrecognized status string is silently dropped here
    // rather than rejected with a 400 — confirm that is intentional.
    let status_filter = params
        .get("status")
        .and_then(|raw| crate::PartitionStatus::from_display_string(raw));
    // Use repository with protobuf response format.
    let repository =
        crate::repositories::partitions::PartitionsRepository::new(service.event_log.clone());
    let request = crate::PartitionsListRequest {
        limit: Some(limit),
        offset: Some(offset),
        // Round-trip through the enum normalizes the filter string.
        status_filter: status_filter.map(|s| s.to_display_string()),
    };
    match repository.list_protobuf(request).await {
        Ok(response) => Ok(Json(response)),
        Err(e) => {
            error!("Failed to list partitions: {}", e);
            let body = ErrorResponse {
                error: format!("Failed to list partitions: {}", e),
            };
            Err((StatusCode::INTERNAL_SERVER_ERROR, Json(body)))
        }
    }
}
pub async fn get_activity_summary(
State(service): State<ServiceState>,
) -> Result<Json<ActivityResponse>, (StatusCode, Json<ErrorResponse>)> {
@ -1235,15 +1278,15 @@ pub async fn invalidate_partition(
State(service): State<ServiceState>,
Path(PartitionInvalidatePathRequest { partition_ref }): Path<PartitionInvalidatePathRequest>,
Json(request): Json<InvalidatePartitionRequest>,
) -> Result<Json<serde_json::Value>, (StatusCode, Json<ErrorResponse>)> {
) -> Result<Json<PartitionInvalidateResponse>, (StatusCode, Json<ErrorResponse>)> {
let repository = PartitionsRepository::new(service.event_log.clone());
match repository.invalidate(&partition_ref, request.reason.clone(), request.build_request_id).await {
Ok(()) => Ok(Json(serde_json::json!({
"invalidated": true,
"partition_ref": partition_ref,
"reason": request.reason
}))),
Ok(()) => Ok(Json(PartitionInvalidateResponse {
invalidated: true,
partition_ref,
reason: request.reason,
})),
Err(e) => {
error!("Failed to invalidate partition: {}", e);
Err((
@ -1478,15 +1521,15 @@ pub async fn cancel_task(
State(service): State<ServiceState>,
Path(TaskCancelPathRequest { job_run_id }): Path<TaskCancelPathRequest>,
Json(request): Json<CancelTaskRequest>,
) -> Result<Json<serde_json::Value>, (StatusCode, Json<ErrorResponse>)> {
) -> Result<Json<TaskCancelResponse>, (StatusCode, Json<ErrorResponse>)> {
let repository = TasksRepository::new(service.event_log.clone());
match repository.cancel(&job_run_id, request.reason.clone(), request.build_request_id).await {
Ok(()) => Ok(Json(serde_json::json!({
"cancelled": true,
"job_run_id": job_run_id,
"reason": request.reason
}))),
Ok(()) => Ok(Json(TaskCancelResponse {
cancelled: true,
job_run_id,
reason: request.reason,
})),
Err(e) => {
error!("Failed to cancel task: {}", e);
Err((
@ -1615,15 +1658,14 @@ pub async fn cancel_build_repository(
State(service): State<ServiceState>,
Path(BuildCancelPathRequest { build_request_id }): Path<BuildCancelPathRequest>,
Json(request): Json<CancelBuildRepositoryRequest>,
) -> Result<Json<serde_json::Value>, (StatusCode, Json<ErrorResponse>)> {
) -> Result<Json<BuildCancelRepositoryResponse>, (StatusCode, Json<ErrorResponse>)> {
let repository = BuildsRepository::new(service.event_log.clone());
match repository.cancel(&build_request_id, request.reason.clone()).await {
Ok(()) => Ok(Json(serde_json::json!({
"cancelled": true,
"build_request_id": build_request_id,
"reason": request.reason
}))),
Ok(()) => Ok(Json(BuildCancelRepositoryResponse {
cancelled: true,
build_request_id,
})),
Err(e) => {
error!("Failed to cancel build: {}", e);
Err((

View file

@ -93,14 +93,48 @@ pub struct AnalyzeRequest {
/// Response body for the analyze endpoint.
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
pub struct AnalyzeResponse {
    /// Serialized job graph. The JSON schema is overridden via
    /// `job_graph_schema` to a plain object because the graph shape
    /// is dynamic.
    #[schemars(schema_with = "job_graph_schema")]
    pub job_graph: serde_json::Value,
}
/// Schema override for `AnalyzeResponse::job_graph`: an untyped JSON object.
fn job_graph_schema(_gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema {
    use schemars::schema::{InstanceType, Schema, SchemaObject, SingleOrVec};
    // Start from the default schema object and pin the instance type to
    // `object`, leaving all other schema fields unset.
    let mut object_schema = SchemaObject::default();
    object_schema.instance_type = Some(SingleOrVec::Single(Box::new(InstanceType::Object)));
    Schema::Object(object_schema)
}
/// Generic error payload returned by handlers on failure.
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
pub struct ErrorResponse {
    /// Human-readable description of what went wrong.
    pub error: String,
}
/// Response for the build-request cancellation endpoint.
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
pub struct BuildCancelResponse {
    /// True when the cancellation was recorded.
    pub cancelled: bool,
    /// Identifier of the build request that was cancelled.
    pub build_request_id: String,
}
/// Response for the repository-backed build cancellation endpoint.
// NOTE(review): unlike `TaskCancelResponse`, this omits the cancellation
// reason — confirm whether clients expect it here as well.
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
pub struct BuildCancelRepositoryResponse {
    /// True when the cancellation was recorded.
    pub cancelled: bool,
    /// Identifier of the build request that was cancelled.
    pub build_request_id: String,
}
/// Response for the partition invalidation endpoint.
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
pub struct PartitionInvalidateResponse {
    /// True when the invalidation was recorded.
    pub invalidated: bool,
    /// Reference of the partition that was invalidated.
    pub partition_ref: String,
    /// Caller-supplied reason, echoed back from the request.
    pub reason: String,
}
/// Response for the task cancellation endpoint.
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
pub struct TaskCancelResponse {
    /// True when the cancellation was recorded.
    pub cancelled: bool,
    /// Identifier of the job run that was cancelled.
    pub job_run_id: String,
    /// Caller-supplied reason, echoed back from the request.
    pub reason: String,
}
// List endpoints request/response types
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
pub struct BuildsListResponse {

239
databuild/status_utils.rs Normal file
View file

@ -0,0 +1,239 @@
use crate::*;
/// Utilities for converting status enums to human-readable strings
/// This provides consistent status naming across CLI and Service interfaces
impl PartitionStatus {
/// Convert partition status to human-readable string matching current CLI/service format
pub fn to_display_string(&self) -> String {
match self {
PartitionStatus::PartitionUnknown => "unknown".to_string(),
PartitionStatus::PartitionRequested => "requested".to_string(),
PartitionStatus::PartitionAnalyzed => "analyzed".to_string(),
PartitionStatus::PartitionBuilding => "building".to_string(),
PartitionStatus::PartitionAvailable => "available".to_string(),
PartitionStatus::PartitionFailed => "failed".to_string(),
PartitionStatus::PartitionDelegated => "delegated".to_string(),
}
}
/// Parse a display string back to enum (for filtering, etc.)
pub fn from_display_string(s: &str) -> Option<Self> {
match s {
"unknown" => Some(PartitionStatus::PartitionUnknown),
"requested" => Some(PartitionStatus::PartitionRequested),
"analyzed" => Some(PartitionStatus::PartitionAnalyzed),
"building" => Some(PartitionStatus::PartitionBuilding),
"available" => Some(PartitionStatus::PartitionAvailable),
"failed" => Some(PartitionStatus::PartitionFailed),
"delegated" => Some(PartitionStatus::PartitionDelegated),
_ => None,
}
}
}
impl JobStatus {
/// Convert job status to human-readable string matching current CLI/service format
pub fn to_display_string(&self) -> String {
match self {
JobStatus::JobUnknown => "unknown".to_string(),
JobStatus::JobScheduled => "scheduled".to_string(),
JobStatus::JobRunning => "running".to_string(),
JobStatus::JobCompleted => "completed".to_string(),
JobStatus::JobFailed => "failed".to_string(),
JobStatus::JobCancelled => "cancelled".to_string(),
JobStatus::JobSkipped => "skipped".to_string(),
}
}
/// Parse a display string back to enum
pub fn from_display_string(s: &str) -> Option<Self> {
match s {
"unknown" => Some(JobStatus::JobUnknown),
"scheduled" => Some(JobStatus::JobScheduled),
"running" => Some(JobStatus::JobRunning),
"completed" => Some(JobStatus::JobCompleted),
"failed" => Some(JobStatus::JobFailed),
"cancelled" => Some(JobStatus::JobCancelled),
"skipped" => Some(JobStatus::JobSkipped),
_ => None,
}
}
}
impl BuildRequestStatus {
/// Convert build request status to human-readable string matching current CLI/service format
pub fn to_display_string(&self) -> String {
match self {
BuildRequestStatus::BuildRequestUnknown => "unknown".to_string(),
BuildRequestStatus::BuildRequestReceived => "received".to_string(),
BuildRequestStatus::BuildRequestPlanning => "planning".to_string(),
BuildRequestStatus::BuildRequestAnalysisCompleted => "analysis_completed".to_string(),
BuildRequestStatus::BuildRequestExecuting => "executing".to_string(),
BuildRequestStatus::BuildRequestCompleted => "completed".to_string(),
BuildRequestStatus::BuildRequestFailed => "failed".to_string(),
BuildRequestStatus::BuildRequestCancelled => "cancelled".to_string(),
}
}
/// Parse a display string back to enum
pub fn from_display_string(s: &str) -> Option<Self> {
match s {
"unknown" => Some(BuildRequestStatus::BuildRequestUnknown),
"received" => Some(BuildRequestStatus::BuildRequestReceived),
"planning" => Some(BuildRequestStatus::BuildRequestPlanning),
"analysis_completed" => Some(BuildRequestStatus::BuildRequestAnalysisCompleted),
"executing" => Some(BuildRequestStatus::BuildRequestExecuting),
"completed" => Some(BuildRequestStatus::BuildRequestCompleted),
"failed" => Some(BuildRequestStatus::BuildRequestFailed),
"cancelled" => Some(BuildRequestStatus::BuildRequestCancelled),
_ => None,
}
}
}
/// Helper functions for creating protobuf list responses with dual status
/// fields (numeric `status_code` plus human-readable `status_name`).
pub mod list_response_helpers {
    use super::*;

    /// Build a `PartitionSummary`, deriving both status fields from the
    /// single `status` argument so they can never disagree.
    pub fn create_partition_summary(
        partition_ref: String,
        status: PartitionStatus,
        last_updated: i64,
        builds_count: usize,
        invalidation_count: usize,
        last_successful_build: Option<String>,
    ) -> PartitionSummary {
        let status_code = status as i32;
        let status_name = status.to_display_string();
        PartitionSummary {
            partition_ref,
            status_code,
            status_name,
            last_updated,
            // Counts are widened from usize; protobuf fields are u32.
            builds_count: builds_count as u32,
            invalidation_count: invalidation_count as u32,
            last_successful_build,
        }
    }

    /// Build a `JobSummary` from aggregate run counts for one job label.
    pub fn create_job_summary(
        job_label: String,
        total_runs: usize,
        successful_runs: usize,
        failed_runs: usize,
        cancelled_runs: usize,
        average_partitions_per_run: f64,
        last_run_timestamp: i64,
        last_run_status: JobStatus,
        recent_builds: Vec<String>,
    ) -> JobSummary {
        let last_run_status_code = last_run_status as i32;
        let last_run_status_name = last_run_status.to_display_string();
        JobSummary {
            job_label,
            total_runs: total_runs as u32,
            successful_runs: successful_runs as u32,
            failed_runs: failed_runs as u32,
            cancelled_runs: cancelled_runs as u32,
            average_partitions_per_run,
            last_run_timestamp,
            last_run_status_code,
            last_run_status_name,
            recent_builds,
        }
    }

    /// Build a `TaskSummary` for a single job run.
    pub fn create_task_summary(
        job_run_id: String,
        job_label: String,
        build_request_id: String,
        status: JobStatus,
        target_partitions: Vec<PartitionRef>,
        scheduled_at: i64,
        started_at: Option<i64>,
        completed_at: Option<i64>,
        duration_ms: Option<i64>,
        cancelled: bool,
        message: String,
    ) -> TaskSummary {
        let status_code = status as i32;
        let status_name = status.to_display_string();
        TaskSummary {
            job_run_id,
            job_label,
            build_request_id,
            status_code,
            status_name,
            target_partitions,
            scheduled_at,
            started_at,
            completed_at,
            duration_ms,
            cancelled,
            message,
        }
    }

    /// Build a `BuildSummary` for a single build request.
    pub fn create_build_summary(
        build_request_id: String,
        status: BuildRequestStatus,
        requested_partitions: Vec<PartitionRef>,
        total_jobs: usize,
        completed_jobs: usize,
        failed_jobs: usize,
        cancelled_jobs: usize,
        requested_at: i64,
        started_at: Option<i64>,
        completed_at: Option<i64>,
        duration_ms: Option<i64>,
        cancelled: bool,
    ) -> BuildSummary {
        let status_code = status as i32;
        let status_name = status.to_display_string();
        BuildSummary {
            build_request_id,
            status_code,
            status_name,
            requested_partitions,
            total_jobs: total_jobs as u32,
            completed_jobs: completed_jobs as u32,
            failed_jobs: failed_jobs as u32,
            cancelled_jobs: cancelled_jobs as u32,
            requested_at,
            started_at,
            completed_at,
            duration_ms,
            cancelled,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Each status family must round-trip: enum -> display string -> enum.
    #[test]
    fn test_partition_status_conversions() {
        let s = PartitionStatus::PartitionAvailable;
        assert_eq!(s.to_display_string(), "available");
        assert_eq!(PartitionStatus::from_display_string("available"), Some(s));
    }

    #[test]
    fn test_job_status_conversions() {
        let s = JobStatus::JobCompleted;
        assert_eq!(s.to_display_string(), "completed");
        assert_eq!(JobStatus::from_display_string("completed"), Some(s));
    }

    #[test]
    fn test_build_request_status_conversions() {
        let s = BuildRequestStatus::BuildRequestCompleted;
        assert_eq!(s.to_display_string(), "completed");
        assert_eq!(BuildRequestStatus::from_display_string("completed"), Some(s));
    }

    // Unrecognized strings must not parse for any status family.
    #[test]
    fn test_invalid_display_string() {
        let bogus = "invalid";
        assert_eq!(PartitionStatus::from_display_string(bogus), None);
        assert_eq!(JobStatus::from_display_string(bogus), None);
        assert_eq!(BuildRequestStatus::from_display_string(bogus), None);
    }
}

View file

@ -1,4 +1,5 @@
- Remove manual references to enum values, e.g. [here](../databuild/repositories/builds/mod.rs:85)
- Status indicator for page selection
- On build request detail page, show aggregated job results
- Use path based navigation instead of hashbang?

View file

@ -6,6 +6,12 @@
set -euo pipefail
# First make sure the build succeeds
bazel build //...
# Then make sure the core tests succeed
bazel test //...
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TESTS_DIR="$SCRIPT_DIR/tests/end_to_end"

View file

@ -83,15 +83,15 @@ for i in {1..60}; do
STATUS=$(echo "$STATUS_RESPONSE" | jq -r '.status' 2>/dev/null || echo "UNKNOWN")
case "$STATUS" in
"completed"|"COMPLETED")
"completed"|"COMPLETED"|"BuildRequestCompleted")
echo "[INFO] Service build completed"
break
;;
"failed"|"FAILED")
"failed"|"FAILED"|"BuildRequestFailed")
echo "[ERROR] Service build failed: $STATUS_RESPONSE"
exit 1
;;
"running"|"RUNNING"|"pending"|"PENDING"|"planning"|"PLANNING"|"executing"|"EXECUTING")
"running"|"RUNNING"|"pending"|"PENDING"|"planning"|"PLANNING"|"executing"|"EXECUTING"|"BuildRequestPlanning"|"BuildRequestExecuting"|"BuildRequestReceived")
echo "[INFO] Build status: $STATUS"
sleep 2
;;

View file

@ -92,15 +92,15 @@ for i in {1..30}; do
STATUS=$(echo "$STATUS_RESPONSE" | jq -r '.status' 2>/dev/null || echo "UNKNOWN")
case "$STATUS" in
"completed"|"COMPLETED")
"completed"|"COMPLETED"|"BuildRequestCompleted")
echo "[INFO] Service build completed"
break
;;
"failed"|"FAILED")
"failed"|"FAILED"|"BuildRequestFailed")
echo "[ERROR] Service build failed: $STATUS_RESPONSE"
exit 1
;;
"running"|"RUNNING"|"pending"|"PENDING"|"planning"|"PLANNING"|"executing"|"EXECUTING")
"running"|"RUNNING"|"pending"|"PENDING"|"planning"|"PLANNING"|"executing"|"EXECUTING"|"BuildRequestPlanning"|"BuildRequestExecuting"|"BuildRequestReceived")
echo "[INFO] Build status: $STATUS"
sleep 2
;;