WIP I guess

This commit is contained in:
Stuart Axelbrooke 2025-10-11 11:13:27 -07:00
parent ea83610d35
commit f388f4d86d
9 changed files with 251 additions and 243 deletions

View file

@ -1 +1 @@
8.3.1 8.4.2

File diff suppressed because one or more lines are too long

View file

@ -154,18 +154,18 @@ impl BELStorage for SqliteBELStorage {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct BuildEventLog<S: BELStorage + Debug, B: BuildState + Default> { pub struct BuildEventLog<S: BELStorage + Debug> {
pub storage: S, pub storage: S,
pub state: B, pub state: BuildState,
} }
impl<S: BELStorage + Debug, B: BuildState + Default> BuildEventLog<S, B> { impl<S: BELStorage + Debug> BuildEventLog<S> {
pub fn new(storage: S, state: B) -> BuildEventLog<S, B> { pub fn new(storage: S, state: BuildState) -> BuildEventLog<S> {
BuildEventLog { storage, state } BuildEventLog { storage, state }
} }
pub fn append_event(&mut self, event: &Event) -> Result<u64, Box<dyn Error>> { pub fn append_event(&mut self, event: &Event) -> Result<u64, Box<dyn Error>> {
self.state.handle_event(&event)?; self.state.handle_event(&event);
let idx = self.storage.append_event(event)?; let idx = self.storage.append_event(event)?;
Ok(idx) Ok(idx)
} }
@ -180,7 +180,7 @@ mod tests {
use crate::build_event_log::{BELStorage, BuildEventLog, SqliteBELStorage}; use crate::build_event_log::{BELStorage, BuildEventLog, SqliteBELStorage};
use crate::data_build_event::Event; use crate::data_build_event::Event;
use crate::{PartitionRef, WantCreateEventV1}; use crate::{PartitionRef, WantCreateEventV1};
use crate::build_state::{BuildState, SqliteBuildState}; use crate::build_state::BuildState;
#[test] #[test]
fn test_hello() { fn test_hello() {
@ -191,7 +191,7 @@ mod tests {
fn test_sqlite_append_event() { fn test_sqlite_append_event() {
let storage = let storage =
SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage"); SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
let state = SqliteBuildState::default(); let state = BuildState::default();
let mut log = BuildEventLog { storage, state }; let mut log = BuildEventLog { storage, state };
let want_id = "sqlite_test_1234".to_string(); let want_id = "sqlite_test_1234".to_string();
@ -204,7 +204,7 @@ mod tests {
assert_eq!(events.len(), 0); assert_eq!(events.len(), 0);
// Verify want doesn't exist in state // Verify want doesn't exist in state
assert!(log.state.get_want(&want_id).expect("query failed").is_none()); assert!(log.state.get_want(&want_id).is_none());
// Append an event // Append an event
let mut e = WantCreateEventV1::default(); let mut e = WantCreateEventV1::default();
@ -239,12 +239,11 @@ mod tests {
// Verify state was updated // Verify state was updated
assert!( assert!(
log.state.get_want(&want_id).expect("query failed").is_some(), log.state.get_want(&want_id).is_some(),
"want_id not found in state" "want_id not found in state"
); );
assert_eq!( assert_eq!(
log.state.get_want(&want_id) log.state.get_want(&want_id)
.expect("Failed to get want from state")
.map(|want| want.want_id.clone()) .map(|want| want.want_id.clone())
.expect("state.wants want_id not found"), .expect("state.wants want_id not found"),
want_id, want_id,

View file

@ -1,153 +1,122 @@
use crate::data_build_event::Event; use crate::data_build_event::Event;
use crate::util::current_timestamp; use crate::util::current_timestamp;
use crate::{WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode}; use crate::{JobRunDetail, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, TaintDetail, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode};
use rusqlite::types::{FromSql, FromSqlResult, ToSqlOutput, ValueRef}; use rusqlite::types::{FromSql, FromSqlResult, ToSqlOutput, ValueRef};
use rusqlite::{Connection, ToSql}; use rusqlite::{Connection, ToSql};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::error::Error; use std::error::Error;
pub trait BuildState { #[derive(Debug)]
fn init(&mut self); pub struct BuildState {
fn handle_event(&mut self, event: &Event) -> Result<(), Box<dyn Error>>; wants: BTreeMap<String, WantDetail>,
taints: BTreeMap<String, TaintDetail>,
fn get_want(&self, want_id: &str) -> Result<Option<WantDetail>, Box<dyn Error>>; partitions: BTreeMap<String, PartitionDetail>,
job_runs: BTreeMap<String, JobRunDetail>,
} }
pub struct SqliteBuildState { impl Default for BuildState {
conn: Connection,
}
impl Default for SqliteBuildState {
fn default() -> Self { fn default() -> Self {
let conn = Connection::open_in_memory().unwrap(); Self {
let mut build_state = Self { conn }; wants: Default::default(),
build_state.init(); taints: Default::default(),
build_state partitions: Default::default(),
job_runs: Default::default(),
}
} }
} }
impl SqliteBuildState { impl BuildState {
fn want_create(&mut self, e: &WantCreateEventV1) -> Result<usize, Box<dyn Error>> {
self.conn
.execute(
r#"
INSERT INTO wants (id, data) values (?1, ?2)"#,
(e.want_id.as_str(), Json(WantDetail::from(e))),
)
.map_err(|e| e.into())
}
fn want_update(&mut self, want: &WantDetail) -> Result<usize, Box<dyn Error>> { pub fn handle_event(&mut self, event: &Event) -> () {
self.conn
.execute(
r#"
UPDATE wants SET data = ?1 where id = ?2"#,
(Json(want.clone()), want.want_id.as_str()),
)
.map_err(|e| e.into())
}
fn want_cancel(&mut self, e: &WantCancelEventV1) -> Result<usize, Box<dyn Error>> {
let mut want = self.get_want(e.want_id.as_str())?.unwrap();
want.status = Some(WantStatusCode::WantCanceled.into());
want.last_updated_timestamp = current_timestamp();
self.want_update(&want)
}
}
impl BuildState for SqliteBuildState {
fn init(&mut self) {
self.conn
.execute(
r#"
create table wants (id text primary key, data text not null);
"#,
[],
)
.unwrap();
}
fn handle_event(&mut self, event: &Event) -> Result<(), Box<dyn Error>> {
match event { match event {
Event::WantCreateV1(e) => { Event::WantCreateV1(e) => {
self.want_create(e)?; self.wants.insert(e.want_id.clone(), e.clone().into());
} }
Event::WantCancelV1(e) => { Event::WantCancelV1(e) => {
self.want_cancel(e)?; if let Some(want) = self.wants.get_mut(&e.want_id) {
want.status = Some(WantStatusCode::WantCanceled.into());
want.last_updated_timestamp = current_timestamp();
}
} }
_ => (), _ => (),
} }
Ok(())
} }
fn get_want(&self, want_id: &str) -> Result<Option<WantDetail>, Box<dyn Error>> { pub fn get_want(&self, want_id: &str) -> Option<WantDetail> {
let mut stmt = self.conn.prepare( self.wants.get(want_id).cloned()
r#" }
select data from wants where id = ?1 limit 1 pub fn get_taint(&self, taint_id: &str) -> Option<TaintDetail> {
"#, self.taints.get(taint_id).cloned()
)?; }
pub fn get_partition(&self, partition_id: &str) -> Option<PartitionDetail> {
self.partitions.get(partition_id).cloned()
}
pub fn get_job_run(&self, job_run_id: &str) -> Option<JobRunDetail> {
self.job_runs.get(job_run_id).cloned()
}
let mut rows = stmt.query_map([want_id], |row| row.get(0))?; pub fn list_wants(&self, request: &ListWantsRequest) -> ListWantsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
ListWantsResponse {
data: list_state_items(&self.wants, page, page_size),
match_count: self.wants.len() as u64,
page,
page_size,
}
}
match rows.next() { pub fn list_taints(&self, request: &ListTaintsRequest) -> ListTaintsResponse {
Some(result) => result let page = request.page.unwrap_or(0);
.map(|r: Json<WantDetail>| Some(r.0)) let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
.map_err(|e| e.into()), ListTaintsResponse {
None => Ok(None), data: list_state_items(&self.taints, page, page_size),
match_count: self.wants.len() as u64,
page,
page_size,
}
}
pub fn list_partitions(&self, request: &ListPartitionsRequest) -> ListPartitionsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
ListPartitionsResponse {
data: list_state_items(&self.partitions, page, page_size),
match_count: self.wants.len() as u64,
page,
page_size,
}
}
pub fn list_job_runs(&self, request: &ListJobRunsRequest) -> ListJobRunsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
ListJobRunsResponse {
data: list_state_items(&self.job_runs, page, page_size),
match_count: self.wants.len() as u64,
page,
page_size,
} }
} }
} }
pub struct Json<T>(T); fn list_state_items<T: Clone>(map: &BTreeMap<String, T>, page: u64, page_size: u64) -> Vec<T> {
// TODO when we add filtering, can we add it generically via some trait or filter object that can be provided?
impl<T: Serialize> ToSql for Json<T> { let start = page * page_size;
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> { let end = start + page_size;
Ok(ToSqlOutput::from( map.values().skip(start as usize).take(end as usize).cloned().collect()
serde_json::to_string(&self.0).expect("invalid json"),
))
}
} }
impl<T: for<'de> Deserialize<'de>> FromSql for Json<T> { mod consts {
fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> { pub const DEFAULT_PAGE_SIZE: u64 = 100;
let s = value.as_str()?;
println!("{:?}", s);
Ok(Json(serde_json::from_str(s).expect("invalid json")))
}
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
mod event_serde {
use crate::build_state::Json;
use crate::{PartitionRef, WantDetail};
use rusqlite::types::ToSql;
#[test]
fn test_should_serialize_data() {
let datum = WantDetail {
want_id: "1234".to_string(),
partitions: vec![PartitionRef {
r#ref: "myref".to_string(),
}],
data_timestamp: 0,
ttl_seconds: 0,
sla_seconds: 0,
source: None,
comment: None,
status: None,
last_updated_timestamp: 0,
};
let json = serde_json::to_string(&datum).unwrap();
println!("{}", json);
Json(datum).to_sql().expect("should serialize");
}
}
mod sqlite_build_state { mod sqlite_build_state {
mod want { mod want {
use crate::build_state::{BuildState, SqliteBuildState}; use crate::build_state::BuildState;
use crate::{WantCancelEventV1, WantCreateEventV1, WantDetail}; use crate::{WantCancelEventV1, WantCreateEventV1, WantDetail};
#[test] #[test]
@ -156,9 +125,9 @@ mod tests {
e.want_id = "1234".to_string(); e.want_id = "1234".to_string();
e.partitions = vec!["mypart".into()]; e.partitions = vec!["mypart".into()];
let mut state = SqliteBuildState::default(); let mut state = BuildState::default();
state.handle_event(&e.clone().into()).unwrap(); state.handle_event(&e.clone().into());
let want = state.get_want("1234").unwrap().unwrap(); let want = state.get_want("1234").unwrap();
let mut expected: WantDetail = e.into(); let mut expected: WantDetail = e.into();
// Into will set this field as current timestamp // Into will set this field as current timestamp
expected.last_updated_timestamp = want.last_updated_timestamp; expected.last_updated_timestamp = want.last_updated_timestamp;
@ -171,16 +140,19 @@ mod tests {
e.want_id = "1234".to_string(); e.want_id = "1234".to_string();
e.partitions = vec!["mypart".into()]; e.partitions = vec!["mypart".into()];
let mut state = SqliteBuildState::default(); let mut state = BuildState::default();
state.handle_event(&e.clone().into()).unwrap(); state.handle_event(&e.clone().into());
// Should be able to cancel // Should be able to cancel
let mut e = WantCancelEventV1::default(); let mut e = WantCancelEventV1::default();
e.want_id = "1234".to_string(); e.want_id = "1234".to_string();
state.handle_event(&e.clone().into()).unwrap(); state.handle_event(&e.clone().into());
let want = state.get_want("1234").unwrap().unwrap(); let want = state.get_want("1234").unwrap();
assert_eq!(want.status, Some(crate::WantStatusCode::WantCanceled.into())); assert_eq!(
want.status,
Some(crate::WantStatusCode::WantCanceled.into())
);
} }
} }
} }

View file

@ -197,3 +197,51 @@ message EventFilter {
// IDs of wants to get relevant events for // IDs of wants to get relevant events for
repeated string want_ids = 1; repeated string want_ids = 1;
} }
message ListWantsRequest {
optional uint64 page = 1;
optional uint64 page_size = 2;
// TODO add filters later
}
message ListWantsResponse {
repeated WantDetail data = 1;
uint64 match_count = 2;
uint64 page = 3;
uint64 page_size = 4;
}
message ListTaintsRequest {
optional uint64 page = 1;
optional uint64 page_size = 2;
// TODO add filters later
}
message ListTaintsResponse {
repeated TaintDetail data = 1;
uint64 match_count = 2;
uint64 page = 3;
uint64 page_size = 4;
}
message ListPartitionsRequest {
optional uint64 page = 1;
optional uint64 page_size = 2;
// TODO add filters later
}
message ListPartitionsResponse {
repeated PartitionDetail data = 1;
uint64 match_count = 2;
uint64 page = 3;
uint64 page_size = 4;
}
message ListJobRunsRequest {
optional uint64 page = 1;
optional uint64 page_size = 2;
// TODO add filters later
}
message ListJobRunsResponse {
repeated JobRunDetail data = 1;
uint64 match_count = 2;
uint64 page = 3;
uint64 page_size = 4;
}

View file

@ -2,7 +2,6 @@ mod build_event_log;
mod orchestrator; mod orchestrator;
mod job_run; mod job_run;
mod job; mod job;
mod bel_reducers;
mod util; mod util;
mod build_state; mod build_state;
mod event_transforms; mod event_transforms;

View file

@ -12,8 +12,8 @@ Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lig
the visitor pattern to monitor job exec progress and liveness, and adds the visitor pattern to monitor job exec progress and liveness, and adds
*/ */
struct Orchestrator<S: BELStorage + Debug, B: BuildState + Default> { struct Orchestrator<S: BELStorage + Debug> {
bel: BuildEventLog<S, B>, bel: BuildEventLog<S>,
job_runs: Vec<JobRunHandle>, job_runs: Vec<JobRunHandle>,
config: OrchestratorConfig, config: OrchestratorConfig,
} }
@ -59,7 +59,7 @@ struct GroupedWants {
unhandled_wants: Vec<WantDetail>, unhandled_wants: Vec<WantDetail>,
} }
impl<S: BELStorage + Debug, B: BuildState + Default> Orchestrator<S, B> { impl<S: BELStorage + Debug> Orchestrator<S> {
fn new(storage: S, config: OrchestratorConfig) -> Self { fn new(storage: S, config: OrchestratorConfig) -> Self {
Self { Self {
bel: BuildEventLog::new(storage, Default::default()), bel: BuildEventLog::new(storage, Default::default()),
@ -97,7 +97,7 @@ impl<S: BELStorage + Debug, B: BuildState + Default> Orchestrator<S, B> {
fn poll_wants(&mut self) -> Result<(), Box<dyn Error>> { fn poll_wants(&mut self) -> Result<(), Box<dyn Error>> {
// Collect unhandled wants, group by job that handles each partition, // Collect unhandled wants, group by job that handles each partition,
let grouped_wants = let grouped_wants =
Orchestrator::<S, B>::group_wants(&self.config, &self.bel.schedulable_wants()); Orchestrator::<S>::group_wants(&self.config, &self.bel.schedulable_wants());
if !grouped_wants.want_groups.is_empty() { if !grouped_wants.want_groups.is_empty() {
// All wants must be mapped to jobs that can be handled // All wants must be mapped to jobs that can be handled
@ -195,7 +195,7 @@ mod tests {
use super::super::*; use super::super::*;
use crate::build_event_log::MemoryBELStorage; use crate::build_event_log::MemoryBELStorage;
use crate::{PartitionRef, WantDetail}; use crate::{PartitionRef, WantDetail};
use crate::build_state::SqliteBuildState; use crate::build_state::BuildState;
fn create_job_config(label: &str, pattern: &str) -> JobConfiguration { fn create_job_config(label: &str, pattern: &str) -> JobConfiguration {
JobConfiguration { JobConfiguration {
@ -229,7 +229,7 @@ mod tests {
let config = OrchestratorConfig { jobs: vec![] }; let config = OrchestratorConfig { jobs: vec![] };
let wants = vec![]; let wants = vec![];
let result = Orchestrator::<MemoryBELStorage, SqliteBuildState>::group_wants(&config, &wants); let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
assert!(result.want_groups.is_empty()); assert!(result.want_groups.is_empty());
assert!(result.unhandled_wants.is_empty()); assert!(result.unhandled_wants.is_empty());
@ -244,7 +244,7 @@ mod tests {
let want = create_want_detail("want1", vec!["partition1"]); let want = create_want_detail("want1", vec!["partition1"]);
let wants = vec![want.clone()]; let wants = vec![want.clone()];
let result = Orchestrator::<MemoryBELStorage, SqliteBuildState>::group_wants(&config, &wants); let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
assert!(result.unhandled_wants.is_empty()); assert!(result.unhandled_wants.is_empty());
assert_eq!(result.want_groups.len(), 1); assert_eq!(result.want_groups.len(), 1);
@ -262,7 +262,7 @@ mod tests {
let want = create_want_detail("want1", vec!["different_partition"]); let want = create_want_detail("want1", vec!["different_partition"]);
let wants = vec![want.clone()]; let wants = vec![want.clone()];
let result = Orchestrator::<MemoryBELStorage, SqliteBuildState>::group_wants(&config, &wants); let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
assert_eq!(result.unhandled_wants.len(), 1); assert_eq!(result.unhandled_wants.len(), 1);
assert_eq!(result.unhandled_wants[0].want_id, "want1"); assert_eq!(result.unhandled_wants[0].want_id, "want1");
@ -282,7 +282,7 @@ mod tests {
let want3 = create_want_detail("want3", vec!["pattern2_partition"]); let want3 = create_want_detail("want3", vec!["pattern2_partition"]);
let wants = vec![want1, want2, want3]; let wants = vec![want1, want2, want3];
let result = Orchestrator::<MemoryBELStorage, SqliteBuildState>::group_wants(&config, &wants); let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
assert!(result.unhandled_wants.is_empty()); assert!(result.unhandled_wants.is_empty());
assert_eq!(result.want_groups.len(), 2); assert_eq!(result.want_groups.len(), 2);

View file

@ -0,0 +1,13 @@
The core question: what defines partition identity?
- Hive partitioning is great, but making every source of variation or dimension of config a partition col would be ridiculous for real applications
- Why would it be ridiculous? Examples:
- We use many config dims: targeting goal, modeling strategy, per-modeling strategy config (imagine flattening a struct tree), etc. Then we add another dimension to the config for an experiment. Is that a "new dataset"?
- "all config goes through partitions" means that every config change has to be a dataset or partition pattern change.
- Maybe this is fine, actually, as long as authors make sensible defaults a non-problem? (e.g. think of schema evolution)
- In cases where config is internally resolved by the job,
- Who's to say the partition isn't a struct itself? `s3://my/dataset/date=2025-01-01` could also be `{"dataset": "s3://my/dataset", "date": "2025-01-01"}`, or even `{"kind": "icecream", "meta": {"toppings": ["sprinkles"]}}` (though a clear requirement is that databuild does not parse partition refs)
- Should we consider the bazel config approach, where identity is the whole set of config, manifested as a hash? (not human-readable)
-

View file

@ -10,3 +10,9 @@
- How do you deal with DAG run identity under changing DAG definition? - How do you deal with DAG run identity under changing DAG definition?
- These questions are all red herrings. We don't care about the DAG definition - we care about the data we want to produce. - These questions are all red herrings. We don't care about the DAG definition - we care about the data we want to produce.
- We should instead declare what partitions we want, and iteratively propagate - We should instead declare what partitions we want, and iteratively propagate
- Inter-job invariants suck (simplify)
- What about sense-plan-act? The rebuttal is "sense produces data"? How would launchpad work under this model in a way that didn't suck?
- Is there a hot take to make about config? "customer X is targeting Y" is a reality of modern apps, bazel-esque config is
- Should this be under `#partition-identity` or something?