Sketch out some API stuff
Some checks are pending
/ setup (push) Waiting to run

This commit is contained in:
Stuart Axelbrooke 2025-11-20 01:03:00 -08:00
parent d42bddac90
commit eadd23eb63
7 changed files with 222 additions and 26 deletions

View file

@ -1,5 +1,5 @@
use crate::data_build_event::Event; use crate::data_build_event::Event;
use crate::{DataBuildEvent, WantDetail}; use crate::{CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, CreateWantRequest, CreateWantResponse, DataBuildEvent, GetTaintRequest, GetTaintResponse, GetWantRequest, GetWantResponse, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, TaintCreateEventV1, WantCancelEventV1, WantCreateEventV1, WantDetail};
use prost::Message; use prost::Message;
use rusqlite::Connection; use rusqlite::Connection;
use std::error::Error; use std::error::Error;
@ -169,6 +169,57 @@ impl<S: BELStorage + Debug> BuildEventLog<S> {
let idx = self.storage.append_event(event)?; let idx = self.storage.append_event(event)?;
Ok(idx) Ok(idx)
} }
// API methods
pub fn api_handle_list_wants(&self, req: ListWantsRequest) -> ListWantsResponse {
self.state.list_wants(&req)
}
pub fn api_handle_list_taints(&self, req: ListTaintsRequest) -> ListTaintsResponse {
self.state.list_taints(&req)
}
pub fn api_handle_list_partitions(&self, req: ListPartitionsRequest) -> ListPartitionsResponse {
self.state.list_partitions(&req)
}
pub fn api_handle_list_job_runs(&self, req: ListJobRunsRequest) -> ListJobRunsResponse {
self.state.list_job_runs(&req)
}
pub fn api_handle_want_create(&mut self, req: CreateWantRequest) -> Result<CreateWantResponse, DatabuildError> {
let ev: WantCreateEventV1 = req.into();
self.append_event(&ev.clone().into())?;
Ok(self.state.get_want(&ev.want_id).into())
}
pub fn api_handle_want_get(&self, req: GetWantRequest) -> GetWantResponse {
self.state.get_want(&req.want_id).into()
}
pub fn api_handle_want_cancel(&mut self, req: CancelWantRequest) -> Result<CancelWantResponse, DatabuildError> {
let ev: WantCancelEventV1 = req.into();
self.append_event(&ev.clone().into())?;
Ok(self.state.get_want(&ev.want_id).into())
}
pub fn api_handle_taint_create(&mut self, req: CreateTaintRequest) -> Result<CreateTaintResponse, DatabuildError> {
// TODO Need to do this hierarchically? A taint will impact downstream partitions also
todo!();
let ev: TaintCreateEventV1 = req.into();
self.append_event(&ev.clone().into())?;
Ok(self.state.get_taint(&ev.taint_id).into())
}
pub fn api_handle_taint_get(&self, req: GetTaintRequest) -> GetTaintResponse {
todo!()
}
// Not implemented yet
// pub fn api_handle_taint_cancel(&mut self, req: CancelWantRequest) -> CancelWantResponse {
// todo!()
// }
} }
impl Clone for BuildEventLog<MemoryBELStorage> { impl Clone for BuildEventLog<MemoryBELStorage> {

View file

@ -1,17 +1,10 @@
use crate::data_build_event::Event; use crate::data_build_event::Event;
use crate::data_deps::{WantTimestamps, missing_deps_to_want_events}; use crate::data_deps::{missing_deps_to_want_events, WantTimestamps};
use crate::job_run::{DepMissJobRun, SubProcessBackend}; use crate::job_run::{DepMissJobRun, SubProcessBackend};
use crate::util::{DatabuildError, current_timestamp}; use crate::util::{current_timestamp, DatabuildError};
use crate::{ use crate::{JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1, JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, PartitionRef, PartitionStatusCode, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode};
JobRunBufferEventV1, JobRunCancelEventV1, JobRunDetail, JobRunFailureEventV1,
JobRunHeartbeatEventV1, JobRunMissingDepsEventV1, JobRunStatusCode, JobRunSuccessEventV1,
ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse,
ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail,
PartitionRef, PartitionStatusCode, TaintCreateEventV1, TaintDeleteEventV1, TaintDetail,
WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode,
};
use rusqlite::ToSql;
use rusqlite::types::FromSql; use rusqlite::types::FromSql;
use rusqlite::ToSql;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::error::Error; use std::error::Error;
@ -76,7 +69,7 @@ impl BuildState {
Event::WantCancelV1(e) => self.handle_want_cancel(e), Event::WantCancelV1(e) => self.handle_want_cancel(e),
// Taint events // Taint events
Event::TaintCreateV1(e) => self.handle_taint_create(e), Event::TaintCreateV1(e) => self.handle_taint_create(e),
Event::TaintDeleteV1(e) => self.handle_taint_delete(e), Event::TaintCancelV1(e) => self.handle_taint_delete(e),
// Ruh roh! // Ruh roh!
_ => panic!("Unhandled event type! {:?}", event), _ => panic!("Unhandled event type! {:?}", event),
} }
@ -89,6 +82,7 @@ impl BuildState {
} }
fn handle_want_cancel(&mut self, event: &WantCancelEventV1) -> Result<(), DatabuildError> { fn handle_want_cancel(&mut self, event: &WantCancelEventV1) -> Result<(), DatabuildError> {
// TODO actually cancel in-progress job runs that no longer have a sponsoring want
if let Some(want) = self.wants.get_mut(&event.want_id) { if let Some(want) = self.wants.get_mut(&event.want_id) {
want.status = Some(WantStatusCode::WantCanceled.into()); want.status = Some(WantStatusCode::WantCanceled.into());
want.last_updated_timestamp = current_timestamp(); want.last_updated_timestamp = current_timestamp();
@ -266,7 +260,7 @@ impl BuildState {
todo!("...?") todo!("...?")
} }
fn handle_taint_delete(&mut self, event: &TaintDeleteEventV1) -> Result<(), DatabuildError> { fn handle_taint_delete(&mut self, event: &TaintCancelEventV1) -> Result<(), DatabuildError> {
todo!("...?") todo!("...?")
} }

View file

@ -44,7 +44,7 @@ message DataBuildEvent {
WantCancelEventV1 want_cancel_v1 = 18; WantCancelEventV1 want_cancel_v1 = 18;
// Taint events // Taint events
TaintCreateEventV1 taint_create_v1 = 19; TaintCreateEventV1 taint_create_v1 = 19;
TaintDeleteEventV1 taint_delete_v1 = 20; TaintCancelEventV1 taint_cancel_v1 = 20;
} }
} }
@ -155,7 +155,7 @@ message TaintCreateEventV1 {
EventSource source = 5; EventSource source = 5;
optional string comment = 6; optional string comment = 6;
} }
message TaintDeleteEventV1 { message TaintCancelEventV1 {
string taint_id = 1; string taint_id = 1;
EventSource source = 2; EventSource source = 2;
optional string comment = 3; optional string comment = 3;
@ -302,4 +302,54 @@ message ListJobRunsResponse {
uint64 match_count = 2; uint64 match_count = 2;
uint64 page = 3; uint64 page = 3;
uint64 page_size = 4; uint64 page_size = 4;
} }
message CreateWantRequest {
repeated PartitionRef partitions = 1;
uint64 data_timestamp = 2;
uint64 ttl_seconds = 3;
uint64 sla_seconds = 4;
EventSource source = 5;
optional string comment = 6;
}
message CreateWantResponse {
WantDetail data = 1;
}
message CancelWantRequest {
string want_id = 1;
EventSource source = 2;
optional string comment = 3;
}
message CancelWantResponse {
WantDetail data = 1;
}
message GetWantRequest {
string want_id = 1;
}
message GetWantResponse {
WantDetail data = 1;
}
message CreateTaintRequest {
// TODO
}
message CreateTaintResponse {
// TODO
}
message GetTaintRequest {
// TODO
}
message GetTaintResponse {
// TODO
}
// Not implemented yet
//message CancelTaintRequest {
// // TODO
//}
//message CancelTaintResponse {
// // TODO
//}

View file

@ -1,6 +1,7 @@
use uuid::Uuid;
use crate::data_build_event::Event; use crate::data_build_event::Event;
use crate::util::current_timestamp; use crate::util::current_timestamp;
use crate::{event_source, EventSource, JobRunBufferEventV1, JobRunDetail, JobRunStatus, JobRunStatusCode, JobTriggeredEvent, ManuallyTriggeredEvent, PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, WantAttributedPartitions, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode}; use crate::{event_source, CancelWantRequest, CancelWantResponse, CreateTaintRequest, CreateTaintResponse, CreateWantRequest, CreateWantResponse, EventSource, GetWantResponse, JobRunBufferEventV1, JobRunDetail, JobRunStatus, JobRunStatusCode, JobTriggeredEvent, ManuallyTriggeredEvent, PartitionDetail, PartitionRef, PartitionStatus, PartitionStatusCode, TaintCancelEventV1, TaintCreateEventV1, TaintDetail, WantAttributedPartitions, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode};
use crate::PartitionStatusCode::{PartitionFailed, PartitionLive}; use crate::PartitionStatusCode::{PartitionFailed, PartitionLive};
impl From<&WantCreateEventV1> for WantDetail { impl From<&WantCreateEventV1> for WantDetail {
@ -34,6 +35,16 @@ impl From<WantCancelEventV1> for Event {
Event::WantCancelV1(value) Event::WantCancelV1(value)
} }
} }
impl From<TaintCreateEventV1> for Event {
fn from(value: TaintCreateEventV1) -> Self {
Event::TaintCreateV1(value)
}
}
impl From<TaintCancelEventV1> for Event {
fn from(value: TaintCancelEventV1) -> Self {
Event::TaintCancelV1(value)
}
}
impl From<WantCreateEventV1> for WantAttributedPartitions { impl From<WantCreateEventV1> for WantAttributedPartitions {
fn from(value: WantCreateEventV1) -> Self { fn from(value: WantCreateEventV1) -> Self {
@ -146,4 +157,66 @@ impl From<&WantDetail> for WantAttributedPartitions {
partitions: value.partitions.clone(), partitions: value.partitions.clone(),
} }
} }
}
impl From<CreateWantRequest> for WantCreateEventV1 {
fn from(value: CreateWantRequest) -> Self {
WantCreateEventV1 {
want_id: Uuid::new_v4().into(),
partitions: value.partitions,
data_timestamp: value.data_timestamp,
ttl_seconds: value.ttl_seconds,
sla_seconds: value.sla_seconds,
source: value.source,
comment: value.comment,
}
}
}
impl Into<CreateWantResponse> for Option<WantDetail> {
fn into(self) -> CreateWantResponse {
CreateWantResponse {
data: self,
}
}
}
impl Into<GetWantResponse> for Option<WantDetail> {
fn into(self) -> GetWantResponse {
GetWantResponse {
data: self,
}
}
}
impl From<CancelWantRequest> for WantCancelEventV1 {
fn from(value: CancelWantRequest) -> Self {
WantCancelEventV1 {
want_id: value.want_id,
source: value.source,
comment: value.comment,
}
}
}
impl Into<CancelWantResponse> for Option<WantDetail> {
fn into(self) -> CancelWantResponse {
CancelWantResponse {
data: self,
}
}
}
impl From<CreateTaintRequest> for TaintCreateEventV1 {
fn from(value: CreateTaintRequest) -> Self {
todo!()
}
}
impl Into<CreateTaintResponse> for Option<TaintDetail> {
fn into(self) -> CreateTaintResponse {
CreateTaintResponse {
// TODO
}
}
} }

View file

@ -1,11 +1,12 @@
use crate::job_run::{NotStartedJobRun, SubProcessBackend}; use crate::job_run::{NotStartedJobRun, SubProcessBackend};
use crate::{PartitionRef, WantDetail}; use crate::{JobConfig, PartitionRef, WantDetail};
use regex::Regex; use regex::Regex;
use crate::util::DatabuildError;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct JobConfiguration { pub struct JobConfiguration {
pub label: String, pub label: String,
pub pattern: String, pub patterns: Vec<String>,
pub entry_point: String, pub entry_point: String,
} }
@ -19,8 +20,25 @@ impl JobConfiguration {
} }
pub fn matches(&self, refs: &PartitionRef) -> bool { pub fn matches(&self, refs: &PartitionRef) -> bool {
let regex = self.patterns.iter().any(|pattern| {
Regex::new(&self.pattern).expect(&format!("Invalid regex pattern: {}", self.pattern)); let regex =
regex.is_match(&refs.r#ref) Regex::new(&pattern).expect(&format!("Invalid regex pattern: {}", pattern));
regex.is_match(&refs.r#ref)
})
} }
} }
impl From<JobConfig> for JobConfiguration {
fn from(config: JobConfig) -> Self {
Self {
label: config.label,
patterns: config.partition_patterns,
entry_point: config.entrypoint,
}
}
}
pub fn parse_job_configuration(s: &str) -> Result<JobConfiguration, DatabuildError> {
let cfg: JobConfig = serde_json::from_str(s)?;
Ok(cfg.into())
}

View file

@ -317,12 +317,12 @@ mod tests {
jobs: vec![ jobs: vec![
JobConfiguration { JobConfiguration {
label: "alpha".to_string(), label: "alpha".to_string(),
pattern: "data/alpha".to_string(), patterns: vec!["data/alpha".to_string()],
entry_point: MockJobRun::bin_path(), entry_point: MockJobRun::bin_path(),
}, },
JobConfiguration { JobConfiguration {
label: "beta".to_string(), label: "beta".to_string(),
pattern: "data/beta".to_string(), patterns: vec!["data/beta".to_string()],
entry_point: MockJobRun::bin_path(), entry_point: MockJobRun::bin_path(),
}, },
], ],
@ -635,7 +635,7 @@ mod tests {
fn create_job_config(label: &str, pattern: &str) -> JobConfiguration { fn create_job_config(label: &str, pattern: &str) -> JobConfiguration {
JobConfiguration { JobConfiguration {
label: label.to_string(), label: label.to_string(),
pattern: pattern.to_string(), patterns: vec![pattern.to_string()],
entry_point: "test_entrypoint".to_string(), entry_point: "test_entrypoint".to_string(),
} }
} }

View file

@ -62,6 +62,16 @@ impl From<prost::EncodeError> for DatabuildError {
} }
} }
impl From<serde_json::Error> for DatabuildError {
fn from(err: serde_json::Error) -> Self {
Self {
msg: err.to_string(),
source: Some(Box::new(err)),
backtrace: maybe_backtrace()
}
}
}
impl From<String> for DatabuildError { impl From<String> for DatabuildError {
fn from(value: String) -> Self { fn from(value: String) -> Self {
Self::new(value) Self::new(value)