Compare commits

...

3 commits

Author SHA1 Message Date
a78c6fc5fb resilience design
Some checks failed
/ setup (push) Has been cancelled
2025-08-21 22:09:29 -07:00
8ba4820654 Big bump 2025-08-20 23:34:37 -07:00
bfcf7cdfd2 Update databuild.proto for v2 2025-08-20 20:05:50 -07:00
26 changed files with 852 additions and 303 deletions

View file

@ -900,7 +900,7 @@ async fn handle_builds_command(matches: &ArgMatches, event_log_uri: &str) -> Res
} }
_ => { _ => {
println!("Build: {}", detail.build_request_id); println!("Build: {}", detail.build_request_id);
println!("Status: {} ({})", detail.status_name, detail.status_code); println!("Status: {} ({})", detail.status.clone().unwrap().name, detail.status.unwrap().code);
println!("Requested partitions: {}", detail.requested_partitions.len()); println!("Requested partitions: {}", detail.requested_partitions.len());
println!("Total jobs: {}", detail.total_jobs); println!("Total jobs: {}", detail.total_jobs);
println!("Completed jobs: {}", detail.completed_jobs); println!("Completed jobs: {}", detail.completed_jobs);
@ -958,11 +958,7 @@ async fn handle_builds_command(matches: &ArgMatches, event_log_uri: &str) -> Res
println!("\nTimeline ({} events):", detail.timeline.len()); println!("\nTimeline ({} events):", detail.timeline.len());
for event in detail.timeline { for event in detail.timeline {
let timestamp = format_timestamp(event.timestamp); let timestamp = format_timestamp(event.timestamp);
let status_info = if let Some(ref status_name) = event.status_name { let status_info = event.status.unwrap().name;
format!(" -> {}", status_name)
} else {
String::new()
};
println!(" {} [{}]{} {}", timestamp, event.event_type, status_info, event.message); println!(" {} [{}]{} {}", timestamp, event.event_type, status_info, event.message);
if let Some(ref reason) = event.cancel_reason { if let Some(ref reason) = event.cancel_reason {

View file

@ -44,6 +44,7 @@ genrule(
"typescript_generated/src/models/BuildRequest.ts", "typescript_generated/src/models/BuildRequest.ts",
"typescript_generated/src/models/BuildRequestResponse.ts", "typescript_generated/src/models/BuildRequestResponse.ts",
"typescript_generated/src/models/BuildSummary.ts", "typescript_generated/src/models/BuildSummary.ts",
"typescript_generated/src/models/BuildRequestStatus.ts",
"typescript_generated/src/models/BuildTimelineEvent.ts", "typescript_generated/src/models/BuildTimelineEvent.ts",
"typescript_generated/src/models/BuildsListApiResponse.ts", "typescript_generated/src/models/BuildsListApiResponse.ts",
"typescript_generated/src/models/BuildsListResponse.ts", "typescript_generated/src/models/BuildsListResponse.ts",
@ -118,6 +119,7 @@ genrule(
cp $$TEMP_DIR/src/models/BuildRequest.ts $(location typescript_generated/src/models/BuildRequest.ts) cp $$TEMP_DIR/src/models/BuildRequest.ts $(location typescript_generated/src/models/BuildRequest.ts)
cp $$TEMP_DIR/src/models/BuildRequestResponse.ts $(location typescript_generated/src/models/BuildRequestResponse.ts) cp $$TEMP_DIR/src/models/BuildRequestResponse.ts $(location typescript_generated/src/models/BuildRequestResponse.ts)
cp $$TEMP_DIR/src/models/BuildSummary.ts $(location typescript_generated/src/models/BuildSummary.ts) cp $$TEMP_DIR/src/models/BuildSummary.ts $(location typescript_generated/src/models/BuildSummary.ts)
cp $$TEMP_DIR/src/models/BuildRequestStatus.ts $(location typescript_generated/src/models/BuildRequestStatus.ts)
cp $$TEMP_DIR/src/models/BuildTimelineEvent.ts $(location typescript_generated/src/models/BuildTimelineEvent.ts) cp $$TEMP_DIR/src/models/BuildTimelineEvent.ts $(location typescript_generated/src/models/BuildTimelineEvent.ts)
cp $$TEMP_DIR/src/models/BuildsListApiResponse.ts $(location typescript_generated/src/models/BuildsListApiResponse.ts) cp $$TEMP_DIR/src/models/BuildsListApiResponse.ts $(location typescript_generated/src/models/BuildsListApiResponse.ts)
cp $$TEMP_DIR/src/models/BuildsListResponse.ts $(location typescript_generated/src/models/BuildsListResponse.ts) cp $$TEMP_DIR/src/models/BuildsListResponse.ts $(location typescript_generated/src/models/BuildsListResponse.ts)

View file

@ -220,8 +220,8 @@ export const RecentActivity: TypedComponent<RecentActivityAttrs> = {
}, build.build_request_id) }, build.build_request_id)
]), ]),
m('td', [ m('td', [
// KEY FIX: build.status_name is now always a string, prevents runtime errors // KEY FIX: build.status.name is now always a string, prevents runtime errors
m(BuildStatusBadge, { status: build.status_name }) m(BuildStatusBadge, { status: build.status.name })
]), ]),
m('td.text-sm.opacity-70', formatTime(build.requested_at)), m('td.text-sm.opacity-70', formatTime(build.requested_at)),
]) ])
@ -276,7 +276,7 @@ export const RecentActivity: TypedComponent<RecentActivityAttrs> = {
}, partition.partition_ref.str) }, partition.partition_ref.str)
]), ]),
m('td', [ m('td', [
// KEY FIX: partition.status_name is now always a string, prevents runtime errors // KEY FIX: partition.status.name is now always a string, prevents runtime errors
m(PartitionStatusBadge, { status: partition.status_name }) m(PartitionStatusBadge, { status: partition.status_name })
]), ]),
m('td.text-sm.opacity-70', m('td.text-sm.opacity-70',
@ -402,8 +402,8 @@ export const BuildStatus: TypedComponent<BuildStatusAttrs> = {
startPolling() { startPolling() {
// Use different poll intervals based on build status // Use different poll intervals based on build status
const isActive = this.data?.status_name === 'EXECUTING' || const isActive = this.data?.status.name === 'EXECUTING' ||
this.data?.status_name === 'PLANNING'; this.data?.status.name === 'PLANNING';
const interval = isActive ? 2000 : 10000; // 2s for active, 10s for completed const interval = isActive ? 2000 : 10000; // 2s for active, 10s for completed
pollingManager.startPolling(`build-status-${this.buildId}`, () => { pollingManager.startPolling(`build-status-${this.buildId}`, () => {
@ -457,7 +457,7 @@ export const BuildStatus: TypedComponent<BuildStatusAttrs> = {
{stage: 'Build Requested', time: build.requested_at, icon: '🕚'}, {stage: 'Build Requested', time: build.requested_at, icon: '🕚'},
...(build.started_at ? [{stage: 'Build Started', time: build.started_at, icon: '▶️'}] : []), ...(build.started_at ? [{stage: 'Build Started', time: build.started_at, icon: '▶️'}] : []),
// ...(this.data.events as BuildEvent[]).filter(ev => ev.job_event !== null).map((ev) => ({ // ...(this.data.events as BuildEvent[]).filter(ev => ev.job_event !== null).map((ev) => ({
// stage: ev.job_event.status_name, time: ev.timestamp, icon: '🙃' // stage: ev.job_event.status.name, time: ev.timestamp, icon: '🙃'
// })), // })),
...(build.completed_at ? [{stage: 'Build Completed', time: build.completed_at, icon: '✅'}] : []), ...(build.completed_at ? [{stage: 'Build Completed', time: build.completed_at, icon: '✅'}] : []),
]; ];
@ -472,7 +472,7 @@ export const BuildStatus: TypedComponent<BuildStatusAttrs> = {
m('.stat.bg-base-100.shadow.rounded-lg.p-4', [ m('.stat.bg-base-100.shadow.rounded-lg.p-4', [
m('.stat-title', 'Status'), m('.stat-title', 'Status'),
m('.stat-value.text-2xl', [ m('.stat-value.text-2xl', [
m(BuildStatusBadge, { status: build.status_name, size: 'lg' }) m(BuildStatusBadge, { status: build.status.name, size: 'lg' })
]) ])
]), ]),
m('.stat.bg-base-100.shadow.rounded-lg.p-4', [ m('.stat.bg-base-100.shadow.rounded-lg.p-4', [
@ -518,7 +518,7 @@ export const BuildStatus: TypedComponent<BuildStatusAttrs> = {
m('.partition-status.flex.justify-between.items-center', [ m('.partition-status.flex.justify-between.items-center', [
// CLEAN: Always string status, no nested object access // CLEAN: Always string status, no nested object access
m(PartitionStatusBadge, { m(PartitionStatusBadge, {
status: partitionStatus?.status_name || 'Loading...', status: partitionStatus?.status.name || 'Loading...',
size: 'sm' size: 'sm'
}), }),
partitionStatus?.last_updated ? partitionStatus?.last_updated ?
@ -905,15 +905,15 @@ export const PartitionStatus: TypedComponent<PartitionStatusAttrs> = {
// Update status based on event type // Update status based on event type
if (event.eventType === 'build_request') { if (event.eventType === 'build_request') {
if (event.message?.includes('completed') || event.message?.includes('successful')) { if (event.message?.includes('completed') || event.message?.includes('successful')) {
build.status_name = 'Completed'; build.status.name = 'Completed';
build.completedAt = event.timestamp; build.completedAt = event.timestamp;
} else if (event.message?.includes('failed') || event.message?.includes('error')) { } else if (event.message?.includes('failed') || event.message?.includes('error')) {
build.status_name = 'Failed'; build.status.name = 'Failed';
build.completedAt = event.timestamp; build.completedAt = event.timestamp;
} else if (event.message?.includes('executing') || event.message?.includes('running')) { } else if (event.message?.includes('executing') || event.message?.includes('running')) {
build.status_name = 'Executing'; build.status.name = 'Executing';
} else if (event.message?.includes('planning')) { } else if (event.message?.includes('planning')) {
build.status_name = 'Planning'; build.status.name = 'Planning';
} }
} }
} }
@ -1012,7 +1012,7 @@ export const PartitionStatus: TypedComponent<PartitionStatusAttrs> = {
]), ]),
m('div.partition-meta.flex.gap-4.items-center.mb-4', [ m('div.partition-meta.flex.gap-4.items-center.mb-4', [
m(PartitionStatusBadge, { status: this.data?.status_name || 'Unknown', size: 'lg' }), m(PartitionStatusBadge, { status: this.data?.status.name || 'Unknown', size: 'lg' }),
this.data?.last_updated ? this.data?.last_updated ?
m('.timestamp.text-sm.opacity-70', m('.timestamp.text-sm.opacity-70',
`Last updated: ${formatDateTime(this.data.last_updated)}`) : null, `Last updated: ${formatDateTime(this.data.last_updated)}`) : null,
@ -1051,7 +1051,7 @@ export const PartitionStatus: TypedComponent<PartitionStatusAttrs> = {
}, build.id) }, build.id)
]), ]),
m('td', [ m('td', [
m(BuildStatusBadge, { status: build.status_name }) m(BuildStatusBadge, { status: build.status.name })
]), ]),
m('td.text-sm.opacity-70', m('td.text-sm.opacity-70',
formatDateTime(build.startedAt)), formatDateTime(build.startedAt)),

View file

@ -38,8 +38,7 @@ const apiClient = new DefaultApi(apiConfig);
function transformBuildSummary(apiResponse: BuildSummary): DashboardBuild { function transformBuildSummary(apiResponse: BuildSummary): DashboardBuild {
return { return {
build_request_id: apiResponse.build_request_id, build_request_id: apiResponse.build_request_id,
status_code: apiResponse.status_code, status: apiResponse.status!,
status_name: apiResponse.status_name,
requested_partitions: apiResponse.requested_partitions, // Keep as PartitionRef array requested_partitions: apiResponse.requested_partitions, // Keep as PartitionRef array
total_jobs: apiResponse.total_jobs, total_jobs: apiResponse.total_jobs,
completed_jobs: apiResponse.completed_jobs, completed_jobs: apiResponse.completed_jobs,
@ -56,8 +55,7 @@ function transformBuildSummary(apiResponse: BuildSummary): DashboardBuild {
function transformBuildDetail(apiResponse: BuildDetailResponse): DashboardBuild { function transformBuildDetail(apiResponse: BuildDetailResponse): DashboardBuild {
return { return {
build_request_id: apiResponse.build_request_id, build_request_id: apiResponse.build_request_id,
status_code: apiResponse.status_code, status: apiResponse.status!,
status_name: apiResponse.status_name,
requested_partitions: apiResponse.requested_partitions, // Keep as PartitionRef array requested_partitions: apiResponse.requested_partitions, // Keep as PartitionRef array
total_jobs: apiResponse.total_jobs, total_jobs: apiResponse.total_jobs,
completed_jobs: apiResponse.completed_jobs, completed_jobs: apiResponse.completed_jobs,

View file

@ -7,7 +7,8 @@ import {
BuildDetailResponse, BuildDetailResponse,
PartitionSummary, PartitionSummary,
JobSummary, JobSummary,
ActivityResponse ActivityResponse,
BuildRequestStatus
} from '../client/typescript_generated/src/index'; } from '../client/typescript_generated/src/index';
// Import types directly since we're now in the same ts_project // Import types directly since we're now in the same ts_project
@ -26,8 +27,7 @@ import {
function transformBuildSummary(apiResponse: BuildSummary): DashboardBuild { function transformBuildSummary(apiResponse: BuildSummary): DashboardBuild {
return { return {
build_request_id: apiResponse.build_request_id, build_request_id: apiResponse.build_request_id,
status_code: apiResponse.status_code, status: apiResponse.status!,
status_name: apiResponse.status_name,
requested_partitions: apiResponse.requested_partitions, // Keep as PartitionRef array requested_partitions: apiResponse.requested_partitions, // Keep as PartitionRef array
total_jobs: apiResponse.total_jobs, total_jobs: apiResponse.total_jobs,
completed_jobs: apiResponse.completed_jobs, completed_jobs: apiResponse.completed_jobs,
@ -44,8 +44,7 @@ function transformBuildSummary(apiResponse: BuildSummary): DashboardBuild {
function transformBuildDetail(apiResponse: BuildDetailResponse): DashboardBuild { function transformBuildDetail(apiResponse: BuildDetailResponse): DashboardBuild {
return { return {
build_request_id: apiResponse.build_request_id, build_request_id: apiResponse.build_request_id,
status_code: apiResponse.status_code, status: apiResponse.status!,
status_name: apiResponse.status_name,
requested_partitions: apiResponse.requested_partitions, // Keep as PartitionRef array requested_partitions: apiResponse.requested_partitions, // Keep as PartitionRef array
total_jobs: apiResponse.total_jobs, total_jobs: apiResponse.total_jobs,
completed_jobs: apiResponse.completed_jobs, completed_jobs: apiResponse.completed_jobs,
@ -98,8 +97,7 @@ function transformActivityResponse(apiResponse: ActivityResponse): DashboardActi
// Test Data Mocks // Test Data Mocks
const mockBuildSummary: BuildSummary = { const mockBuildSummary: BuildSummary = {
build_request_id: 'build-123', build_request_id: 'build-123',
status_code: 4, // BUILD_REQUEST_COMPLETED status: {code: 4, name: 'COMPLETED'},
status_name: 'COMPLETED',
requested_partitions: [{ str: 'partition-1' }, { str: 'partition-2' }], requested_partitions: [{ str: 'partition-1' }, { str: 'partition-2' }],
total_jobs: 5, total_jobs: 5,
completed_jobs: 5, completed_jobs: 5,
@ -151,12 +149,12 @@ o.spec('Transformation Functions', () => {
const result = transformBuildSummary(mockBuildSummary); const result = transformBuildSummary(mockBuildSummary);
// The key fix: status_name should be a string, status_code a number // The key fix: status_name should be a string, status_code a number
o(typeof result.status_code).equals('number'); o(typeof result.status?.code).equals('number');
o(typeof result.status_name).equals('string'); o(typeof result.status?.name).equals('string');
o(result.status_name).equals('COMPLETED'); o(result.status.name).equals('COMPLETED');
// This should not throw (preventing the original runtime error) // This should not throw (preventing the original runtime error)
o(() => result.status_name.toLowerCase()).notThrows('status_name.toLowerCase should work'); o(() => result.status.name.toLowerCase()).notThrows('status_name.toLowerCase should work');
}); });
o('transformBuildSummary handles null optional fields', () => { o('transformBuildSummary handles null optional fields', () => {
@ -219,7 +217,7 @@ o.spec('Transformation Functions', () => {
// All nested objects should be properly transformed // All nested objects should be properly transformed
o(result.recent_builds.length).equals(1); o(result.recent_builds.length).equals(1);
o(typeof result.recent_builds[0]?.status_name).equals('string'); o(typeof result.recent_builds[0]?.status.name).equals('string');
o(result.recent_partitions.length).equals(1); o(result.recent_partitions.length).equals(1);
o(typeof result.recent_partitions[0]?.partition_ref).equals('object'); o(typeof result.recent_partitions[0]?.partition_ref).equals('object');
@ -233,8 +231,8 @@ o.spec('Transformation Functions', () => {
// 1. status_name.toLowerCase() - should not crash // 1. status_name.toLowerCase() - should not crash
result.recent_builds.forEach((build: DashboardBuild) => { result.recent_builds.forEach((build: DashboardBuild) => {
o(() => build.status_name.toLowerCase()).notThrows('build.status_name.toLowerCase should work'); o(() => build.status.name.toLowerCase()).notThrows('build.status.name.toLowerCase should work');
o(build.status_name.toLowerCase()).equals('completed'); o(build.status.name.toLowerCase()).equals('completed');
}); });
// 2. partition_ref.str access - should access string property // 2. partition_ref.str access - should access string property

View file

@ -11,7 +11,8 @@ import {
JobMetricsResponse, JobMetricsResponse,
JobDailyStats, JobDailyStats,
JobRunSummary, JobRunSummary,
PartitionRef PartitionRef,
BuildRequestStatus
} from '../client/typescript_generated/src/index'; } from '../client/typescript_generated/src/index';
// Dashboard-optimized types - canonical frontend types independent of backend schema // Dashboard-optimized types - canonical frontend types independent of backend schema
@ -19,8 +20,7 @@ import {
export interface DashboardBuild { export interface DashboardBuild {
build_request_id: string; build_request_id: string;
status_code: number; status: BuildRequestStatus;
status_name: string;
requested_partitions: PartitionRef[]; requested_partitions: PartitionRef[];
total_jobs: number; total_jobs: number;
completed_jobs: number; completed_jobs: number;
@ -66,8 +66,7 @@ export interface DashboardActivity {
// Dashboard timeline event types for consistent UI handling // Dashboard timeline event types for consistent UI handling
export interface DashboardBuildTimelineEvent { export interface DashboardBuildTimelineEvent {
timestamp: number; timestamp: number;
status_code: number; status: BuildRequestStatus;
status_name: string;
message: string; message: string;
event_type: string; event_type: string;
cancel_reason?: string; cancel_reason?: string;
@ -75,8 +74,7 @@ export interface DashboardBuildTimelineEvent {
export interface DashboardPartitionTimelineEvent { export interface DashboardPartitionTimelineEvent {
timestamp: number; timestamp: number;
status_code: number; status: BuildRequestStatus;
status_name: string;
message: string; message: string;
build_request_id: string; build_request_id: string;
job_run_id?: string; job_run_id?: string;
@ -249,8 +247,8 @@ export function isDashboardActivity(data: any): data is DashboardActivity {
export function isDashboardBuild(data: any): data is DashboardBuild { export function isDashboardBuild(data: any): data is DashboardBuild {
return data && return data &&
typeof data.build_request_id === 'string' && typeof data.build_request_id === 'string' &&
typeof data.status_code === 'number' && typeof data.status?.code === 'number' &&
typeof data.status_name === 'string' && typeof data.status?.name === 'string' &&
typeof data.requested_at === 'number' && typeof data.requested_at === 'number' &&
Array.isArray(data.requested_partitions); Array.isArray(data.requested_partitions);
} }

View file

@ -50,7 +50,7 @@ message JobConfigureResponse { repeated JobConfig configs = 1; }
// Implemented by the job.cfg bazel rule // Implemented by the job.cfg bazel rule
service JobConfigure { service JobConfigure {
rpc Configure(JobConfigureRequest) returns (JobConfigureResponse); rpc Configure(JobConfigureRequest) returns (JobConfigureResponse);
} }
// //
@ -86,7 +86,7 @@ message JobExecuteResponse { repeated PartitionManifest manifests = 1; }
// Implemented by the job.exec bazel rule // Implemented by the job.exec bazel rule
service JobExecute { service JobExecute {
rpc Execute(JobExecuteRequest) returns (JobExecuteResponse); rpc Execute(JobExecuteRequest) returns (JobExecuteResponse);
} }
/////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////
@ -119,7 +119,7 @@ message GraphLookupResponse { repeated TaskRef task_refs = 1; }
// Implemented per graph // Implemented per graph
service GraphLookup { service GraphLookup {
rpc Lookup(GraphLookupRequest) returns (GraphLookupResponse); rpc Lookup(GraphLookupRequest) returns (GraphLookupResponse);
} }
// Request message for graph analyze service // Request message for graph analyze service
@ -202,23 +202,46 @@ enum JobStatus {
} }
// Build request lifecycle // Build request lifecycle
enum BuildRequestStatus { enum BuildRequestStatusCode {
// Not good
BUILD_REQUEST_UNKNOWN = 0; BUILD_REQUEST_UNKNOWN = 0;
BUILD_REQUEST_RECEIVED = 1; // Build request received and queued // Build request received
BUILD_REQUEST_PLANNING = 2; // Graph analysis in progress BUILD_REQUEST_RECEIVED = 1;
BUILD_REQUEST_ANALYSIS_COMPLETED = 7; // Graph analysis completed successfully // Graph analysis in progress
BUILD_REQUEST_EXECUTING = 3; // Jobs are being executed BUILD_REQUEST_PLANNING = 2;
BUILD_REQUEST_COMPLETED = 4; // All requested partitions built // Graph analysis completed successfully
BUILD_REQUEST_FAILED = 5; // Build request failed BUILD_REQUEST_ANALYSIS_COMPLETED = 7;
BUILD_REQUEST_CANCELLED = 6; // Build request cancelled // Jobs are being executed
BUILD_REQUEST_EXECUTING = 3;
// All requested partitions built
BUILD_REQUEST_COMPLETED = 4;
// Build precondition failed (e.g. required external data was not available)
BUILD_REQUEST_PRECONDITION_FAILED = 8;
// Build request failed
BUILD_REQUEST_FAILED = 5;
// Build request cancelled
BUILD_REQUEST_CANCELLED = 6;
}
message BuildRequestStatus {
// Enum for programmatic use
BuildRequestStatusCode code = 1;
// Human readable string
string name = 2;
} }
// Build request lifecycle event // Build request lifecycle event
message BuildRequestEvent { message BuildRequestEvent {
BuildRequestStatus status_code = 1; // Enum for programmatic use // The status that this event indicates
string status_name = 2; // Human-readable string BuildRequestStatus status = 1;
// Output partitions requested to be built as part of this build
repeated PartitionRef requested_partitions = 3; repeated PartitionRef requested_partitions = 3;
string message = 4; // Optional status message // Optional status message
string message = 4;
// The comment attached to the request - contains arbitrary text
optional string comment = 5;
// The id of the want that triggered this build
optional string want_id = 6;
} }
// Partition state change event // Partition state change event
@ -272,20 +295,67 @@ message BuildCancelEvent {
string reason = 1; // Reason for cancellation string reason = 1; // Reason for cancellation
} }
// Partition Want message WantEvent {
message WantSource { repeated PartitionRef requested_partitions = 1;
// TODO // Unique identifier
string want_id = 2;
// How this want was created
WantSource source = 3;
string comment = 4;
} }
message PartitionWant { message PartitionWant {
PartitionRef partition_ref = 1; // Partition being requested string want_id = 1;
uint64 created_at = 2; // Server time when want registered // The ref we want to materialize
optional uint64 data_timestamp = 3; // Business time this partition represents PartitionRef ref = 2;
optional uint64 ttl_seconds = 4; // Give up after this long (from created_at) // Server time when want registered
optional uint64 sla_seconds = 5; // SLA violation after this long (from data_timestamp) uint64 created_at = 3;
repeated string external_dependencies = 6; // Cross-graph dependencies // Business time this partition represents
string want_id = 7; // Unique identifier uint64 data_timestamp = 4;
WantSource source = 8; // How this want was created // Give up after this long (from created_at)
optional uint64 ttl_seconds = 5;
// SLA violation after this long (from data_timestamp)
optional uint64 sla_seconds = 6;
// Cross-graph dependencies determined in the analysis phase triggered upon want submission
// These are per-partition, since wants can be partially, marginally materialized
repeated string external_dependencies = 7;
}
message WantSource {
// The source of the want
SourceType source_type = 1;
// TODO implement something to record want actual want source for external requests when we have real use case
}
message SourceType {
SourceTypeCode code = 1;
string name = 2;
}
enum SourceTypeCode {
// Manual CLI request
CLI_MANUAL = 0;
// Manual dashboard request
DASHBOARD_MANUAL = 1;
// Scheduled/triggered job
SCHEDULED = 2;
// External API call
API_REQUEST = 3;
}
// Marks a partition as tainted, so that it will be rebuilt if a data dep points to it, and will be rebuilt if a live
// want points to it.
message TaintEvent {
// The list of partitions to be tainted
repeated PartitionRef refs = 1;
// When the taint was created
uint64 created_at = 2;
// The source of the taint event
SourceType source_type = 3;
// Free text comment attached to the taint
string comment = 4;
} }
// Individual build event // Individual build event
@ -293,7 +363,7 @@ message BuildEvent {
// Event metadata // Event metadata
string event_id = 1; // UUID for this event string event_id = 1; // UUID for this event
int64 timestamp = 2; // Unix timestamp (nanoseconds) int64 timestamp = 2; // Unix timestamp (nanoseconds)
string build_request_id = 3; // UUID of the build request optional string build_request_id = 3;
// Event type and payload (one of) // Event type and payload (one of)
oneof event_type { oneof event_type {
@ -305,6 +375,8 @@ message BuildEvent {
PartitionInvalidationEvent partition_invalidation_event = 15; PartitionInvalidationEvent partition_invalidation_event = 15;
JobRunCancelEvent job_run_cancel_event = 16; JobRunCancelEvent job_run_cancel_event = 16;
BuildCancelEvent build_cancel_event = 17; BuildCancelEvent build_cancel_event = 17;
WantEvent want_event = 18;
TaintEvent taint_event = 19;
} }
} }
@ -460,18 +532,18 @@ message BuildsListResponse {
message BuildSummary { message BuildSummary {
string build_request_id = 1; string build_request_id = 1;
BuildRequestStatus status_code = 2; // Enum for programmatic use BuildRequestStatus status = 2;
string status_name = 3; // Human-readable string repeated PartitionRef requested_partitions = 3;
repeated PartitionRef requested_partitions = 4; uint32 total_jobs = 4;
uint32 total_jobs = 5; uint32 completed_jobs = 5;
uint32 completed_jobs = 6; uint32 failed_jobs = 6;
uint32 failed_jobs = 7; uint32 cancelled_jobs = 7;
uint32 cancelled_jobs = 8; int64 requested_at = 8;
int64 requested_at = 9; optional int64 started_at = 9;
optional int64 started_at = 10; optional int64 completed_at = 10;
optional int64 completed_at = 11; optional int64 duration_ms = 11;
optional int64 duration_ms = 12; bool cancelled = 12;
bool cancelled = 13; optional string comment = 13;
} }
// //
@ -501,29 +573,27 @@ message BuildDetailRequest {
message BuildDetailResponse { message BuildDetailResponse {
string build_request_id = 1; string build_request_id = 1;
BuildRequestStatus status_code = 2; // Enum for programmatic use BuildRequestStatus status = 2;
string status_name = 3; // Human-readable string repeated PartitionRef requested_partitions = 3;
repeated PartitionRef requested_partitions = 4; uint32 total_jobs = 4;
uint32 total_jobs = 5; uint32 completed_jobs = 5;
uint32 completed_jobs = 6; uint32 failed_jobs = 6;
uint32 failed_jobs = 7; uint32 cancelled_jobs = 7;
uint32 cancelled_jobs = 8; int64 requested_at = 8;
int64 requested_at = 9; optional int64 started_at = 9;
optional int64 started_at = 10; optional int64 completed_at = 10;
optional int64 completed_at = 11; optional int64 duration_ms = 11;
optional int64 duration_ms = 12; bool cancelled = 12;
bool cancelled = 13; optional string cancel_reason = 13;
optional string cancel_reason = 14; repeated BuildTimelineEvent timeline = 14;
repeated BuildTimelineEvent timeline = 15;
} }
message BuildTimelineEvent { message BuildTimelineEvent {
int64 timestamp = 1; int64 timestamp = 1;
optional BuildRequestStatus status_code = 2; // Enum for programmatic use optional BuildRequestStatus status = 2;
optional string status_name = 3; // Human-readable string string message = 3;
string message = 4; string event_type = 4;
string event_type = 5; optional string cancel_reason = 5;
optional string cancel_reason = 6;
} }
// //
@ -641,20 +711,377 @@ message JobLogsResponse {
} }
/////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////
// Services // Currently unused - implemented via HTTP REST API instead
/////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////
// Partition Want (Future feature - currently unused)
// message WantSource {
// // TODO
// }
// message PartitionWant {
// PartitionRef partition_ref = 1; // Partition being requested
// uint64 created_at = 2; // Server time when want registered
// optional uint64 data_timestamp = 3; // Business time this partition represents
// optional uint64 ttl_seconds = 4; // Give up after this long (from created_at)
// optional uint64 sla_seconds = 5; // SLA violation after this long (from data_timestamp)
// repeated string external_dependencies = 6; // Cross-graph dependencies
// string want_id = 7; // Unique identifier
// WantSource source = 8; // How this want was created
// }
// Service for job configuration and graph analysis // Service for job configuration and graph analysis
// service DataBuildService {
// // Get job configurations for the specified output references
// // rpc GetJobConfigs(JobConfigureRequest) returns (JobConfigureResponse) {}
// // Analyze and get the job graph for the specified output references
// rpc AnalyzeGraph(GraphAnalyzeRequest) returns (GraphAnalyzeResponse);
// // Execute the specified job graph (implemented by databuild)
// rpc Execute(JobGraph) returns (GraphExecuteResponse);
// // User-facing: build the desired partitions
// rpc Build(GraphBuildRequest) returns (GraphBuildResponse);
// }
///////////////////////////////////////////////////////////////////////////////////////////////
// DataBuildService - v2 of service and CLI interface below
///////////////////////////////////////////////////////////////////////////////////////////////
// The service that vends all build status information
// Core objects are:
// - Build events - events emitted as part of the build process that indicate status/state
// - BuildRequests - the literal request to build 1+ partitions
// - Partitions - Atomic units of data that represent results of jobs, and act as sufficiency signals for other jobs
// - Jobs - the units of work that build partitions (a single run of one is a JobRun)
// - JobRuns - the specific runs of Jobs
// - Wants - the recorded "want" to build a partition, which will be acted on ASAP
// - Taints - invalidate built partitions, in cases where the result should not be used or should be rebuilt
// Each of these will have a list page, and all but build events will have a summary page.
// NOTE(review): this span was garbled by a side-by-side diff paste (old and new
// columns merged per line); reconstructed below is the new-side v2 service.
service DataBuildService {
  // Build events - exposes literal events from build event log with filters
  rpc GetBuildEvents(ListBuildEventsRequest) returns (ListBuildEventsResponse);
  // For batched requests
  rpc Batched(BatchedRequest) returns (BatchedResponse);
  // BUILDS
  // List the available build requests with limited metadata about them (requested partitions, status, requested time, etc)
  rpc ListBuildRequests(ListBuildsRequest) returns (ListBuildsResponse);
  // Get build status, summary, and paginated lists of produced partitions, and other related metadata
  rpc GetBuildSummary(BuildSummaryRequest) returns (BuildSummaryResponse);
  // Get a mermaid description of the build request graph with its current status rendered
  rpc GetBuildMermaid(BuildSummaryRequest) returns (BuildMermaidResponse);
  // PARTITIONS
  // List partitions (built, building, wanted)
  rpc ListPartitions(ListPartitionsRequest) returns (ListPartitionsResponse);
  // Get details about a specific partition (status, created at, past builds, job runs that built or are building it, etc)
  rpc GetPartitionsSummary(PartitionSummaryRequest) returns (PartitionSummaryResponse);
  // JOBS
  // List jobs described in the graph plus metadata (success rate, last result, last run at, etc)
  rpc ListJobs(ListJobsRequest) returns (ListJobsResponse);
  // Get details for a specific job
  rpc GetJobSummary(JobSummaryRequest) returns (JobSummaryResponse);
  // JOB RUNS
  // List job runs plus basic metadata (job they ran, result, runtime, etc)
  rpc ListJobRuns(ListJobRunsRequest) returns (ListJobRunsResponse);
  // Get details of a specific job run (above details plus produced partitions, paginated logs, etc)
  rpc GetJobRunSummary(JobRunSummaryRequest) returns (JobRunSummaryResponse);
  // WANTS
  // List wants plus metadata (wanted partitions, created at, status)
  rpc ListWants(ListWantsRequest) returns (ListWantsResponse);
  // Get details for a want (above plus reasons for want being in current state, etc)
  rpc GetWantSummary(WantSummaryRequest) returns (WantSummaryResponse);
  // Register a want (list of partition refs, with user, reason, etc)
  rpc PutWants(PutWantsRequest) returns (PutWantsResponse);
  // TAINTS
  // List taints plus metadata (tainted partitions, created at, status)
  rpc ListTaints(ListTaintsRequest) returns (ListTaintsResponse);
  // Summarize the requested taint
  rpc GetTaintSummary(TaintSummaryRequest) returns (TaintSummaryResponse);
  // Register a taint (list of partition refs, with user, reason, etc)
  rpc PutTaints(PutTaintsRequest) returns (PutTaintsResponse);
}
// Request payloads for a single keyed entry in a BatchedRequest.
// Exactly one field is expected to be set per container.
message RequestContainer {
  // BUG FIX: was declared as `ListBuildEventsResponse`, but a *request*
  // container must carry the request message (ResponseContainer holds the
  // matching response type). Field number 1 is unchanged.
  ListBuildEventsRequest list_build_events = 1;
  BuildSummaryRequest build_request_status = 2;
  // TODO
}
// Response payloads for a single keyed entry in a BatchedResponse.
// The field set mirrors the field set of the corresponding RequestContainer.
message ResponseContainer {
  ListBuildEventsResponse list_build_events = 1;
  BuildSummaryResponse build_request_status = 2;
  // TODO
}
// Error payload for a single failed entry in a BatchedResponse.
message ErrorContainer {
  // Human-readable description of why this batched entry failed.
  string error_message = 1;
}
// Several sub-requests in one round trip, keyed by a caller-chosen ID.
message BatchedRequest {
  map<string, RequestContainer> requests = 1;
}
// Results of a BatchedRequest, keyed by the caller-chosen IDs.
// An ID appears in `responses` on success or in `errors` on failure.
message BatchedResponse {
  map<string, ResponseContainer> responses = 1;
  map<string, ErrorContainer> errors = 2;
}
// BEL events
// Queries raw events from the build event log (BEL).
message ListBuildEventsRequest {
  // ANDed filters applied to the returned events.
  EventFilter filters = 1;
  // Either one of the following must be provided.
  // NOTE(review): proto3 scalars cannot distinguish "unset" from 0 — consider a
  // oneof if "neither set" / "both set" must be rejected; confirm intent.
  // Scrolls backwards from the specified timestamp
  uint64 max_timestamp_ns = 2;
  // Scrolls forward from the specified timestamp
  uint64 min_timestamp_ns = 3;
}
message ListBuildEventsResponse {
  // Resulting events are ordered
  repeated BuildEvent events = 1;
  // True when more events matched than were returned; page again to fetch them.
  bool has_more = 2;
}
// BUILD REQUESTS
// All filters below are ANDed together; repeated fields OR within themselves.
message ListBuildsRequest {
  // The max time the service will search until to find build requests
  uint64 started_until = 1;
  // Filters returned build requests to those that currently have this status
  repeated string build_status = 2;
  // Filters build requests to those that built one of these partitions
  repeated string built_partition = 3;
  // Filters build requests to those that output one of these partitions (excluding those that were not explicitly
  // requested in the build request)
  repeated string output_partition = 4;
  // Filters by jobs that were run as part of the build
  repeated string run_jobs = 5;
  // Filters by the ID of the want that triggered the build
  repeated string triggering_want_ids = 6;
  // Filters by contains match against build request comment
  string comment_contains = 7;
}
// Ordered and paginated by build start time
message ListBuildsResponse {
  // Resulting builds
  repeated BuildSummary builds = 1;
  // Paging bounds for requesting next page
  uint64 min_started = 2;
  // Indicates if there are more to request
  bool has_more = 3;
}
// Fetches detail for a single build request (also used by GetBuildMermaid).
message BuildSummaryRequest {
  string build_id = 1;
}
message BuildSummaryResponse {
  string build_id = 1;
  // Overall status of the build
  BuildRequestStatusCode status = 2;
  // Summary of the build
  BuildSummary summary = 3;
  // Partitions produced by the build
  repeated PartitionBuildStatus partitions = 4;
}
// Status of a single partition within the context of one build.
message PartitionBuildStatus {
  PartitionRef ref = 1;
  PartitionStatus status = 2;
}
// Mermaid rendering of a build request's graph with current status.
message BuildMermaidResponse {
  string build_id = 1;
  // Mermaid diagram source text.
  string mermaid = 2;
}
// PARTITIONS
message ListPartitionsRequest {
  // Optional regex filter
  string ref_pattern = 1;
  // Optional ORing partition status filter
  repeated PartitionStatus partition_status = 2;
  // Basic pagination mechanism - returns partitions sorted after the provided ref
  string last_partition = 3;
}
message ListPartitionsResponse {
  repeated PartitionSummaryV2 refs = 1;
}
// Partition status code paired with its human-readable name.
message PartitionStatusV2 {
  PartitionStatus code = 1;
  string name = 2;
}
// Per-partition status and history as returned by partition queries.
message PartitionSummaryV2 {
  // Field numbers 3 and 5 are skipped — presumably removed in an earlier
  // draft (TODO confirm). Reserve them so they can never be reused with a
  // different meaning by a later edit.
  reserved 3, 5;
  PartitionRef partition_ref = 1;
  PartitionStatusV2 status = 2;
  uint64 last_updated = 4;
  uint64 last_invalidated_at = 6;
  // IDs of build requests that previously produced this partition.
  repeated string past_build_request = 7;
}
// Fetches detail for a single partition.
message PartitionSummaryRequest {
  PartitionRef ref = 1;
}
message PartitionSummaryResponse {
  PartitionSummaryV2 partition = 1;
}
// JOBS
// No query params - if you need to paginate here something is insane or you're google
message ListJobsRequest {}
message ListJobsResponse {
  repeated JobSummary jobs = 1;
}
// Fetches detail for a single job, addressed by its label.
message JobSummaryRequest {
  string job_label = 1;
}
message JobSummaryResponse {
  JobSummary job = 1;
}
// JOB RUNS
// Paginates backwards from `started_until`.
message ListJobRunsRequest {
  // Filters to job runs started until this point
  uint64 started_until = 1;
  // ORing filter matching job run IDs
  repeated string job_run_ids = 2;
  // ORing filters to job runs that are defined by one of these job labels
  repeated string job_labels = 3;
  // ORing filters to job runs that were involved in one of these build requests.
  // FIX: renamed from misspelled `build_reqeust_ids`. Field number 4 is
  // unchanged, so the wire format is unaffected; generated accessors and the
  // JSON field name do change — this v2 API is new, so fix before it ships.
  repeated string build_request_ids = 4;
  // ORing filters to partitions produced by these job runs
  repeated string built_partition_refs = 5;
}
message ListJobRunsResponse {
  repeated JobRunSummary job_runs = 1;
  // Earliest start time in this page — presumably the cursor to pass as the
  // next request's started_until (TODO confirm against server implementation).
  uint64 min_start_at = 2;
}
// Fetches detail for a single job run.
message JobRunSummaryRequest {
  string job_run_id = 1;
}
message JobRunSummaryResponse {
  JobRunSummary job_run = 1;
}
// WANTS
message ListWantsRequest {
  // Filters to the latest time a want could have been requested (paging cursor).
  uint64 requested_until = 1;
  // Filters to wants whose ttl expires after ttl_until (allows querying "currently wanted")
  uint64 ttl_until = 2;
}
message ListWantsResponse {
  repeated PartitionWantSummary wants = 1;
  // Earliest requested-at time in this page — presumably the cursor to pass as
  // the next request's requested_until (TODO confirm).
  uint64 min_requested_at = 2;
}
// Pairs a partition ref with a build status.
// NOTE(review): PartitionBuildStatus already carries a `ref` field — the outer
// `ref` here looks redundant; confirm whether both are intended.
message LabeledPartitionBuildStatus {
  PartitionRef ref = 1;
  PartitionBuildStatus status = 2;
}
message PartitionWantSummary {
  // The underlying want record.
  PartitionWant want = 1;
  // Current summaries for the wanted partitions.
  repeated PartitionSummary partitions = 2;
  // Build status of related partitions tracked outside this summary —
  // presumably partitions not owned by this graph; TODO confirm semantics.
  repeated LabeledPartitionBuildStatus external_partitions = 3;
  string comment = 4;
}
// Fetches detail for a single want.
message WantSummaryRequest {
  string want_id = 1;
}
message WantSummaryResponse {
  PartitionWantSummary want = 1;
}
// One partition want inside a PutWantsRequest.
message IndividualWantRequest {
  PartitionRef ref = 1;
  // NOTE(review): units/meaning of date_timestamp are not evident here
  // (seconds vs nanos, wanted-for date vs request date) — document at the
  // definition site or rename.
  uint64 date_timestamp = 2;
  // How long the want stays active, in seconds.
  uint64 ttl_seconds = 3;
  // Build-latency expectation for this want, in seconds.
  uint64 sla_seconds = 4;
}
// Registers one or more wants in a single call.
message PutWantsRequest {
  repeated IndividualWantRequest wants = 1;
  // Who/what is registering these wants.
  WantSource source = 2;
  // Free-form reason attached to every created want.
  string comment = 3;
}
// Maps a requested partition ref to the ID of the want created for it.
message CreatedWant {
  PartitionRef ref = 1;
  string want_id = 2;
}
message PutWantsResponse {
  repeated CreatedWant wants = 1;
}
// TAINTS
// Paginates backwards from tainted_at_until.
message ListTaintsRequest {
  uint64 tainted_at_until = 1;
}
message ListTaintsResponse {
  repeated PartitionTaintSummary taints = 1;
  // Earliest tainted-at time in this page — presumably the cursor to pass as
  // the next request's tainted_at_until (TODO confirm).
  uint64 min_tainted_at = 2;
}
message PartitionTaintSummary {
  string taint_id = 1;
  // Partitions invalidated by this taint.
  repeated PartitionRef refs = 2;
  uint64 tainted_at = 3;
  // Who/what registered the taint.
  SourceType source = 4;
  string comment = 5;
}
// Fetches detail for a single taint.
message TaintSummaryRequest {
  string taint_id = 1;
}
message TaintSummaryResponse {
  PartitionTaintSummary taint = 1;
}
// Registers a taint over the given partitions.
message PutTaintsRequest {
  // Partitions to invalidate.
  repeated PartitionRef refs = 1;
  // Who/what is registering the taint.
  SourceType source = 2;
  // Free-form reason for the taint.
  string comment = 3;
}
message PutTaintsResponse {
  // ID of the newly created taint, usable with GetTaintSummary.
  string taint_id = 1;
}
// FIX: removed a stray trailing `}` that was residue from a merged
// side-by-side diff column (the old file's closing brace).

View file

@ -204,6 +204,8 @@ impl MockBuildEventLog {
Some(crate::build_event::EventType::PartitionInvalidationEvent(_)) => "partition_invalidation", Some(crate::build_event::EventType::PartitionInvalidationEvent(_)) => "partition_invalidation",
Some(crate::build_event::EventType::JobRunCancelEvent(_)) => "job_run_cancel", Some(crate::build_event::EventType::JobRunCancelEvent(_)) => "job_run_cancel",
Some(crate::build_event::EventType::BuildCancelEvent(_)) => "build_cancel", Some(crate::build_event::EventType::BuildCancelEvent(_)) => "build_cancel",
Some(crate::build_event::EventType::WantEvent(_)) => "want",
Some(crate::build_event::EventType::TaintEvent(_)) => "taint",
None => "unknown", None => "unknown",
}, },
event_data event_data
@ -220,7 +222,7 @@ impl MockBuildEventLog {
"INSERT INTO build_request_events (event_id, status, requested_partitions, message) VALUES (?1, ?2, ?3, ?4)", "INSERT INTO build_request_events (event_id, status, requested_partitions, message) VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![ rusqlite::params![
event.event_id, event.event_id,
br_event.status_code.to_string(), br_event.clone().status.unwrap().code.to_string(),
partitions_json, partitions_json,
br_event.message br_event.message
], ],
@ -379,12 +381,13 @@ pub mod test_events {
BuildEvent { BuildEvent {
event_id: generate_event_id(), event_id: generate_event_id(),
timestamp: current_timestamp_nanos(), timestamp: current_timestamp_nanos(),
build_request_id: build_request_id.unwrap_or_else(|| Uuid::new_v4().to_string()), build_request_id,
event_type: Some(build_event::EventType::BuildRequestEvent(BuildRequestEvent { event_type: Some(build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status_code: BuildRequestStatus::BuildRequestReceived as i32, status: Some(BuildRequestStatusCode::BuildRequestReceived.status()),
status_name: BuildRequestStatus::BuildRequestReceived.to_display_string(),
requested_partitions: partitions, requested_partitions: partitions,
message: "Build request received".to_string(), message: "Build request received".to_string(),
comment: None,
want_id: None,
})), })),
} }
} }
@ -398,12 +401,13 @@ pub mod test_events {
BuildEvent { BuildEvent {
event_id: generate_event_id(), event_id: generate_event_id(),
timestamp: current_timestamp_nanos(), timestamp: current_timestamp_nanos(),
build_request_id: build_request_id.unwrap_or_else(|| Uuid::new_v4().to_string()), build_request_id,
event_type: Some(build_event::EventType::BuildRequestEvent(BuildRequestEvent { event_type: Some(build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status_code: status as i32, status: Some(status.clone()),
status_name: status.to_display_string(),
requested_partitions: partitions, requested_partitions: partitions,
message: format!("Build request status: {:?}", status), message: format!("Build request status: {:?}", status.name),
comment: None,
want_id: None,
})), })),
} }
} }
@ -418,7 +422,7 @@ pub mod test_events {
BuildEvent { BuildEvent {
event_id: generate_event_id(), event_id: generate_event_id(),
timestamp: current_timestamp_nanos(), timestamp: current_timestamp_nanos(),
build_request_id: build_request_id.unwrap_or_else(|| Uuid::new_v4().to_string()), build_request_id,
event_type: Some(build_event::EventType::PartitionEvent(PartitionEvent { event_type: Some(build_event::EventType::PartitionEvent(PartitionEvent {
partition_ref: Some(partition_ref), partition_ref: Some(partition_ref),
status_code: status as i32, status_code: status as i32,
@ -440,7 +444,7 @@ pub mod test_events {
BuildEvent { BuildEvent {
event_id: generate_event_id(), event_id: generate_event_id(),
timestamp: current_timestamp_nanos(), timestamp: current_timestamp_nanos(),
build_request_id: build_request_id.unwrap_or_else(|| Uuid::new_v4().to_string()), build_request_id,
event_type: Some(build_event::EventType::JobEvent(JobEvent { event_type: Some(build_event::EventType::JobEvent(JobEvent {
job_run_id: job_run_id.unwrap_or_else(|| Uuid::new_v4().to_string()), job_run_id: job_run_id.unwrap_or_else(|| Uuid::new_v4().to_string()),
job_label: Some(job_label), job_label: Some(job_label),
@ -563,9 +567,10 @@ impl BELStorage for MockBELStorage {
// Apply filtering based on EventFilter // Apply filtering based on EventFilter
events.retain(|event| { events.retain(|event| {
// Filter by build request IDs if specified // Filter by build request IDs if specified
if !filter.build_request_ids.is_empty() { if !filter.build_request_ids.is_empty() {
if !filter.build_request_ids.contains(&event.build_request_id) { if !filter.build_request_ids.contains(&event.build_request_id.clone().unwrap()) {
return false; return false;
} }
} }

View file

@ -85,7 +85,7 @@ pub fn create_build_event(
BuildEvent { BuildEvent {
event_id: generate_event_id(), event_id: generate_event_id(),
timestamp: current_timestamp_nanos(), timestamp: current_timestamp_nanos(),
build_request_id, build_request_id: Some(build_request_id.clone()),
event_type: Some(event_type), event_type: Some(event_type),
} }
} }

View file

@ -38,15 +38,16 @@ impl BELQueryEngine {
}; };
let events = self.storage.list_events(0, filter).await?; let events = self.storage.list_events(0, filter).await?;
let mut active_builds = Vec::new(); let mut active_builds: Vec<String> = Vec::new();
let mut build_states: HashMap<String, BuildRequestStatus> = HashMap::new(); let mut build_states: HashMap<String, BuildRequestStatusCode> = HashMap::new();
// Process events chronologically to track build states // Process events chronologically to track build states
for event in events.events { for event in events.events {
let build_request_id = event.build_request_id.clone().unwrap();
match &event.event_type { match &event.event_type {
Some(crate::build_event::EventType::BuildRequestEvent(br_event)) => { Some(crate::build_event::EventType::BuildRequestEvent(br_event)) => {
if let Ok(status) = BuildRequestStatus::try_from(br_event.status_code) { if let Ok(code) = BuildRequestStatusCode::try_from(br_event.clone().status.unwrap().code) {
build_states.insert(event.build_request_id.clone(), status); build_states.insert(build_request_id.clone(), code);
} }
} }
Some(crate::build_event::EventType::PartitionEvent(p_event)) => { Some(crate::build_event::EventType::PartitionEvent(p_event)) => {
@ -56,15 +57,15 @@ impl BELQueryEngine {
if let Ok(status) = PartitionStatus::try_from(p_event.status_code) { if let Ok(status) = PartitionStatus::try_from(p_event.status_code) {
if matches!(status, PartitionStatus::PartitionBuilding | PartitionStatus::PartitionAnalyzed) { if matches!(status, PartitionStatus::PartitionBuilding | PartitionStatus::PartitionAnalyzed) {
// Check if the build request is still active // Check if the build request is still active
if let Some(build_status) = build_states.get(&event.build_request_id) { if let Some(build_status) = build_states.get(&build_request_id) {
if matches!(build_status, if matches!(build_status,
BuildRequestStatus::BuildRequestReceived | BuildRequestStatusCode::BuildRequestReceived |
BuildRequestStatus::BuildRequestPlanning | BuildRequestStatusCode::BuildRequestPlanning |
BuildRequestStatus::BuildRequestExecuting | BuildRequestStatusCode::BuildRequestExecuting |
BuildRequestStatus::BuildRequestAnalysisCompleted BuildRequestStatusCode::BuildRequestAnalysisCompleted
) { ) {
if !active_builds.contains(&event.build_request_id) { if !active_builds.contains(&build_request_id) {
active_builds.push(event.build_request_id.clone()); active_builds.push(build_request_id.clone());
} }
} }
} }
@ -97,12 +98,12 @@ impl BELQueryEngine {
return Err(BuildEventLogError::QueryError(format!("Build request '{}' not found", build_id))); return Err(BuildEventLogError::QueryError(format!("Build request '{}' not found", build_id)));
} }
let mut status = BuildRequestStatus::BuildRequestUnknown; let mut status = BuildRequestStatusCode::BuildRequestUnknown.status();
let mut requested_partitions = Vec::new(); let mut requested_partitions = Vec::new();
let mut created_at = 0i64; let mut created_at = 0i64;
let mut updated_at = 0i64; let mut updated_at = 0i64;
for event in events.events { for event in events.events.iter().filter(|event| event.build_request_id.is_some()) {
if event.timestamp > 0 { if event.timestamp > 0 {
if created_at == 0 || event.timestamp < created_at { if created_at == 0 || event.timestamp < created_at {
created_at = event.timestamp; created_at = event.timestamp;
@ -113,7 +114,7 @@ impl BELQueryEngine {
} }
if let Some(crate::build_event::EventType::BuildRequestEvent(br_event)) = &event.event_type { if let Some(crate::build_event::EventType::BuildRequestEvent(br_event)) = &event.event_type {
if let Ok(event_status) = BuildRequestStatus::try_from(br_event.status_code) { if let Ok(event_status) = BuildRequestStatus::try_from(br_event.status.clone().unwrap()) {
status = event_status; status = event_status;
} }
if !br_event.requested_partitions.is_empty() { if !br_event.requested_partitions.is_empty() {
@ -148,20 +149,20 @@ impl BELQueryEngine {
let mut build_summaries: HashMap<String, BuildRequestSummary> = HashMap::new(); let mut build_summaries: HashMap<String, BuildRequestSummary> = HashMap::new();
// Aggregate by build request ID // Aggregate by build request ID
for event in events.events { for event in events.events.iter().filter(|event| event.build_request_id.is_some()) {
if let Some(crate::build_event::EventType::BuildRequestEvent(br_event)) = &event.event_type { if let Some(crate::build_event::EventType::BuildRequestEvent(br_event)) = &event.event_type {
let build_id = &event.build_request_id; let build_id = &event.build_request_id.clone().unwrap();
let entry = build_summaries.entry(build_id.clone()).or_insert_with(|| { let entry = build_summaries.entry(build_id.clone()).or_insert_with(|| {
BuildRequestSummary { BuildRequestSummary {
build_request_id: build_id.clone(), build_request_id: build_id.clone(),
status: BuildRequestStatus::BuildRequestUnknown, status: BuildRequestStatusCode::BuildRequestUnknown.status(),
requested_partitions: Vec::new(), requested_partitions: Vec::new(),
created_at: event.timestamp, created_at: event.timestamp,
updated_at: event.timestamp, updated_at: event.timestamp,
} }
}); });
if let Ok(status) = BuildRequestStatus::try_from(br_event.status_code) { if let Ok(status) = BuildRequestStatus::try_from(br_event.status.clone().unwrap()) {
entry.status = status; entry.status = status;
} }
entry.updated_at = event.timestamp.max(entry.updated_at); entry.updated_at = event.timestamp.max(entry.updated_at);
@ -179,8 +180,8 @@ impl BELQueryEngine {
// Apply status filter if provided // Apply status filter if provided
if let Some(status_filter) = &request.status_filter { if let Some(status_filter) = &request.status_filter {
if let Ok(filter_status) = status_filter.parse::<i32>() { if let Ok(filter_status) = status_filter.parse::<i32>() {
if let Ok(status) = BuildRequestStatus::try_from(filter_status) { if let Ok(status) = BuildRequestStatusCode::try_from(filter_status) {
builds.retain(|b| b.status == status); builds.retain(|b| b.status.code == status as i32);
} }
} }
} }
@ -194,8 +195,7 @@ impl BELQueryEngine {
.take(limit) .take(limit)
.map(|summary| BuildSummary { .map(|summary| BuildSummary {
build_request_id: summary.build_request_id, build_request_id: summary.build_request_id,
status_code: summary.status as i32, status: Some(summary.status),
status_name: summary.status.to_display_string(),
requested_partitions: summary.requested_partitions.into_iter() requested_partitions: summary.requested_partitions.into_iter()
.map(|s| PartitionRef { str: s }) .map(|s| PartitionRef { str: s })
.collect(), .collect(),
@ -208,6 +208,7 @@ impl BELQueryEngine {
completed_at: None, // TODO: Implement completed_at: None, // TODO: Implement
duration_ms: None, // TODO: Implement duration_ms: None, // TODO: Implement
cancelled: false, // TODO: Implement cancelled: false, // TODO: Implement
comment: None,
}) })
.collect(); .collect();
@ -228,18 +229,18 @@ impl BELQueryEngine {
let active_builds_count = builds_response.builds.iter() let active_builds_count = builds_response.builds.iter()
.filter(|b| matches!( .filter(|b| matches!(
BuildRequestStatus::try_from(b.status_code).unwrap_or(BuildRequestStatus::BuildRequestUnknown), BuildRequestStatusCode::try_from(b.status.clone().unwrap().code).unwrap_or(BuildRequestStatusCode::BuildRequestUnknown),
BuildRequestStatus::BuildRequestReceived | BuildRequestStatusCode::BuildRequestReceived |
BuildRequestStatus::BuildRequestPlanning | BuildRequestStatusCode::BuildRequestPlanning |
BuildRequestStatus::BuildRequestExecuting | BuildRequestStatusCode::BuildRequestExecuting |
BuildRequestStatus::BuildRequestAnalysisCompleted BuildRequestStatusCode::BuildRequestAnalysisCompleted
)) ))
.count() as u32; .count() as u32;
let recent_builds = builds_response.builds.into_iter() let recent_builds = builds_response.builds.into_iter()
.map(|b| BuildRequestSummary { .map(|b| BuildRequestSummary {
build_request_id: b.build_request_id, build_request_id: b.build_request_id,
status: BuildRequestStatus::try_from(b.status_code).unwrap_or(BuildRequestStatus::BuildRequestUnknown), status: b.status.unwrap_or(BuildRequestStatusCode::BuildRequestUnknown.status()),
requested_partitions: b.requested_partitions.into_iter().map(|p| p.str).collect(), requested_partitions: b.requested_partitions.into_iter().map(|p| p.str).collect(),
created_at: b.requested_at, created_at: b.requested_at,
updated_at: b.completed_at.unwrap_or(b.requested_at), updated_at: b.completed_at.unwrap_or(b.requested_at),
@ -299,7 +300,7 @@ impl BELQueryEngine {
if partition_event_ref.str == partition_ref { if partition_event_ref.str == partition_ref {
if let Ok(status) = PartitionStatus::try_from(p_event.status_code) { if let Ok(status) = PartitionStatus::try_from(p_event.status_code) {
if status == PartitionStatus::PartitionAvailable && event.timestamp >= latest_timestamp { if status == PartitionStatus::PartitionAvailable && event.timestamp >= latest_timestamp {
latest_available_build_id = Some(event.build_request_id.clone()); latest_available_build_id = event.build_request_id.clone();
latest_timestamp = event.timestamp; latest_timestamp = event.timestamp;
} }
} }

View file

@ -35,10 +35,11 @@ impl EventWriter {
let event = create_build_event( let event = create_build_event(
build_request_id, build_request_id,
build_event::EventType::BuildRequestEvent(BuildRequestEvent { build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status_code: BuildRequestStatus::BuildRequestReceived as i32, status: Some(BuildRequestStatusCode::BuildRequestReceived.status()),
status_name: BuildRequestStatus::BuildRequestReceived.to_display_string(),
requested_partitions, requested_partitions,
message: "Build request received".to_string(), message: "Build request received".to_string(),
comment: None,
want_id: None,
}), }),
); );
@ -57,10 +58,11 @@ impl EventWriter {
let event = create_build_event( let event = create_build_event(
build_request_id, build_request_id,
build_event::EventType::BuildRequestEvent(BuildRequestEvent { build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status_code: status as i32, status: Some(status),
status_name: status.to_display_string(),
requested_partitions: vec![], requested_partitions: vec![],
message, message,
comment: None,
want_id: None,
}), }),
); );
@ -80,10 +82,11 @@ impl EventWriter {
let event = create_build_event( let event = create_build_event(
build_request_id, build_request_id,
build_event::EventType::BuildRequestEvent(BuildRequestEvent { build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status_code: status as i32, status: Some(status),
status_name: status.to_display_string(),
requested_partitions, requested_partitions,
message, message,
comment: None,
want_id: None,
}), }),
); );
@ -104,7 +107,7 @@ impl EventWriter {
let event = BuildEvent { let event = BuildEvent {
event_id: generate_event_id(), event_id: generate_event_id(),
timestamp: current_timestamp_nanos(), timestamp: current_timestamp_nanos(),
build_request_id, build_request_id: Some(build_request_id),
event_type: Some(build_event::EventType::PartitionEvent(PartitionEvent { event_type: Some(build_event::EventType::PartitionEvent(PartitionEvent {
partition_ref: Some(partition_ref), partition_ref: Some(partition_ref),
status_code: status as i32, status_code: status as i32,
@ -136,7 +139,7 @@ impl EventWriter {
let event = BuildEvent { let event = BuildEvent {
event_id: generate_event_id(), event_id: generate_event_id(),
timestamp: current_timestamp_nanos(), timestamp: current_timestamp_nanos(),
build_request_id, build_request_id: Some(build_request_id),
event_type: Some(build_event::EventType::PartitionInvalidationEvent( event_type: Some(build_event::EventType::PartitionInvalidationEvent(
PartitionInvalidationEvent { PartitionInvalidationEvent {
partition_ref: Some(partition_ref), partition_ref: Some(partition_ref),
@ -162,7 +165,7 @@ impl EventWriter {
let event = BuildEvent { let event = BuildEvent {
event_id: generate_event_id(), event_id: generate_event_id(),
timestamp: current_timestamp_nanos(), timestamp: current_timestamp_nanos(),
build_request_id, build_request_id: Some(build_request_id),
event_type: Some(build_event::EventType::JobEvent(JobEvent { event_type: Some(build_event::EventType::JobEvent(JobEvent {
job_run_id, job_run_id,
job_label: Some(job_label), job_label: Some(job_label),
@ -194,7 +197,7 @@ impl EventWriter {
let event = BuildEvent { let event = BuildEvent {
event_id: generate_event_id(), event_id: generate_event_id(),
timestamp: current_timestamp_nanos(), timestamp: current_timestamp_nanos(),
build_request_id, build_request_id: Some(build_request_id),
event_type: Some(build_event::EventType::JobEvent(JobEvent { event_type: Some(build_event::EventType::JobEvent(JobEvent {
job_run_id, job_run_id,
job_label: Some(job_label), job_label: Some(job_label),
@ -256,7 +259,7 @@ impl EventWriter {
let event = BuildEvent { let event = BuildEvent {
event_id: generate_event_id(), event_id: generate_event_id(),
timestamp: current_timestamp_nanos(), timestamp: current_timestamp_nanos(),
build_request_id, build_request_id: Some(build_request_id),
event_type: Some(build_event::EventType::JobRunCancelEvent(JobRunCancelEvent { event_type: Some(build_event::EventType::JobRunCancelEvent(JobRunCancelEvent {
job_run_id, job_run_id,
reason, reason,
@ -285,22 +288,22 @@ impl EventWriter {
let latest_status = build_events.iter() let latest_status = build_events.iter()
.rev() .rev()
.find_map(|e| match &e.event_type { .find_map(|e| match &e.event_type {
Some(build_event::EventType::BuildRequestEvent(br)) => Some(br.status_code), Some(build_event::EventType::BuildRequestEvent(br)) => Some(br.clone().status.unwrap().code),
_ => None, _ => None,
}); });
match latest_status { match latest_status {
Some(status) if status == BuildRequestStatus::BuildRequestCompleted as i32 => { Some(status) if status == BuildRequestStatusCode::BuildRequestCompleted as i32 => {
return Err(BuildEventLogError::QueryError( return Err(BuildEventLogError::QueryError(
format!("Cannot cancel completed build: {}", build_request_id) format!("Cannot cancel completed build: {}", build_request_id)
)); ));
} }
Some(status) if status == BuildRequestStatus::BuildRequestFailed as i32 => { Some(status) if status == BuildRequestStatusCode::BuildRequestFailed as i32 => {
return Err(BuildEventLogError::QueryError( return Err(BuildEventLogError::QueryError(
format!("Cannot cancel failed build: {}", build_request_id) format!("Cannot cancel failed build: {}", build_request_id)
)); ));
} }
Some(status) if status == BuildRequestStatus::BuildRequestCancelled as i32 => { Some(status) if status == BuildRequestStatusCode::BuildRequestCancelled as i32 => {
return Err(BuildEventLogError::QueryError( return Err(BuildEventLogError::QueryError(
format!("Build already cancelled: {}", build_request_id) format!("Build already cancelled: {}", build_request_id)
)); ));
@ -311,7 +314,7 @@ impl EventWriter {
let event = BuildEvent { let event = BuildEvent {
event_id: generate_event_id(), event_id: generate_event_id(),
timestamp: current_timestamp_nanos(), timestamp: current_timestamp_nanos(),
build_request_id: build_request_id.clone(), build_request_id: Some(build_request_id.clone()),
event_type: Some(build_event::EventType::BuildCancelEvent(BuildCancelEvent { event_type: Some(build_event::EventType::BuildCancelEvent(BuildCancelEvent {
reason, reason,
})), })),
@ -322,7 +325,7 @@ impl EventWriter {
// Also emit a build request status update // Also emit a build request status update
self.update_build_status( self.update_build_status(
build_request_id, build_request_id,
BuildRequestStatus::BuildRequestCancelled, BuildRequestStatusCode::BuildRequestCancelled.status(),
"Build cancelled by user".to_string(), "Build cancelled by user".to_string(),
).await ).await
} }
@ -361,7 +364,7 @@ impl EventWriter {
let event = BuildEvent { let event = BuildEvent {
event_id: generate_event_id(), event_id: generate_event_id(),
timestamp: current_timestamp_nanos(), timestamp: current_timestamp_nanos(),
build_request_id, build_request_id: Some(build_request_id),
event_type: Some(build_event::EventType::JobGraphEvent(JobGraphEvent { event_type: Some(build_event::EventType::JobGraphEvent(JobGraphEvent {
job_graph: Some(job_graph), job_graph: Some(job_graph),
message, message,
@ -391,19 +394,19 @@ mod tests {
// Test status updates // Test status updates
writer.update_build_status( writer.update_build_status(
build_id.clone(), build_id.clone(),
BuildRequestStatus::BuildRequestPlanning, BuildRequestStatusCode::BuildRequestPlanning.status(),
"Starting planning".to_string(), "Starting planning".to_string(),
).await.unwrap(); ).await.unwrap();
writer.update_build_status( writer.update_build_status(
build_id.clone(), build_id.clone(),
BuildRequestStatus::BuildRequestExecuting, BuildRequestStatusCode::BuildRequestExecuting.status(),
"Starting execution".to_string(), "Starting execution".to_string(),
).await.unwrap(); ).await.unwrap();
writer.update_build_status( writer.update_build_status(
build_id.clone(), build_id.clone(),
BuildRequestStatus::BuildRequestCompleted, BuildRequestStatusCode::BuildRequestCompleted.status(),
"Build completed successfully".to_string(), "Build completed successfully".to_string(),
).await.unwrap(); ).await.unwrap();
} }

View file

@ -107,9 +107,8 @@ mod format_consistency_tests {
assert_eq!(JobStatus::from_display_string("completed"), Some(job_status)); assert_eq!(JobStatus::from_display_string("completed"), Some(job_status));
// Test BuildRequestStatus conversions // Test BuildRequestStatus conversions
let build_status = BuildRequestStatus::BuildRequestCompleted; let build_status = BuildRequestStatusCode::BuildRequestCompleted.status();
assert_eq!(build_status.to_display_string(), "completed"); assert_eq!(build_status.name, "completed");
assert_eq!(BuildRequestStatus::from_display_string("completed"), Some(build_status));
// Test invalid conversions // Test invalid conversions
assert_eq!(PartitionStatus::from_display_string("invalid"), None); assert_eq!(PartitionStatus::from_display_string("invalid"), None);

View file

@ -203,10 +203,11 @@ async fn plan(
let event = create_build_event( let event = create_build_event(
build_request_id.to_string(), build_request_id.to_string(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent { crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status_code: BuildRequestStatus::BuildRequestReceived as i32, status: Some(BuildRequestStatusCode::BuildRequestReceived.status()),
status_name: BuildRequestStatus::BuildRequestReceived.to_display_string(),
requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(), requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(),
message: "Analysis started".to_string(), message: "Analysis started".to_string(),
comment: None,
want_id: None,
}) })
); );
if let Err(e) = query_engine_ref.append_event(event).await { if let Err(e) = query_engine_ref.append_event(event).await {
@ -264,10 +265,11 @@ async fn plan(
let event = create_build_event( let event = create_build_event(
build_request_id.to_string(), build_request_id.to_string(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent { crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status_code: BuildRequestStatus::BuildRequestPlanning as i32, status: Some(BuildRequestStatusCode::BuildRequestPlanning.status()),
status_name: BuildRequestStatus::BuildRequestPlanning.to_display_string(),
requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(), requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(),
message: "Graph analysis in progress".to_string(), message: "Graph analysis in progress".to_string(),
comment: None,
want_id: None,
}) })
); );
if let Err(e) = query_engine_ref.append_event(event).await { if let Err(e) = query_engine_ref.append_event(event).await {
@ -334,10 +336,11 @@ async fn plan(
let event = create_build_event( let event = create_build_event(
build_request_id.to_string(), build_request_id.to_string(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent { crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status_code: BuildRequestStatus::BuildRequestAnalysisCompleted as i32, status: Some(BuildRequestStatusCode::BuildRequestAnalysisCompleted.status()),
status_name: BuildRequestStatus::BuildRequestAnalysisCompleted.to_display_string(),
requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(), requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(),
message: format!("Analysis completed successfully, {} tasks planned", nodes.len()), message: format!("Analysis completed successfully, {} tasks planned", nodes.len()),
comment: None,
want_id: None,
}) })
); );
if let Err(e) = query_engine.append_event(event).await { if let Err(e) = query_engine.append_event(event).await {
@ -376,10 +379,11 @@ async fn plan(
let event = create_build_event( let event = create_build_event(
build_request_id.to_string(), build_request_id.to_string(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent { crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status_code: BuildRequestStatus::BuildRequestFailed as i32, status: Some(BuildRequestStatusCode::BuildRequestFailed.status()),
status_name: BuildRequestStatus::BuildRequestFailed.to_display_string(),
requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(), requested_partitions: output_refs.iter().map(|s| PartitionRef { str: s.clone() }).collect(),
message: "No jobs found for requested partitions".to_string(), message: "No jobs found for requested partitions".to_string(),
comment: None,
want_id: None,
}) })
); );
if let Err(e) = query_engine.append_event(event).await { if let Err(e) = query_engine.append_event(event).await {

View file

@ -1,4 +1,4 @@
use databuild::{JobGraph, Task, JobStatus, BuildRequestStatus, PartitionStatus, BuildRequestEvent, JobEvent, PartitionEvent, PartitionRef}; use databuild::{JobGraph, Task, JobStatus, BuildRequestStatus, BuildRequestStatusCode, PartitionStatus, BuildRequestEvent, JobEvent, PartitionEvent, PartitionRef};
use databuild::event_log::{create_bel_query_engine, create_build_event}; use databuild::event_log::{create_bel_query_engine, create_build_event};
use databuild::build_event::EventType; use databuild::build_event::EventType;
use databuild::log_collector::{LogCollector, LogCollectorError}; use databuild::log_collector::{LogCollector, LogCollectorError};
@ -460,10 +460,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let event = create_build_event( let event = create_build_event(
build_request_id.clone(), build_request_id.clone(),
EventType::BuildRequestEvent(BuildRequestEvent { EventType::BuildRequestEvent(BuildRequestEvent {
status_code: BuildRequestStatus::BuildRequestExecuting as i32, status: Some(BuildRequestStatusCode::BuildRequestExecuting.status()),
status_name: BuildRequestStatus::BuildRequestExecuting.to_display_string(),
requested_partitions: graph.outputs.clone(), requested_partitions: graph.outputs.clone(),
message: format!("Starting execution of {} jobs", graph.nodes.len()), message: format!("Starting execution of {} jobs", graph.nodes.len()),
comment: None,
want_id: None,
}) })
); );
if let Err(e) = query_engine.append_event(event).await { if let Err(e) = query_engine.append_event(event).await {
@ -787,18 +788,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Log final build request status (existing detailed event) // Log final build request status (existing detailed event)
if let Some(ref query_engine) = build_event_log { if let Some(ref query_engine) = build_event_log {
let final_status = if failure_count > 0 || fail_fast_triggered { let final_status = if failure_count > 0 || fail_fast_triggered {
BuildRequestStatus::BuildRequestFailed BuildRequestStatusCode::BuildRequestFailed
} else { } else {
BuildRequestStatus::BuildRequestCompleted BuildRequestStatusCode::BuildRequestCompleted
}; };
let event = create_build_event( let event = create_build_event(
build_request_id.clone(), build_request_id.clone(),
EventType::BuildRequestEvent(BuildRequestEvent { EventType::BuildRequestEvent(BuildRequestEvent {
status_code: final_status as i32, status: Some(final_status.status()),
status_name: final_status.to_display_string(),
requested_partitions: graph.outputs.clone(), requested_partitions: graph.outputs.clone(),
message: format!("Execution completed: {} succeeded, {} failed", success_count, failure_count), message: format!("Execution completed: {} succeeded, {} failed", success_count, failure_count),
comment: None,
want_id: None,
}) })
); );
if let Err(e) = query_engine.append_event(event).await { if let Err(e) = query_engine.append_event(event).await {

View file

@ -10,10 +10,11 @@ pub fn create_build_request_received_event(
create_build_event( create_build_event(
build_request_id, build_request_id,
build_event::EventType::BuildRequestEvent(BuildRequestEvent { build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status_code: BuildRequestStatus::BuildRequestReceived as i32, status: Some(BuildRequestStatusCode::BuildRequestReceived.status()),
status_name: BuildRequestStatus::BuildRequestReceived.to_display_string(),
requested_partitions, requested_partitions,
message: "Build request received".to_string(), message: "Build request received".to_string(),
comment: None,
want_id: None,
}), }),
) )
} }
@ -24,10 +25,11 @@ pub fn create_build_planning_started_event(
create_build_event( create_build_event(
build_request_id, build_request_id,
build_event::EventType::BuildRequestEvent(BuildRequestEvent { build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status_code: BuildRequestStatus::BuildRequestPlanning as i32, status: Some(BuildRequestStatusCode::BuildRequestPlanning.status()),
status_name: BuildRequestStatus::BuildRequestPlanning.to_display_string(),
requested_partitions: vec![], requested_partitions: vec![],
message: "Starting build planning".to_string(), message: "Starting build planning".to_string(),
comment: None,
want_id: None,
}), }),
) )
} }
@ -38,10 +40,11 @@ pub fn create_build_execution_started_event(
create_build_event( create_build_event(
build_request_id, build_request_id,
build_event::EventType::BuildRequestEvent(BuildRequestEvent { build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status_code: BuildRequestStatus::BuildRequestExecuting as i32, status: Some(BuildRequestStatusCode::BuildRequestExecuting.status()),
status_name: BuildRequestStatus::BuildRequestExecuting.to_display_string(),
requested_partitions: vec![], requested_partitions: vec![],
message: "Starting build execution".to_string(), message: "Starting build execution".to_string(),
comment: None,
want_id: None,
}), }),
) )
} }
@ -63,17 +66,18 @@ pub fn create_build_completed_event(
}; };
let status = match result { let status = match result {
super::BuildResult::Success { .. } => BuildRequestStatus::BuildRequestCompleted, super::BuildResult::Success { .. } => BuildRequestStatusCode::BuildRequestCompleted.status(),
super::BuildResult::Failed { .. } | super::BuildResult::FailFast { .. } => BuildRequestStatus::BuildRequestFailed, super::BuildResult::Failed { .. } | super::BuildResult::FailFast { .. } => BuildRequestStatusCode::BuildRequestFailed.status(),
}; };
create_build_event( create_build_event(
build_request_id, build_request_id,
build_event::EventType::BuildRequestEvent(BuildRequestEvent { build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status_code: status as i32, status: Some(status),
status_name: status.to_display_string(),
requested_partitions: vec![], requested_partitions: vec![],
message, message,
comment: None,
want_id: None,
}), }),
) )
} }
@ -86,10 +90,11 @@ pub fn create_analysis_completed_event(
create_build_event( create_build_event(
build_request_id, build_request_id,
build_event::EventType::BuildRequestEvent(BuildRequestEvent { build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status_code: BuildRequestStatus::BuildRequestAnalysisCompleted as i32, status: Some(BuildRequestStatusCode::BuildRequestAnalysisCompleted.status()),
status_name: BuildRequestStatus::BuildRequestAnalysisCompleted.to_display_string(),
requested_partitions, requested_partitions,
message: format!("Analysis completed successfully, {} tasks planned", task_count), message: format!("Analysis completed successfully, {} tasks planned", task_count),
comment: None,
want_id: None,
}), }),
) )
} }
@ -101,7 +106,7 @@ pub fn create_job_scheduled_event(
BuildEvent { BuildEvent {
event_id: generate_event_id(), event_id: generate_event_id(),
timestamp: current_timestamp_nanos(), timestamp: current_timestamp_nanos(),
build_request_id, build_request_id: Some(build_request_id),
event_type: Some(build_event::EventType::JobEvent(job_event.clone())), event_type: Some(build_event::EventType::JobEvent(job_event.clone())),
} }
} }
@ -113,7 +118,7 @@ pub fn create_job_completed_event(
BuildEvent { BuildEvent {
event_id: generate_event_id(), event_id: generate_event_id(),
timestamp: current_timestamp_nanos(), timestamp: current_timestamp_nanos(),
build_request_id, build_request_id: Some(build_request_id),
event_type: Some(build_event::EventType::JobEvent(job_event.clone())), event_type: Some(build_event::EventType::JobEvent(job_event.clone())),
} }
} }
@ -125,7 +130,7 @@ pub fn create_partition_available_event(
BuildEvent { BuildEvent {
event_id: generate_event_id(), event_id: generate_event_id(),
timestamp: current_timestamp_nanos(), timestamp: current_timestamp_nanos(),
build_request_id, build_request_id: Some(build_request_id),
event_type: Some(build_event::EventType::PartitionEvent(partition_event.clone())), event_type: Some(build_event::EventType::PartitionEvent(partition_event.clone())),
} }
} }

View file

@ -66,7 +66,7 @@ impl BuildOrchestrator {
self.event_writer.update_build_status( self.event_writer.update_build_status(
self.build_request_id.clone(), self.build_request_id.clone(),
BuildRequestStatus::BuildRequestPlanning, BuildRequestStatusCode::BuildRequestPlanning.status(),
"Starting build planning".to_string(), "Starting build planning".to_string(),
).await ).await
.map_err(OrchestrationError::EventLog)?; .map_err(OrchestrationError::EventLog)?;
@ -80,7 +80,7 @@ impl BuildOrchestrator {
self.event_writer.update_build_status( self.event_writer.update_build_status(
self.build_request_id.clone(), self.build_request_id.clone(),
BuildRequestStatus::BuildRequestExecuting, BuildRequestStatusCode::BuildRequestExecuting.status(),
"Starting build execution".to_string(), "Starting build execution".to_string(),
).await ).await
.map_err(OrchestrationError::EventLog)?; .map_err(OrchestrationError::EventLog)?;
@ -95,22 +95,22 @@ impl BuildOrchestrator {
let (status, message) = match &result { let (status, message) = match &result {
BuildResult::Success { jobs_completed } => { BuildResult::Success { jobs_completed } => {
(BuildRequestStatus::BuildRequestCompleted, (BuildRequestStatusCode::BuildRequestCompleted,
format!("Build completed successfully with {} jobs", jobs_completed)) format!("Build completed successfully with {} jobs", jobs_completed))
} }
BuildResult::Failed { jobs_completed, jobs_failed } => { BuildResult::Failed { jobs_completed, jobs_failed } => {
(BuildRequestStatus::BuildRequestFailed, (BuildRequestStatusCode::BuildRequestFailed,
format!("Build failed: {} jobs completed, {} jobs failed", jobs_completed, jobs_failed)) format!("Build failed: {} jobs completed, {} jobs failed", jobs_completed, jobs_failed))
} }
BuildResult::FailFast { trigger_job } => { BuildResult::FailFast { trigger_job } => {
(BuildRequestStatus::BuildRequestFailed, (BuildRequestStatusCode::BuildRequestFailed,
format!("Build failed fast due to job: {}", trigger_job)) format!("Build failed fast due to job: {}", trigger_job))
} }
}; };
self.event_writer.update_build_status( self.event_writer.update_build_status(
self.build_request_id.clone(), self.build_request_id.clone(),
status, status.status(),
message, message,
).await ).await
.map_err(OrchestrationError::EventLog)?; .map_err(OrchestrationError::EventLog)?;
@ -122,7 +122,7 @@ impl BuildOrchestrator {
pub async fn emit_analysis_completed(&self, task_count: usize) -> Result<()> { pub async fn emit_analysis_completed(&self, task_count: usize) -> Result<()> {
self.event_writer.update_build_status_with_partitions( self.event_writer.update_build_status_with_partitions(
self.build_request_id.clone(), self.build_request_id.clone(),
BuildRequestStatus::BuildRequestAnalysisCompleted, BuildRequestStatusCode::BuildRequestAnalysisCompleted.status(),
self.requested_partitions.clone(), self.requested_partitions.clone(),
format!("Analysis completed successfully, {} tasks planned", task_count), format!("Analysis completed successfully, {} tasks planned", task_count),
).await ).await

View file

@ -62,7 +62,7 @@ impl BuildsRepository {
let builds = response.builds.into_iter().map(|build| { let builds = response.builds.into_iter().map(|build| {
BuildInfo { BuildInfo {
build_request_id: build.build_request_id, build_request_id: build.build_request_id,
status: BuildRequestStatus::try_from(build.status_code).unwrap_or(BuildRequestStatus::BuildRequestUnknown), status: build.status.clone().unwrap_or(BuildRequestStatusCode::BuildRequestUnknown.status()),
requested_partitions: build.requested_partitions, requested_partitions: build.requested_partitions,
requested_at: build.requested_at, requested_at: build.requested_at,
started_at: build.started_at, started_at: build.started_at,
@ -116,7 +116,7 @@ impl BuildsRepository {
let mut timeline = Vec::new(); let mut timeline = Vec::new();
for event in all_events { for event in all_events {
if let Some(crate::build_event::EventType::BuildRequestEvent(br_event)) = &event.event_type { if let Some(crate::build_event::EventType::BuildRequestEvent(br_event)) = &event.event_type {
if let Ok(status) = BuildRequestStatus::try_from(br_event.status_code) { if let Some(status) = br_event.clone().status {
timeline.push(BuildEvent { timeline.push(BuildEvent {
timestamp: event.timestamp, timestamp: event.timestamp,
event_type: "build_status".to_string(), event_type: "build_status".to_string(),
@ -151,8 +151,7 @@ impl BuildsRepository {
.into_iter() .into_iter()
.map(|event| ServiceBuildTimelineEvent { .map(|event| ServiceBuildTimelineEvent {
timestamp: event.timestamp, timestamp: event.timestamp,
status_code: event.status.map(|s| s as i32), status: event.status,
status_name: event.status.map(|s| s.to_display_string()),
message: event.message, message: event.message,
event_type: event.event_type, event_type: event.event_type,
cancel_reason: event.cancel_reason, cancel_reason: event.cancel_reason,
@ -161,8 +160,7 @@ impl BuildsRepository {
let response = BuildDetailResponse { let response = BuildDetailResponse {
build_request_id: build_info.build_request_id, build_request_id: build_info.build_request_id,
status_code: build_info.status as i32, status: Some(build_info.status),
status_name: build_info.status.to_display_string(),
requested_partitions: build_info.requested_partitions, requested_partitions: build_info.requested_partitions,
total_jobs: build_info.total_jobs as u32, total_jobs: build_info.total_jobs as u32,
completed_jobs: build_info.completed_jobs as u32, completed_jobs: build_info.completed_jobs as u32,
@ -200,18 +198,18 @@ impl BuildsRepository {
let (build, _timeline) = build_info.unwrap(); let (build, _timeline) = build_info.unwrap();
// Check if build is in a cancellable state // Check if build is in a cancellable state
match build.status { match BuildRequestStatusCode::try_from(build.status.code) {
BuildRequestStatus::BuildRequestCompleted => { Ok(BuildRequestStatusCode::BuildRequestCompleted) => {
return Err(BuildEventLogError::QueryError( return Err(BuildEventLogError::QueryError(
format!("Cannot cancel completed build: {}", build_request_id) format!("Cannot cancel completed build: {}", build_request_id)
)); ));
} }
BuildRequestStatus::BuildRequestFailed => { Ok(BuildRequestStatusCode::BuildRequestFailed) => {
return Err(BuildEventLogError::QueryError( return Err(BuildEventLogError::QueryError(
format!("Cannot cancel failed build: {}", build_request_id) format!("Cannot cancel failed build: {}", build_request_id)
)); ));
} }
BuildRequestStatus::BuildRequestCancelled => { Ok(BuildRequestStatusCode::BuildRequestCancelled) => {
return Err(BuildEventLogError::QueryError( return Err(BuildEventLogError::QueryError(
format!("Build already cancelled: {}", build_request_id) format!("Build already cancelled: {}", build_request_id)
)); ));
@ -225,10 +223,11 @@ impl BuildsRepository {
let cancel_event = create_build_event( let cancel_event = create_build_event(
build_request_id.to_string(), build_request_id.to_string(),
crate::build_event::EventType::BuildRequestEvent(crate::BuildRequestEvent { crate::build_event::EventType::BuildRequestEvent(crate::BuildRequestEvent {
status_code: BuildRequestStatus::BuildRequestCancelled as i32, status: Some(BuildRequestStatusCode::BuildRequestCancelled.status()),
status_name: BuildRequestStatus::BuildRequestCancelled.to_display_string(),
requested_partitions: build.requested_partitions, requested_partitions: build.requested_partitions,
message: format!("Build cancelled"), message: format!("Build cancelled"),
comment: None,
want_id: None,
}) })
); );
@ -250,8 +249,7 @@ impl BuildsRepository {
.into_iter() .into_iter()
.map(|build| crate::BuildSummary { .map(|build| crate::BuildSummary {
build_request_id: build.build_request_id, build_request_id: build.build_request_id,
status_code: build.status as i32, status: Some(build.status),
status_name: build.status.to_display_string(),
requested_partitions: build.requested_partitions.into_iter().map(|p| crate::PartitionRef { str: p.str }).collect(), requested_partitions: build.requested_partitions.into_iter().map(|p| crate::PartitionRef { str: p.str }).collect(),
total_jobs: build.total_jobs as u32, total_jobs: build.total_jobs as u32,
completed_jobs: build.completed_jobs as u32, completed_jobs: build.completed_jobs as u32,
@ -262,6 +260,7 @@ impl BuildsRepository {
completed_at: build.completed_at, completed_at: build.completed_at,
duration_ms: build.duration_ms, duration_ms: build.duration_ms,
cancelled: build.cancelled, cancelled: build.cancelled,
comment: None,
}) })
.collect(); .collect();
@ -292,10 +291,10 @@ mod tests {
// Create events for multiple builds // Create events for multiple builds
let events = vec![ let events = vec![
test_events::build_request_event(Some(build_id1.clone()), vec![partition1.clone()], BuildRequestStatus::BuildRequestReceived), test_events::build_request_event(Some(build_id1.clone()), vec![partition1.clone()], BuildRequestStatusCode::BuildRequestReceived.status()),
test_events::build_request_event(Some(build_id1.clone()), vec![partition1.clone()], BuildRequestStatus::BuildRequestCompleted), test_events::build_request_event(Some(build_id1.clone()), vec![partition1.clone()], BuildRequestStatusCode::BuildRequestCompleted.status()),
test_events::build_request_event(Some(build_id2.clone()), vec![partition2.clone()], BuildRequestStatus::BuildRequestReceived), test_events::build_request_event(Some(build_id2.clone()), vec![partition2.clone()], BuildRequestStatusCode::BuildRequestReceived.status()),
test_events::build_request_event(Some(build_id2.clone()), vec![partition2.clone()], BuildRequestStatus::BuildRequestFailed), test_events::build_request_event(Some(build_id2.clone()), vec![partition2.clone()], BuildRequestStatusCode::BuildRequestFailed.status()),
]; ];
let query_engine = create_mock_bel_query_engine_with_events(events).await.unwrap(); let query_engine = create_mock_bel_query_engine_with_events(events).await.unwrap();
@ -308,11 +307,11 @@ mod tests {
let build1 = builds.iter().find(|b| b.build_request_id == build_id1).unwrap(); let build1 = builds.iter().find(|b| b.build_request_id == build_id1).unwrap();
let build2 = builds.iter().find(|b| b.build_request_id == build_id2).unwrap(); let build2 = builds.iter().find(|b| b.build_request_id == build_id2).unwrap();
assert_eq!(build1.status, BuildRequestStatus::BuildRequestCompleted); assert_eq!(build1.status, BuildRequestStatusCode::BuildRequestCompleted.status());
assert_eq!(build1.requested_partitions.len(), 1); assert_eq!(build1.requested_partitions.len(), 1);
assert!(!build1.cancelled); assert!(!build1.cancelled);
assert_eq!(build2.status, BuildRequestStatus::BuildRequestFailed); assert_eq!(build2.status, BuildRequestStatusCode::BuildRequestFailed.status());
assert_eq!(build2.requested_partitions.len(), 1); assert_eq!(build2.requested_partitions.len(), 1);
assert!(!build2.cancelled); assert!(!build2.cancelled);
} }
@ -323,10 +322,10 @@ mod tests {
let partition = PartitionRef { str: "analytics/daily".to_string() }; let partition = PartitionRef { str: "analytics/daily".to_string() };
let events = vec![ let events = vec![
test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatus::BuildRequestReceived), test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatusCode::BuildRequestReceived.status()),
test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatus::BuildRequestPlanning), test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatusCode::BuildRequestPlanning.status()),
test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatus::BuildRequestExecuting), test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatusCode::BuildRequestExecuting.status()),
test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatus::BuildRequestCompleted), test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatusCode::BuildRequestCompleted.status()),
]; ];
let query_engine = create_mock_bel_query_engine_with_events(events).await.unwrap(); let query_engine = create_mock_bel_query_engine_with_events(events).await.unwrap();
@ -337,14 +336,14 @@ mod tests {
let (info, timeline) = result.unwrap(); let (info, timeline) = result.unwrap();
assert_eq!(info.build_request_id, build_id); assert_eq!(info.build_request_id, build_id);
assert_eq!(info.status, BuildRequestStatus::BuildRequestCompleted); assert_eq!(info.status, BuildRequestStatusCode::BuildRequestCompleted.status());
assert!(!info.cancelled); assert!(!info.cancelled);
assert_eq!(timeline.len(), 4); assert_eq!(timeline.len(), 4);
assert_eq!(timeline[0].status, Some(BuildRequestStatus::BuildRequestReceived)); assert_eq!(timeline[0].status, Some(BuildRequestStatusCode::BuildRequestReceived.status()));
assert_eq!(timeline[1].status, Some(BuildRequestStatus::BuildRequestPlanning)); assert_eq!(timeline[1].status, Some(BuildRequestStatusCode::BuildRequestPlanning.status()));
assert_eq!(timeline[2].status, Some(BuildRequestStatus::BuildRequestExecuting)); assert_eq!(timeline[2].status, Some(BuildRequestStatusCode::BuildRequestExecuting.status()));
assert_eq!(timeline[3].status, Some(BuildRequestStatus::BuildRequestCompleted)); assert_eq!(timeline[3].status, Some(BuildRequestStatusCode::BuildRequestCompleted.status()));
} }
#[tokio::test] #[tokio::test]
@ -363,8 +362,8 @@ mod tests {
// Start with a running build // Start with a running build
let events = vec![ let events = vec![
test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatus::BuildRequestReceived), test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatusCode::BuildRequestReceived.status()),
test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatus::BuildRequestExecuting), test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatusCode::BuildRequestExecuting.status()),
]; ];
let query_engine = create_mock_bel_query_engine_with_events(events).await.unwrap(); let query_engine = create_mock_bel_query_engine_with_events(events).await.unwrap();
@ -389,8 +388,8 @@ mod tests {
// Create a completed build // Create a completed build
let events = vec![ let events = vec![
test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatus::BuildRequestReceived), test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatusCode::BuildRequestReceived.status()),
test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatus::BuildRequestCompleted), test_events::build_request_event(Some(build_id.clone()), vec![partition.clone()], BuildRequestStatusCode::BuildRequestCompleted.status()),
]; ];
let query_engine = create_mock_bel_query_engine_with_events(events).await.unwrap(); let query_engine = create_mock_bel_query_engine_with_events(events).await.unwrap();

View file

@ -102,7 +102,7 @@ impl JobsRepository {
let job_run = JobRunDetail { let job_run = JobRunDetail {
job_run_id: j_event.job_run_id.clone(), job_run_id: j_event.job_run_id.clone(),
job_label: job_label.clone(), job_label: job_label.clone(),
build_request_id: event.build_request_id.clone(), build_request_id: event.build_request_id.clone().unwrap(),
target_partitions: j_event.target_partitions.clone(), target_partitions: j_event.target_partitions.clone(),
status, status,
scheduled_at: event.timestamp, scheduled_at: event.timestamp,
@ -229,7 +229,7 @@ impl JobsRepository {
let job_run = JobRunDetail { let job_run = JobRunDetail {
job_run_id: j_event.job_run_id.clone(), job_run_id: j_event.job_run_id.clone(),
job_label: job_label.to_string(), job_label: job_label.to_string(),
build_request_id: event.build_request_id.clone(), build_request_id: event.build_request_id.clone().unwrap(),
target_partitions: j_event.target_partitions.clone(), target_partitions: j_event.target_partitions.clone(),
status, status,
scheduled_at: event.timestamp, scheduled_at: event.timestamp,

View file

@ -164,7 +164,7 @@ impl PartitionsRepository {
timestamp: event.timestamp, timestamp: event.timestamp,
status: event_status, status: event_status,
message: p_event.message.clone(), message: p_event.message.clone(),
build_request_id: event.build_request_id, build_request_id: event.build_request_id.unwrap(),
job_run_id: if p_event.job_run_id.is_empty() { None } else { Some(p_event.job_run_id.clone()) }, job_run_id: if p_event.job_run_id.is_empty() { None } else { Some(p_event.job_run_id.clone()) },
}); });
} }

View file

@ -86,7 +86,7 @@ impl TasksRepository {
TaskInfo { TaskInfo {
job_run_id: j_event.job_run_id.clone(), job_run_id: j_event.job_run_id.clone(),
job_label: job_label.clone(), job_label: job_label.clone(),
build_request_id: event.build_request_id.clone(), build_request_id: event.build_request_id.clone().unwrap(),
status: JobStatus::JobUnknown, status: JobStatus::JobUnknown,
target_partitions: j_event.target_partitions.clone(), target_partitions: j_event.target_partitions.clone(),
scheduled_at: event.timestamp, scheduled_at: event.timestamp,
@ -182,7 +182,7 @@ impl TasksRepository {
task_info = Some(TaskInfo { task_info = Some(TaskInfo {
job_run_id: j_event.job_run_id.clone(), job_run_id: j_event.job_run_id.clone(),
job_label: job_label.clone(), job_label: job_label.clone(),
build_request_id: event.build_request_id.clone(), build_request_id: event.build_request_id.clone().unwrap(),
status: JobStatus::JobUnknown, status: JobStatus::JobUnknown,
target_partitions: j_event.target_partitions.clone(), target_partitions: j_event.target_partitions.clone(),
scheduled_at: event.timestamp, scheduled_at: event.timestamp,

View file

@ -61,7 +61,7 @@ pub async fn submit_build_request(
// Create build request state // Create build request state
let build_state = BuildRequestState { let build_state = BuildRequestState {
build_request_id: build_request_id.clone(), build_request_id: build_request_id.clone(),
status: BuildRequestStatus::BuildRequestReceived, status: BuildRequestStatusCode::BuildRequestReceived.status(),
requested_partitions: request.partitions.clone(), requested_partitions: request.partitions.clone(),
created_at: timestamp, created_at: timestamp,
updated_at: timestamp, updated_at: timestamp,
@ -160,7 +160,7 @@ pub async fn cancel_build_request(
{ {
let mut active_builds = service.active_builds.write().await; let mut active_builds = service.active_builds.write().await;
if let Some(build_state) = active_builds.get_mut(&build_request_id) { if let Some(build_state) = active_builds.get_mut(&build_request_id) {
build_state.status = BuildRequestStatus::BuildRequestCancelled; build_state.status = BuildRequestStatusCode::BuildRequestCancelled.status();
build_state.updated_at = current_timestamp_nanos(); build_state.updated_at = current_timestamp_nanos();
} else { } else {
return Err(( return Err((
@ -176,10 +176,11 @@ pub async fn cancel_build_request(
let event = create_build_event( let event = create_build_event(
build_request_id.clone(), build_request_id.clone(),
crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent { crate::build_event::EventType::BuildRequestEvent(BuildRequestEvent {
status_code: BuildRequestStatus::BuildRequestCancelled as i32, status: Some(BuildRequestStatusCode::BuildRequestCancelled.status()),
status_name: BuildRequestStatus::BuildRequestCancelled.to_display_string(),
requested_partitions: vec![], requested_partitions: vec![],
message: "Build request cancelled".to_string(), message: "Build request cancelled".to_string(),
comment: None,
want_id: None,
}), }),
); );
@ -262,14 +263,14 @@ pub async fn get_partition_events(
let decoded_partition_ref = base64_url_decode(&partition_ref).unwrap(); let decoded_partition_ref = base64_url_decode(&partition_ref).unwrap();
let events = match service.query_engine.get_partition_events(&decoded_partition_ref, None).await { let events = match service.query_engine.get_partition_events(&decoded_partition_ref, None).await {
Ok(events) => events.into_iter().map(|e| { Ok(events) => events.into_iter().filter(|e| e.build_request_id.is_some()).map(|e| {
let (job_label, partition_ref, delegated_build_id) = extract_navigation_data(&e.event_type); let (job_label, partition_ref, delegated_build_id) = extract_navigation_data(&e.event_type);
BuildEventSummary { BuildEventSummary {
event_id: e.event_id, event_id: e.event_id,
timestamp: e.timestamp, timestamp: e.timestamp,
event_type: event_type_to_string(&e.event_type), event_type: event_type_to_string(&e.event_type),
message: event_to_message(&e.event_type), message: event_to_message(&e.event_type),
build_request_id: e.build_request_id, build_request_id: e.build_request_id.clone().unwrap(),
job_label, job_label,
partition_ref, partition_ref,
delegated_build_id, delegated_build_id,
@ -350,7 +351,7 @@ async fn execute_build_request(
); );
// Update status to planning // Update status to planning
update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestPlanning).await; update_build_request_status(&service, &build_request_id, BuildRequestStatusCode::BuildRequestPlanning.status()).await;
// Log planning event // Log planning event
if let Err(e) = orchestrator.start_planning().await { if let Err(e) = orchestrator.start_planning().await {
@ -362,7 +363,7 @@ async fn execute_build_request(
Ok(graph) => graph, Ok(graph) => graph,
Err(e) => { Err(e) => {
error!("Failed to analyze build graph: {}", e); error!("Failed to analyze build graph: {}", e);
update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestFailed).await; update_build_request_status(&service, &build_request_id, BuildRequestStatusCode::BuildRequestFailed.status()).await;
// Log failure event // Log failure event
if let Err(log_err) = orchestrator.complete_build(BuildResult::Failed { jobs_completed: 0, jobs_failed: 1 }).await { if let Err(log_err) = orchestrator.complete_build(BuildResult::Failed { jobs_completed: 0, jobs_failed: 1 }).await {
@ -375,7 +376,7 @@ async fn execute_build_request(
// Update status to executing // Update status to executing
update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestExecuting).await; update_build_request_status(&service, &build_request_id, BuildRequestStatusCode::BuildRequestExecuting.status()).await;
// Log executing event // Log executing event
if let Err(e) = orchestrator.start_execution().await { if let Err(e) = orchestrator.start_execution().await {
@ -386,7 +387,7 @@ async fn execute_build_request(
match run_execute_command(&service, &build_request_id, &job_graph).await { match run_execute_command(&service, &build_request_id, &job_graph).await {
Ok(_) => { Ok(_) => {
info!("Build request {} completed successfully", build_request_id); info!("Build request {} completed successfully", build_request_id);
update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestCompleted).await; update_build_request_status(&service, &build_request_id, BuildRequestStatusCode::BuildRequestCompleted.status()).await;
// Log completion event // Log completion event
if let Err(e) = orchestrator.complete_build(BuildResult::Success { jobs_completed: 0 }).await { if let Err(e) = orchestrator.complete_build(BuildResult::Success { jobs_completed: 0 }).await {
@ -397,7 +398,7 @@ async fn execute_build_request(
} }
Err(e) => { Err(e) => {
error!("Build request {} failed: {}", build_request_id, e); error!("Build request {} failed: {}", build_request_id, e);
update_build_request_status(&service, &build_request_id, BuildRequestStatus::BuildRequestFailed).await; update_build_request_status(&service, &build_request_id, BuildRequestStatusCode::BuildRequestFailed.status()).await;
// Log failure event // Log failure event
if let Err(log_err) = orchestrator.complete_build(BuildResult::Failed { jobs_completed: 0, jobs_failed: 1 }).await { if let Err(log_err) = orchestrator.complete_build(BuildResult::Failed { jobs_completed: 0, jobs_failed: 1 }).await {
@ -505,7 +506,9 @@ fn event_type_to_string(event_type: &Option<crate::build_event::EventType>) -> S
Some(crate::build_event::EventType::PartitionInvalidationEvent(_)) => "partition_invalidation".to_string(), Some(crate::build_event::EventType::PartitionInvalidationEvent(_)) => "partition_invalidation".to_string(),
Some(crate::build_event::EventType::JobRunCancelEvent(_)) => "task_cancel".to_string(), Some(crate::build_event::EventType::JobRunCancelEvent(_)) => "task_cancel".to_string(),
Some(crate::build_event::EventType::BuildCancelEvent(_)) => "build_cancel".to_string(), Some(crate::build_event::EventType::BuildCancelEvent(_)) => "build_cancel".to_string(),
None => "INVALID_EVENT_TYPE".to_string(), // Make this obvious rather than hiding it Some(build_event::EventType::WantEvent(_)) => "want".to_string(),
Some(build_event::EventType::TaintEvent(_)) => "taint".to_string(),
None => "INVALID_EVENT_TYPE".to_string(),
} }
} }
@ -519,7 +522,10 @@ fn event_to_message(event_type: &Option<crate::build_event::EventType>) -> Strin
Some(crate::build_event::EventType::PartitionInvalidationEvent(event)) => event.reason.clone(), Some(crate::build_event::EventType::PartitionInvalidationEvent(event)) => event.reason.clone(),
Some(crate::build_event::EventType::JobRunCancelEvent(event)) => event.reason.clone(), Some(crate::build_event::EventType::JobRunCancelEvent(event)) => event.reason.clone(),
Some(crate::build_event::EventType::BuildCancelEvent(event)) => event.reason.clone(), Some(crate::build_event::EventType::BuildCancelEvent(event)) => event.reason.clone(),
None => "INVALID_EVENT_NO_MESSAGE".to_string(), // Make this obvious Some(build_event::EventType::WantEvent(event)) => event.comment.clone(),
Some(build_event::EventType::TaintEvent(event)) => event.comment.clone(),
None => "INVALID_EVENT_NO_MESSAGE".to_string(),
} }
} }
@ -557,6 +563,12 @@ fn extract_navigation_data(event_type: &Option<crate::build_event::EventType>) -
// Build cancel events don't need navigation links // Build cancel events don't need navigation links
(None, None, None) (None, None, None)
}, },
Some(crate::build_event::EventType::WantEvent(_)) => {
(None, None, None)
},
Some(crate::build_event::EventType::TaintEvent(_)) => {
(None, None, None)
},
None => (None, None, None), None => (None, None, None),
} }
} }
@ -1417,8 +1429,7 @@ pub async fn get_build_detail(
let timeline_events: Vec<BuildTimelineEvent> = protobuf_response.timeline.into_iter().map(|event| { let timeline_events: Vec<BuildTimelineEvent> = protobuf_response.timeline.into_iter().map(|event| {
BuildTimelineEvent { BuildTimelineEvent {
timestamp: event.timestamp, timestamp: event.timestamp,
status_code: event.status_code, status: event.status,
status_name: event.status_name,
message: event.message, message: event.message,
event_type: event.event_type, event_type: event.event_type,
cancel_reason: event.cancel_reason, cancel_reason: event.cancel_reason,
@ -1427,8 +1438,7 @@ pub async fn get_build_detail(
Ok(Json(BuildDetailResponse { Ok(Json(BuildDetailResponse {
build_request_id: protobuf_response.build_request_id, build_request_id: protobuf_response.build_request_id,
status_code: protobuf_response.status_code, status: protobuf_response.status,
status_name: protobuf_response.status_name,
requested_partitions: protobuf_response.requested_partitions, requested_partitions: protobuf_response.requested_partitions,
total_jobs: protobuf_response.total_jobs, total_jobs: protobuf_response.total_jobs,
completed_jobs: protobuf_response.completed_jobs, completed_jobs: protobuf_response.completed_jobs,

View file

@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize};
use schemars::JsonSchema; use schemars::JsonSchema;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use rusqlite::ToSql;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use uuid::Uuid; use uuid::Uuid;
@ -398,15 +399,17 @@ impl BuildGraphService {
} }
pub fn status_to_string(status: BuildRequestStatus) -> String { pub fn status_to_string(status: BuildRequestStatus) -> String {
match status { match BuildRequestStatusCode::try_from(status.code) {
BuildRequestStatus::BuildRequestUnknown => "unknown".to_string(), Ok(BuildRequestStatusCode::BuildRequestUnknown) => "unknown".to_string(),
BuildRequestStatus::BuildRequestReceived => "received".to_string(), Ok(BuildRequestStatusCode::BuildRequestReceived) => "received".to_string(),
BuildRequestStatus::BuildRequestPlanning => "planning".to_string(), Ok(BuildRequestStatusCode::BuildRequestPlanning) => "planning".to_string(),
BuildRequestStatus::BuildRequestAnalysisCompleted => "analysis_completed".to_string(), Ok(BuildRequestStatusCode::BuildRequestAnalysisCompleted) => "analysis_completed".to_string(),
BuildRequestStatus::BuildRequestExecuting => "executing".to_string(), Ok(BuildRequestStatusCode::BuildRequestExecuting) => "executing".to_string(),
BuildRequestStatus::BuildRequestCompleted => "completed".to_string(), Ok(BuildRequestStatusCode::BuildRequestCompleted) => "completed".to_string(),
BuildRequestStatus::BuildRequestFailed => "failed".to_string(), Ok(BuildRequestStatusCode::BuildRequestFailed) => "failed".to_string(),
BuildRequestStatus::BuildRequestCancelled => "cancelled".to_string(), Ok(BuildRequestStatusCode::BuildRequestCancelled) => "cancelled".to_string(),
Ok(BuildRequestStatusCode::BuildRequestPreconditionFailed) => "precondition_failed".to_string(),
Err(_) => "error".to_string(),
} }
} }

View file

@ -61,35 +61,44 @@ impl JobStatus {
} }
} }
impl BuildRequestStatus { impl BuildRequestStatusCode {
/// Convert build request status to human-readable string matching current CLI/service format /// Convert build request status to human-readable string matching current CLI/service format
pub fn to_display_string(&self) -> String { pub fn to_display_string(&self) -> String {
match self { match self {
BuildRequestStatus::BuildRequestUnknown => "unknown".to_string(), BuildRequestStatusCode::BuildRequestUnknown => "unknown".to_string(),
BuildRequestStatus::BuildRequestReceived => "received".to_string(), BuildRequestStatusCode::BuildRequestReceived => "received".to_string(),
BuildRequestStatus::BuildRequestPlanning => "planning".to_string(), BuildRequestStatusCode::BuildRequestPlanning => "planning".to_string(),
BuildRequestStatus::BuildRequestAnalysisCompleted => "analysis_completed".to_string(), BuildRequestStatusCode::BuildRequestAnalysisCompleted => "analysis_completed".to_string(),
BuildRequestStatus::BuildRequestExecuting => "executing".to_string(), BuildRequestStatusCode::BuildRequestExecuting => "executing".to_string(),
BuildRequestStatus::BuildRequestCompleted => "completed".to_string(), BuildRequestStatusCode::BuildRequestCompleted => "completed".to_string(),
BuildRequestStatus::BuildRequestFailed => "failed".to_string(), BuildRequestStatusCode::BuildRequestFailed => "failed".to_string(),
BuildRequestStatus::BuildRequestCancelled => "cancelled".to_string(), BuildRequestStatusCode::BuildRequestCancelled => "cancelled".to_string(),
&BuildRequestStatusCode::BuildRequestPreconditionFailed => "precondition failed".to_string(),
} }
} }
/// Parse a display string back to enum /// Parse a display string back to enum
pub fn from_display_string(s: &str) -> Option<Self> { pub fn from_display_string(s: &str) -> Option<Self> {
match s { match s {
"unknown" => Some(BuildRequestStatus::BuildRequestUnknown), "unknown" => Some(BuildRequestStatusCode::BuildRequestUnknown),
"received" => Some(BuildRequestStatus::BuildRequestReceived), "received" => Some(BuildRequestStatusCode::BuildRequestReceived),
"planning" => Some(BuildRequestStatus::BuildRequestPlanning), "planning" => Some(BuildRequestStatusCode::BuildRequestPlanning),
"analysis_completed" => Some(BuildRequestStatus::BuildRequestAnalysisCompleted), "analysis_completed" => Some(BuildRequestStatusCode::BuildRequestAnalysisCompleted),
"executing" => Some(BuildRequestStatus::BuildRequestExecuting), "executing" => Some(BuildRequestStatusCode::BuildRequestExecuting),
"completed" => Some(BuildRequestStatus::BuildRequestCompleted), "completed" => Some(BuildRequestStatusCode::BuildRequestCompleted),
"failed" => Some(BuildRequestStatus::BuildRequestFailed), "failed" => Some(BuildRequestStatusCode::BuildRequestFailed),
"cancelled" => Some(BuildRequestStatus::BuildRequestCancelled), "cancelled" => Some(BuildRequestStatusCode::BuildRequestCancelled),
"precondition failed" => Some(BuildRequestStatusCode::BuildRequestPreconditionFailed),
_ => None, _ => None,
} }
} }
pub fn status(&self) -> BuildRequestStatus {
BuildRequestStatus {
code: self.clone().into(),
name: self.to_display_string(),
}
}
} }
impl DepType { impl DepType {
@ -205,11 +214,11 @@ pub mod list_response_helpers {
completed_at: Option<i64>, completed_at: Option<i64>,
duration_ms: Option<i64>, duration_ms: Option<i64>,
cancelled: bool, cancelled: bool,
comment: Option<String>,
) -> BuildSummary { ) -> BuildSummary {
BuildSummary { BuildSummary {
build_request_id, build_request_id,
status_code: status as i32, status: Some(status),
status_name: status.to_display_string(),
requested_partitions, requested_partitions,
total_jobs: total_jobs as u32, total_jobs: total_jobs as u32,
completed_jobs: completed_jobs as u32, completed_jobs: completed_jobs as u32,
@ -220,6 +229,7 @@ pub mod list_response_helpers {
completed_at, completed_at,
duration_ms, duration_ms,
cancelled, cancelled,
comment,
} }
} }
@ -256,9 +266,8 @@ mod tests {
#[test] #[test]
fn test_build_request_status_conversions() { fn test_build_request_status_conversions() {
let status = BuildRequestStatus::BuildRequestCompleted; let status = BuildRequestStatusCode::BuildRequestCompleted.status();
assert_eq!(status.to_display_string(), "completed"); assert_eq!(status.name, "completed");
assert_eq!(BuildRequestStatus::from_display_string("completed"), Some(status));
} }
#[test] #[test]
@ -276,7 +285,7 @@ mod tests {
fn test_invalid_display_string() { fn test_invalid_display_string() {
assert_eq!(PartitionStatus::from_display_string("invalid"), None); assert_eq!(PartitionStatus::from_display_string("invalid"), None);
assert_eq!(JobStatus::from_display_string("invalid"), None); assert_eq!(JobStatus::from_display_string("invalid"), None);
assert_eq!(BuildRequestStatus::from_display_string("invalid"), None); assert_eq!(BuildRequestStatusCode::from_display_string("invalid"), None);
assert_eq!(DepType::from_display_string("invalid"), None); assert_eq!(DepType::from_display_string("invalid"), None);
} }
} }

5
design/questions.md Normal file
View file

@ -0,0 +1,5 @@
# Questions
- What happens when we deploy a new graph and no job in the new graph produces a partition that an outstanding want requires?
- Is the interaction model between graph_a -> graph_b actually graph_a registering a want in graph_b?

86
design/resilience.md Normal file
View file

@ -0,0 +1,86 @@
# Resilience Design
Purpose: Enable DataBuild to maintain correctness during version rollouts, unexpected restarts, and continuous deployment.
## Core Tenets
- **Simplicity over complexity** - Avoid distributed state management
- **Correctness over speed** - Accept retries rather than risk corruption
- **Enable good design** - Constraints that guide users toward robust patterns
## Key Mechanisms
### Process Generation Tracking
Each DataBuild service instance receives a monotonically increasing generation identifier (timestamp-based) at startup. The BEL rejects events from older generations, preventing split-brain scenarios where multiple versions compete. This provides natural version fencing without coordination - the newest service always wins, and older instances' events are ignored.
### Builds as Expiring Leases
Build requests function as time-bound leases on partition production. Active builds must periodically heartbeat to maintain their lease. When a service restarts, builds without recent heartbeats are considered orphaned and marked failed. This prevents zombie builds from interfering with new attempts while keeping the leases implicit — no dedicated lock or lease-coordination service is required.
### Want-Level Retry Logic
Wants represent the desired state (partitions that should exist), while builds are ephemeral attempts to achieve that state. When a service restarts, the recovery strategy is deliberately simple: cancel all in-flight builds and let the still-active wants trigger fresh build attempts. This separation means restarts don't require complex state recovery — the system naturally converges to satisfy outstanding wants.
### Job Re-entrance for Long-Running Tasks
Long-running jobs on external systems (EMR, BigQuery, Databricks) must survive frequent restarts caused by continuous deployment. DataBuild enables re-entrance through a simple stdout-based state mechanism:
- Jobs emit state by printing specially-marked JSON lines to stdout
- The wrapper intercepts these lines and stores them in the BEL (max 1KB per event)
- On restart, jobs receive their previous state via environment variable
- Jobs can then re-attach to external resources or resume from checkpoints
This approach is language-agnostic, requires no libraries, and works across all deployment platforms. The 1KB limit forces jobs to store pointers (job IDs, URLs) rather than data, maintaining proper separation of concerns.
### Write Collision Detection
Rather than preventing duplicate execution through locking, DataBuild detects and handles collisions after the fact. Jobs announce their intent to build a partition via BEL events. If another job is already building or has recently built the partition, the new job delegates to the existing effort. This approach embraces eventual consistency while ensuring correctness through idempotency.
## Operational Guidelines
### Deployment Strategy
New deployments start with a fresh generation, causing the BEL to reject events from the previous version. Old builds timeout and fail naturally, while active wants trigger fresh build attempts under the new generation. This approach requires no coordination between versions and handles both planned rollouts and unexpected restarts.
### Job Design Requirements
Jobs must be idempotent - producing the same output given the same inputs, regardless of how many times they run. This is achieved through:
- Checking if outputs already exist before starting work
- Using atomic writes (staging location then rename)
- Storing progress markers for resumption
- Re-attaching to external jobs when possible
## Risk Classes and Mitigations
### State Consistency Risks
- **Orphaned jobs**: Handled via heartbeat timeouts and generation tracking
- **Double execution**: Mitigated through idempotency and collision detection
- **Lost progress**: Addressed by want-level retries and job re-entrance
### Coordination Risks
- **Cross-graph desync**: GraphService API provides reliable event streaming with retry
- **Event ordering conflicts**: Generation-based fencing ensures single writer
- **Want satisfaction gaps**: Continuous evaluation ensures wants are eventually satisfied
### Resource Management Risks
- **Storage conflicts**: Atomic writes and existence checks prevent corruption
- **Compute waste**: Accepted trade-off for simplicity and correctness
- **Memory state loss**: System designed to be stateless with full recovery from BEL
## Key Insights
1. **Restarts are routine**: Continuous deployment means services restart frequently, often mid-build. The system must handle this gracefully without operator intervention.
2. **State through stdout**: Job state management via stdout/env vars provides a universal mechanism that works across languages and platforms without libraries.
3. **BEL as source of truth**: All coordination happens through the append-only Build Event Log, eliminating distributed state management complexity.
4. **Wants ensure completion**: By separating intent (wants) from execution (builds), the system can fail builds freely while guaranteeing eventual completion.
5. **Small state constraint**: The 1KB limit on job state forces proper design - jobs store references to external resources, not the resources themselves.
6. **Generation fencing**: Simple timestamp-based generations provide total ordering without coordination, preventing cross-version interference.
## Benefits
This design achieves resilience through simplicity. Rather than building complex resumption logic or distributed coordination, DataBuild embraces restarts as routine events. The system may run slower during transitions but maintains correctness without operator intervention. The same patterns work across local development, containers, and cloud platforms, providing a consistent mental model for users.

View file

@ -42,7 +42,6 @@ This creates several limitations:
- Add new endpoints to handle CLI build requests - Add new endpoints to handle CLI build requests
- Move analysis and execution logic from CLI to service - Move analysis and execution logic from CLI to service
- Service maintains orchestrator state and coordinates builds - Service maintains orchestrator state and coordinates builds
- Add real-time progress streaming for CLI consumption
2. **Add CLI-Specific API Endpoints** 2. **Add CLI-Specific API Endpoints**
- `/api/v1/cli/build` - Handle build requests from CLI - `/api/v1/cli/build` - Handle build requests from CLI