From eadd23eb63f0bab104c5aa56a2819400443738a7 Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Thu, 20 Nov 2025 01:03:00 -0800 Subject: [PATCH] Sketch out some API stuff --- databuild/build_event_log.rs | 53 ++++++++++++++++++++++++- databuild/build_state.rs | 20 ++++------ databuild/databuild.proto | 56 ++++++++++++++++++++++++-- databuild/event_transforms.rs | 75 ++++++++++++++++++++++++++++++++++- databuild/job.rs | 28 ++++++++++--- databuild/orchestrator.rs | 6 +-- databuild/util.rs | 10 +++++ 7 files changed, 222 insertions(+), 26 deletions(-) diff --git a/databuild/build_event_log.rs b/databuild/build_event_log.rs index 8b1ba3a..fcaa321 100644 --- a/databuild/build_event_log.rs +++ b/databuild/build_event_log.rs @@ -1,5 +1,5 @@ use crate::data_build_event::Event; -use crate::{DataBuildEvent, WantDetail}; +use crate::{CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, CreateWantRequest, CreateWantResponse, DataBuildEvent, GetTaintRequest, GetTaintResponse, GetWantRequest, GetWantResponse, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, TaintCreateEventV1, WantCancelEventV1, WantCreateEventV1, WantDetail}; use prost::Message; use rusqlite::Connection; use std::error::Error; @@ -169,6 +169,57 @@ impl BuildEventLog { let idx = self.storage.append_event(event)?; Ok(idx) } + + + // API methods + pub fn api_handle_list_wants(&self, req: ListWantsRequest) -> ListWantsResponse { + self.state.list_wants(&req) + } + + pub fn api_handle_list_taints(&self, req: ListTaintsRequest) -> ListTaintsResponse { + self.state.list_taints(&req) + } + + pub fn api_handle_list_partitions(&self, req: ListPartitionsRequest) -> ListPartitionsResponse { + self.state.list_partitions(&req) + } + + pub fn api_handle_list_job_runs(&self, req: ListJobRunsRequest) -> ListJobRunsResponse { + self.state.list_job_runs(&req) + } + + pub fn api_handle_want_create(&mut self, req: CreateWantRequest) -> Result { + let ev: WantCreateEventV1 = req.into(); + self.append_event(&ev.clone().into())?; + Ok(self.state.get_want(&ev.want_id).into()) + } + + pub fn api_handle_want_get(&self, req: GetWantRequest) -> GetWantResponse { + self.state.get_want(&req.want_id).into() + } + + pub fn api_handle_want_cancel(&mut self, req: CancelWantRequest) -> Result { + let ev: WantCancelEventV1 = req.into(); + self.append_event(&ev.clone().into())?; + Ok(self.state.get_want(&ev.want_id).into()) + } + + pub fn api_handle_taint_create(&mut self, req: CreateTaintRequest) -> Result { + // TODO Need to do this hierarchically? A taint will impact downstream partitions also + todo!(); + let ev: TaintCreateEventV1 = req.into(); + self.append_event(&ev.clone().into())?; + Ok(self.state.get_taint(&ev.taint_id).into()) + } + + pub fn api_handle_taint_get(&self, req: GetTaintRequest) -> GetTaintResponse { + todo!() + } + + // Not implemented yet + // pub fn api_handle_taint_cancel(&mut self, req: CancelWantRequest) -> CancelWantResponse { + // todo!() + // } } impl Clone for BuildEventLog { diff --git a/databuild/build_state.rs b/databuild/build_state.rs index 2a87667..3cb2ec7 100644 --- a/databuild/build_state.rs +++ b/databuild/build_state.rs @@ -1,17 +1,10 @@ use crate::data_build_event::Event; -use crate::data_deps::{WantTimestamps, missing_deps_to_want_events}; +use crate::data_deps::{missing_deps_to_want_events, WantTimestamps}; use crate::job_run::{DepMissJobRun, SubProcessBackend}; -use crate::util::{DatabuildError, current_timestamp}; -use crate::{ - JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1, - JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1, - ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, - ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, - PartitionRef, PartitionStatusCode, TaintCreateEventV1, TaintDeleteEventV1, TaintDetail, - WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode, -}; -use rusqlite::ToSql; +use crate::util::{current_timestamp, DatabuildError}; +use crate::{JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, PartitionRef, PartitionStatusCode, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode}; use rusqlite::types::FromSql; +use rusqlite::ToSql; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::error::Error; @@ -76,7 +69,7 @@ impl BuildState { Event::WantCancelV1(e) => self.handle_want_cancel(e), // Taint events Event::TaintCreateV1(e) => self.handle_taint_create(e), - Event::TaintDeleteV1(e) => self.handle_taint_delete(e), + Event::TaintCancelV1(e) => self.handle_taint_delete(e), // Ruh roh! _ => panic!("Unhandled event type! {:?}", event), } @@ -89,6 +82,7 @@ impl BuildState { } fn handle_want_cancel(&mut self, event: &WantCancelEventV1) -> Result<(), DatabuildError> { + // TODO actually cancel in-progress job runs that no longer have a sponsoring want if let Some(want) = self.wants.get_mut(&event.want_id) { want.status = Some(WantStatusCode::WantCanceled.into()); want.last_updated_timestamp = current_timestamp(); @@ -266,7 +260,7 @@ impl BuildState { todo!("...?") } - fn handle_taint_delete(&mut self, event: &TaintDeleteEventV1) -> Result<(), DatabuildError> { + fn handle_taint_delete(&mut self, event: &TaintCancelEventV1) -> Result<(), DatabuildError> { todo!("...?") } diff --git a/databuild/databuild.proto b/databuild/databuild.proto index 076b87b..17cdc92 100644 --- a/databuild/databuild.proto +++ b/databuild/databuild.proto @@ -44,7 +44,7 @@ message DataBuildEvent { WantCancelEventV1 want_cancel_v1 = 18; // Taint events TaintCreateEventV1 taint_create_v1 = 19; - TaintDeleteEventV1 taint_delete_v1 = 20; + TaintCancelEventV1 taint_cancel_v1 = 20; } } @@ -155,7 +155,7 @@ message TaintCreateEventV1 { EventSource source = 5; optional string comment = 6; } -message TaintDeleteEventV1 { +message TaintCancelEventV1 { string taint_id = 1; EventSource source = 2; optional string comment = 3; @@ -302,4 +302,54 @@ message ListJobRunsResponse { uint64 match_count = 2; uint64 page = 3; uint64 page_size = 4; -} \ No newline at end of file +} + +message CreateWantRequest { + repeated PartitionRef partitions = 1; + uint64 data_timestamp = 2; + uint64 ttl_seconds = 3; + uint64 sla_seconds = 4; + EventSource source = 5; + optional string comment = 6; +} +message CreateWantResponse { + WantDetail data = 1; +} + +message CancelWantRequest { + string want_id = 1; + EventSource source = 2; + optional string comment = 3; +} +message CancelWantResponse { + WantDetail data = 1; +} + +message GetWantRequest { + string want_id = 1; +} +message GetWantResponse { + WantDetail data = 1; +} + +message CreateTaintRequest { + // TODO +} +message CreateTaintResponse { + // TODO +} + +message GetTaintRequest { + // TODO +} +message GetTaintResponse { + // TODO +} + +// Not implemented yet +//message CancelTaintRequest { +// // TODO +//} +//message CancelTaintResponse { +// // TODO +//} diff --git a/databuild/event_transforms.rs b/databuild/event_transforms.rs index 8a935da..5165b13 100644 --- a/databuild/event_transforms.rs +++ b/databuild/event_transforms.rs @@ -1,6 +1,7 @@ +use uuid::Uuid; use crate::data_build_event::Event; use crate::util::current_timestamp; -use crate::{event_source, EventSource, JobRunBufferEventV1, JobRunDetail, JobRunStatus, JobRunStatusCode, JobTriggeredEvent, ManuallyTriggeredEvent, PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, WantAttributedPartitions, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode}; +use crate::{event_source, CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, CreateWantRequest, CreateWantResponse, EventSource, GetWantResponse, JobRunBufferEventV1, JobRunDetail, JobRunStatus, JobRunStatusCode, JobTriggeredEvent, ManuallyTriggeredEvent, PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantAttributedPartitions, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode}; use crate::PartitionStatusCode::{PartitionFailed, PartitionLive}; impl From<&WantCreateEventV1> for WantDetail { @@ -34,6 +35,16 @@ impl From for Event { Event::WantCancelV1(value) } } +impl From for Event { + fn from(value: TaintCreateEventV1) -> Self { + Event::TaintCreateV1(value) + } +} +impl From for Event { + fn from(value: TaintCancelEventV1) -> Self { + Event::TaintCancelV1(value) + } +} impl From for WantAttributedPartitions { fn from(value: WantCreateEventV1) -> Self { @@ -146,4 +157,66 @@ impl From<&WantDetail> for WantAttributedPartitions { partitions: value.partitions.clone(), } } +} + +impl From for WantCreateEventV1 { + fn from(value: CreateWantRequest) -> Self { + WantCreateEventV1 { + want_id: Uuid::new_v4().into(), + partitions: value.partitions, + data_timestamp: value.data_timestamp, + ttl_seconds: value.ttl_seconds, + sla_seconds: value.sla_seconds, + source: value.source, + comment: value.comment, + } + } +} + +impl Into for Option { + fn into(self) -> CreateWantResponse { + CreateWantResponse { + data: self, + } + } +} + +impl Into for Option { + fn into(self) -> GetWantResponse { + GetWantResponse { + data: self, + } + } +} + +impl From for WantCancelEventV1 { + fn from(value: CancelWantRequest) -> Self { + WantCancelEventV1 { + want_id: value.want_id, + source: value.source, + comment: value.comment, + } + } +} + +impl Into for Option { + fn into(self) -> CancelWantResponse { + CancelWantResponse { + data: self, + } + } +} + +impl From for TaintCreateEventV1 { + fn from(value: CreateTaintRequest) -> Self { + todo!() + } +} + +impl Into for Option { + fn into(self) -> CreateTaintResponse { + CreateTaintResponse { + // TODO + } + } } \ No newline at end of file diff --git a/databuild/job.rs b/databuild/job.rs index 357b8c4..2a70323 100644 --- a/databuild/job.rs +++ b/databuild/job.rs @@ -1,11 +1,12 @@ use crate::job_run::{NotStartedJobRun, SubProcessBackend}; -use crate::{PartitionRef, WantDetail}; +use crate::{JobConfig, PartitionRef, WantDetail}; use regex::Regex; +use crate::util::DatabuildError; #[derive(Debug, Clone)] pub struct JobConfiguration { pub label: String, - pub pattern: String, + pub patterns: Vec, pub entry_point: String, } @@ -19,8 +20,25 @@ impl JobConfiguration { } pub fn matches(&self, refs: &PartitionRef) -> bool { - let regex = - Regex::new(&self.pattern).expect(&format!("Invalid regex pattern: {}", self.pattern)); - regex.is_match(&refs.r#ref) + self.patterns.iter().any(|pattern| { + let regex = + Regex::new(&pattern).expect(&format!("Invalid regex pattern: {}", pattern)); + regex.is_match(&refs.r#ref) + }) } } + +impl From for JobConfiguration { + fn from(config: JobConfig) -> Self { + Self { + label: config.label, + patterns: config.partition_patterns, + entry_point: config.entrypoint, + } + } +} + +pub fn parse_job_configuration(s: &str) -> Result { + let cfg: JobConfig = serde_json::from_str(s)?; + Ok(cfg.into()) +} diff --git a/databuild/orchestrator.rs b/databuild/orchestrator.rs index 22d3e9e..084afaa 100644 --- a/databuild/orchestrator.rs +++ b/databuild/orchestrator.rs @@ -317,12 +317,12 @@ mod tests { jobs: vec![ JobConfiguration { label: "alpha".to_string(), - pattern: "data/alpha".to_string(), + patterns: vec!["data/alpha".to_string()], entry_point: MockJobRun::bin_path(), }, JobConfiguration { label: "beta".to_string(), - pattern: "data/beta".to_string(), + patterns: vec!["data/beta".to_string()], entry_point: MockJobRun::bin_path(), }, ], @@ -635,7 +635,7 @@ mod tests { fn create_job_config(label: &str, pattern: &str) -> JobConfiguration { JobConfiguration { label: label.to_string(), - pattern: pattern.to_string(), + patterns: vec![pattern.to_string()], entry_point: "test_entrypoint".to_string(), } } diff --git a/databuild/util.rs b/databuild/util.rs index b8797b5..826eaec 100644 --- a/databuild/util.rs +++ b/databuild/util.rs @@ -62,6 +62,16 @@ impl From for DatabuildError { } } +impl From for DatabuildError { + fn from(err: serde_json::Error) -> Self { + Self { + msg: err.to_string(), + source: Some(Box::new(err)), + backtrace: maybe_backtrace() + } + } +} + impl From for DatabuildError { fn from(value: String) -> Self { Self::new(value)