Compare commits


3 commits

Author SHA1 Message Date
022868b7b0 Update test definition in orchestrator.rs 2025-10-12 13:35:07 -07:00
4e28b6048e update "why not push" 2025-10-12 12:57:41 -07:00
f388f4d86d WIP I guess 2025-10-11 11:13:27 -07:00
10 changed files with 331 additions and 253 deletions

View file

@@ -1 +1 @@
8.3.1
8.4.2

File diff suppressed because one or more lines are too long

View file

@@ -154,18 +154,18 @@ impl BELStorage for SqliteBELStorage {
}
#[derive(Debug)]
pub struct BuildEventLog<S: BELStorage + Debug, B: BuildState + Default> {
pub struct BuildEventLog<S: BELStorage + Debug> {
pub storage: S,
pub state: B,
pub state: BuildState,
}
impl<S: BELStorage + Debug, B: BuildState + Default> BuildEventLog<S, B> {
pub fn new(storage: S, state: B) -> BuildEventLog<S, B> {
impl<S: BELStorage + Debug> BuildEventLog<S> {
pub fn new(storage: S, state: BuildState) -> BuildEventLog<S> {
BuildEventLog { storage, state }
}
pub fn append_event(&mut self, event: &Event) -> Result<u64, Box<dyn Error>> {
self.state.handle_event(&event)?;
self.state.handle_event(&event);
let idx = self.storage.append_event(event)?;
Ok(idx)
}
@@ -180,7 +180,7 @@ mod tests {
use crate::build_event_log::{BELStorage, BuildEventLog, SqliteBELStorage};
use crate::data_build_event::Event;
use crate::{PartitionRef, WantCreateEventV1};
use crate::build_state::{BuildState, SqliteBuildState};
use crate::build_state::BuildState;
#[test]
fn test_hello() {
@@ -191,7 +191,7 @@ mod tests {
fn test_sqlite_append_event() {
let storage =
SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
let state = SqliteBuildState::default();
let state = BuildState::default();
let mut log = BuildEventLog { storage, state };
let want_id = "sqlite_test_1234".to_string();
@@ -204,7 +204,7 @@ mod tests {
assert_eq!(events.len(), 0);
// Verify want doesn't exist in state
assert!(log.state.get_want(&want_id).expect("query failed").is_none());
assert!(log.state.get_want(&want_id).is_none());
// Append an event
let mut e = WantCreateEventV1::default();
@@ -239,12 +239,11 @@ mod tests {
// Verify state was updated
assert!(
log.state.get_want(&want_id).expect("query failed").is_some(),
log.state.get_want(&want_id).is_some(),
"want_id not found in state"
);
assert_eq!(
log.state.get_want(&want_id)
.expect("Failed to get want from state")
.map(|want| want.want_id.clone())
.expect("state.wants want_id not found"),
want_id,

View file

@@ -1,153 +1,122 @@
use crate::data_build_event::Event;
use crate::util::current_timestamp;
use crate::{WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode};
use crate::{JobRunDetail, ListJobRunsRequest, ListJobRunsResponse, ListPartitionsRequest, ListPartitionsResponse, ListTaintsRequest, ListTaintsResponse, ListWantsRequest, ListWantsResponse, PartitionDetail, TaintDetail, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode};
use rusqlite::types::{FromSql, FromSqlResult, ToSqlOutput, ValueRef};
use rusqlite::{Connection, ToSql};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::error::Error;
pub trait BuildState {
fn init(&mut self);
fn handle_event(&mut self, event: &Event) -> Result<(), Box<dyn Error>>;
fn get_want(&self, want_id: &str) -> Result<Option<WantDetail>, Box<dyn Error>>;
#[derive(Debug)]
pub struct BuildState {
wants: BTreeMap<String, WantDetail>,
taints: BTreeMap<String, TaintDetail>,
partitions: BTreeMap<String, PartitionDetail>,
job_runs: BTreeMap<String, JobRunDetail>,
}
pub struct SqliteBuildState {
conn: Connection,
}
impl Default for SqliteBuildState {
impl Default for BuildState {
fn default() -> Self {
let conn = Connection::open_in_memory().unwrap();
let mut build_state = Self { conn };
build_state.init();
build_state
Self {
wants: Default::default(),
taints: Default::default(),
partitions: Default::default(),
job_runs: Default::default(),
}
}
}
impl SqliteBuildState {
fn want_create(&mut self, e: &WantCreateEventV1) -> Result<usize, Box<dyn Error>> {
self.conn
.execute(
r#"
INSERT INTO wants (id, data) values (?1, ?2)"#,
(e.want_id.as_str(), Json(WantDetail::from(e))),
)
.map_err(|e| e.into())
}
impl BuildState {
fn want_update(&mut self, want: &WantDetail) -> Result<usize, Box<dyn Error>> {
self.conn
.execute(
r#"
UPDATE wants SET data = ?1 where id = ?2"#,
(Json(want.clone()), want.want_id.as_str()),
)
.map_err(|e| e.into())
}
fn want_cancel(&mut self, e: &WantCancelEventV1) -> Result<usize, Box<dyn Error>> {
let mut want = self.get_want(e.want_id.as_str())?.unwrap();
want.status = Some(WantStatusCode::WantCanceled.into());
want.last_updated_timestamp = current_timestamp();
self.want_update(&want)
}
}
impl BuildState for SqliteBuildState {
fn init(&mut self) {
self.conn
.execute(
r#"
create table wants (id text primary key, data text not null);
"#,
[],
)
.unwrap();
}
fn handle_event(&mut self, event: &Event) -> Result<(), Box<dyn Error>> {
pub fn handle_event(&mut self, event: &Event) -> () {
match event {
Event::WantCreateV1(e) => {
self.want_create(e)?;
self.wants.insert(e.want_id.clone(), e.clone().into());
}
Event::WantCancelV1(e) => {
self.want_cancel(e)?;
if let Some(want) = self.wants.get_mut(&e.want_id) {
want.status = Some(WantStatusCode::WantCanceled.into());
want.last_updated_timestamp = current_timestamp();
}
}
_ => (),
}
Ok(())
}
fn get_want(&self, want_id: &str) -> Result<Option<WantDetail>, Box<dyn Error>> {
let mut stmt = self.conn.prepare(
r#"
select data from wants where id = ?1 limit 1
"#,
)?;
pub fn get_want(&self, want_id: &str) -> Option<WantDetail> {
self.wants.get(want_id).cloned()
}
pub fn get_taint(&self, taint_id: &str) -> Option<TaintDetail> {
self.taints.get(taint_id).cloned()
}
pub fn get_partition(&self, partition_id: &str) -> Option<PartitionDetail> {
self.partitions.get(partition_id).cloned()
}
pub fn get_job_run(&self, job_run_id: &str) -> Option<JobRunDetail> {
self.job_runs.get(job_run_id).cloned()
}
let mut rows = stmt.query_map([want_id], |row| row.get(0))?;
pub fn list_wants(&self, request: &ListWantsRequest) -> ListWantsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
ListWantsResponse {
data: list_state_items(&self.wants, page, page_size),
match_count: self.wants.len() as u64,
page,
page_size,
}
}
match rows.next() {
Some(result) => result
.map(|r: Json<WantDetail>| Some(r.0))
.map_err(|e| e.into()),
None => Ok(None),
pub fn list_taints(&self, request: &ListTaintsRequest) -> ListTaintsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
ListTaintsResponse {
data: list_state_items(&self.taints, page, page_size),
match_count: self.taints.len() as u64,
page,
page_size,
}
}
pub fn list_partitions(&self, request: &ListPartitionsRequest) -> ListPartitionsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
ListPartitionsResponse {
data: list_state_items(&self.partitions, page, page_size),
match_count: self.partitions.len() as u64,
page,
page_size,
}
}
pub fn list_job_runs(&self, request: &ListJobRunsRequest) -> ListJobRunsResponse {
let page = request.page.unwrap_or(0);
let page_size = request.page_size.unwrap_or(consts::DEFAULT_PAGE_SIZE);
ListJobRunsResponse {
data: list_state_items(&self.job_runs, page, page_size),
match_count: self.job_runs.len() as u64,
page,
page_size,
}
}
}
pub struct Json<T>(T);
impl<T: Serialize> ToSql for Json<T> {
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
Ok(ToSqlOutput::from(
serde_json::to_string(&self.0).expect("invalid json"),
))
}
fn list_state_items<T: Clone>(map: &BTreeMap<String, T>, page: u64, page_size: u64) -> Vec<T> {
// TODO when we add filtering, can we add it generically via some trait or filter object that can be provided?
let start = page * page_size;
map.values().skip(start as usize).take(page_size as usize).cloned().collect()
}
impl<T: for<'de> Deserialize<'de>> FromSql for Json<T> {
fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
let s = value.as_str()?;
println!("{:?}", s);
Ok(Json(serde_json::from_str(s).expect("invalid json")))
}
mod consts {
pub const DEFAULT_PAGE_SIZE: u64 = 100;
}
#[cfg(test)]
mod tests {
mod event_serde {
use crate::build_state::Json;
use crate::{PartitionRef, WantDetail};
use rusqlite::types::ToSql;
#[test]
fn test_should_serialize_data() {
let datum = WantDetail {
want_id: "1234".to_string(),
partitions: vec![PartitionRef {
r#ref: "myref".to_string(),
}],
data_timestamp: 0,
ttl_seconds: 0,
sla_seconds: 0,
source: None,
comment: None,
status: None,
last_updated_timestamp: 0,
};
let json = serde_json::to_string(&datum).unwrap();
println!("{}", json);
Json(datum).to_sql().expect("should serialize");
}
}
mod sqlite_build_state {
mod want {
use crate::build_state::{BuildState, SqliteBuildState};
use crate::build_state::BuildState;
use crate::{WantCancelEventV1, WantCreateEventV1, WantDetail};
#[test]
@@ -156,9 +125,9 @@ mod tests {
e.want_id = "1234".to_string();
e.partitions = vec!["mypart".into()];
let mut state = SqliteBuildState::default();
state.handle_event(&e.clone().into()).unwrap();
let want = state.get_want("1234").unwrap().unwrap();
let mut state = BuildState::default();
state.handle_event(&e.clone().into());
let want = state.get_want("1234").unwrap();
let mut expected: WantDetail = e.into();
// Into will set this field as current timestamp
expected.last_updated_timestamp = want.last_updated_timestamp;
@@ -171,16 +140,19 @@ mod tests {
e.want_id = "1234".to_string();
e.partitions = vec!["mypart".into()];
let mut state = SqliteBuildState::default();
state.handle_event(&e.clone().into()).unwrap();
let mut state = BuildState::default();
state.handle_event(&e.clone().into());
// Should be able to cancel
let mut e = WantCancelEventV1::default();
e.want_id = "1234".to_string();
state.handle_event(&e.clone().into()).unwrap();
let want = state.get_want("1234").unwrap().unwrap();
state.handle_event(&e.clone().into());
let want = state.get_want("1234").unwrap();
assert_eq!(want.status, Some(crate::WantStatusCode::WantCanceled.into()));
assert_eq!(
want.status,
Some(crate::WantStatusCode::WantCanceled.into())
);
}
}
}

View file

@@ -197,3 +197,51 @@ message EventFilter {
// IDs of wants to get relevant events for
repeated string want_ids = 1;
}
message ListWantsRequest {
optional uint64 page = 1;
optional uint64 page_size = 2;
// TODO add filters later
}
message ListWantsResponse {
repeated WantDetail data = 1;
uint64 match_count = 2;
uint64 page = 3;
uint64 page_size = 4;
}
message ListTaintsRequest {
optional uint64 page = 1;
optional uint64 page_size = 2;
// TODO add filters later
}
message ListTaintsResponse {
repeated TaintDetail data = 1;
uint64 match_count = 2;
uint64 page = 3;
uint64 page_size = 4;
}
message ListPartitionsRequest {
optional uint64 page = 1;
optional uint64 page_size = 2;
// TODO add filters later
}
message ListPartitionsResponse {
repeated PartitionDetail data = 1;
uint64 match_count = 2;
uint64 page = 3;
uint64 page_size = 4;
}
message ListJobRunsRequest {
optional uint64 page = 1;
optional uint64 page_size = 2;
// TODO add filters later
}
message ListJobRunsResponse {
repeated JobRunDetail data = 1;
uint64 match_count = 2;
uint64 page = 3;
uint64 page_size = 4;
}

View file

@@ -2,7 +2,6 @@ mod build_event_log;
mod orchestrator;
mod job_run;
mod job;
mod bel_reducers;
mod util;
mod build_state;
mod event_transforms;

View file

@@ -5,15 +5,14 @@ use crate::{PartitionRef, WantDetail};
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Debug;
use crate::build_state::BuildState;
/**
Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lightweight threads +
the visitor pattern to monitor job exec progress and liveness, and adds
*/
struct Orchestrator<S: BELStorage + Debug, B: BuildState + Default> {
bel: BuildEventLog<S, B>,
struct Orchestrator<S: BELStorage + Debug> {
bel: BuildEventLog<S>,
job_runs: Vec<JobRunHandle>,
config: OrchestratorConfig,
}
@@ -59,7 +58,7 @@ struct GroupedWants {
unhandled_wants: Vec<WantDetail>,
}
impl<S: BELStorage + Debug, B: BuildState + Default> Orchestrator<S, B> {
impl<S: BELStorage + Debug> Orchestrator<S> {
fn new(storage: S, config: OrchestratorConfig) -> Self {
Self {
bel: BuildEventLog::new(storage, Default::default()),
@@ -69,7 +68,7 @@ impl<S: BELStorage + Debug, B: BuildState + Default> Orchestrator<S, B> {
}
/** Continuously invoked function to watch job run status */
fn poll_jobs(&mut self) -> Result<(), Box<dyn Error>> {
fn poll_job_runs(&mut self) -> Result<(), Box<dyn Error>> {
// Visit existing jobs, remove completed
self.job_runs.retain_mut(|jr| {
// Append emitted events
@@ -97,7 +96,7 @@ impl<S: BELStorage + Debug, B: BuildState + Default> Orchestrator<S, B> {
fn poll_wants(&mut self) -> Result<(), Box<dyn Error>> {
// Collect unhandled wants, group by job that handles each partition,
let grouped_wants =
Orchestrator::<S, B>::group_wants(&self.config, &self.bel.schedulable_wants());
Orchestrator::<S>::group_wants(&self.config, &self.bel.schedulable_wants());
if !grouped_wants.want_groups.is_empty() {
// All wants must be mapped to jobs that can be handled
@@ -146,7 +145,7 @@ impl<S: BELStorage + Debug, B: BuildState + Default> Orchestrator<S, B> {
/** Entrypoint for running jobs */
pub fn join(mut self) -> Result<(), Box<dyn Error>> {
loop {
self.poll_jobs()?;
self.poll_job_runs()?;
self.poll_wants()?;
}
}
@@ -154,48 +153,106 @@ impl<S: BELStorage + Debug, B: BuildState + Default> Orchestrator<S, B> {
#[cfg(test)]
mod tests {
mod poll_jobs {
// The orchestrator needs to be able to actually execute job runs
mod run_jobs {
// Use case: the orchestrator should be able to execute a spawned-process job
#[test]
#[ignore] // TODO define this interface
fn test_spawned_process_job() {
todo!()
}
}
// The orchestrator relies on polling job run status to react to job completions that imply
// key outcomes like:
// - Success: partitions produced, other job runs may be schedulable
// - Dep miss: wants need to be created
// - Failure: engineer likely needs to react
mod poll_job_runs {
// Use case: we find a job that has completed, BEL should be written with appropriate event
// (both for success and fail cases)
#[test]
#[ignore]
fn test_job_completion_events() {
// TODO
todo!()
}
//Use case: a job has written new stdout, it should produce a new heartbeat event in the BEL
// TODO - we should come back here later and ensure we have a minimum heartbeat period
#[test]
#[ignore]
fn test_heartbeat_from_stdout() {
// TODO
todo!()
}
}
// The orchestrator polls wants so that it can react to new wants created by users, or to wants
// created by itself (for dep miss job run failures)
mod poll_wants {
// Use case: Empty schedulable wants is a valid case, and should create no new jobs.
#[test]
#[ignore]
fn test_empty_wants_noop() {
// TODO
todo!()
}
// Use case: Some schedulable wants with jobs that can be matched should launch those jobs
// (but in this case using a noop/mock child process)
#[test]
#[ignore]
fn test_schedulable_wants_should_schedule() {
// TODO
todo!()
}
// Use case: A schedulable want that can't be matched to a job should return an error
#[test]
#[ignore]
fn test_schedulable_want_no_matching_job() {
// TODO
todo!()
}
}
mod want_group {
// Orchestrator want creation is the means of data dependency propagation, allowing the
// orchestrator to create partitions needed by jobs that produce the existing wanted partitions.
mod want_create {
// Use case: The orchestrator should map a failed job into a set of wants
#[test]
#[ignore]
fn test_job_fail_want_mapping() {
todo!()
}
}
// Orchestrator needs to be able to achieve high level orchestration use cases.
mod orchestration {
// Use case: should run a job to produce a partition in reaction to a want, then have the
// want fulfilled.
#[test]
#[ignore]
fn test_want_builds_partition() {
todo!()
}
// Use case: a graph with multi-hop deps should achieve the multi-hop build
// - Job B depends on part_a produced by job A
// - Job B should be attempted, fail, and create a want for part_a
// - Job A should be attempted, succeed, and produce part_a
// - Job B should be attempted, succeed, and produce part_b
#[test]
#[ignore]
fn test_multi_hop_want_builds_partition() {
todo!()
}
}
// The orchestrator groups wants to enable efficient execution. Many individual wants may
// reference the same partitions, or many different partitions may be referenced by many
// different wants. The orchestrator needs to be able to achieve job run batching, where a
// single job run builds multiple partitions from multiple different wants.
mod want_grouping {
use super::super::*;
use crate::build_event_log::MemoryBELStorage;
use crate::{PartitionRef, WantDetail};
use crate::build_state::SqliteBuildState;
fn create_job_config(label: &str, pattern: &str) -> JobConfiguration {
JobConfiguration {
@@ -229,7 +286,7 @@ mod tests {
let config = OrchestratorConfig { jobs: vec![] };
let wants = vec![];
let result = Orchestrator::<MemoryBELStorage, SqliteBuildState>::group_wants(&config, &wants);
let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
assert!(result.want_groups.is_empty());
assert!(result.unhandled_wants.is_empty());
@@ -244,7 +301,7 @@ mod tests {
let want = create_want_detail("want1", vec!["partition1"]);
let wants = vec![want.clone()];
let result = Orchestrator::<MemoryBELStorage, SqliteBuildState>::group_wants(&config, &wants);
let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
assert!(result.unhandled_wants.is_empty());
assert_eq!(result.want_groups.len(), 1);
@@ -262,7 +319,7 @@ mod tests {
let want = create_want_detail("want1", vec!["different_partition"]);
let wants = vec![want.clone()];
let result = Orchestrator::<MemoryBELStorage, SqliteBuildState>::group_wants(&config, &wants);
let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
assert_eq!(result.unhandled_wants.len(), 1);
assert_eq!(result.unhandled_wants[0].want_id, "want1");
@@ -282,7 +339,7 @@ mod tests {
let want3 = create_want_detail("want3", vec!["pattern2_partition"]);
let wants = vec![want1, want2, want3];
let result = Orchestrator::<MemoryBELStorage, SqliteBuildState>::group_wants(&config, &wants);
let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
assert!(result.unhandled_wants.is_empty());
assert_eq!(result.want_groups.len(), 2);

View file

@@ -0,0 +1,13 @@
The core question: what defines partition identity?
- Hive partitioning is great, but making every source of variation or dimension of config a partition col would be ridiculous for real applications
- Why would it be ridiculous? Examples:
- We use many config dims: targeting goal, modeling strategy, per-modeling strategy config (imagine flattening a struct tree), etc. Then we add another dimension to the config for an experiment. Is that a "new dataset"?
- "all config goes through partitions" means that every config change has to be a dataset or partition pattern change.
- Maybe this is fine, actually, as long as authors make sensible defaults a non-problem? (e.g. think of schema evolution)
- In cases where config is internally resolved by the job,
- Who's to say the partition isn't a struct itself? `s3://my/dataset/date=2025-01-01` could also be `{"dataset": "s3://my/dataset", "date": "2025-01-01"}`, or even `{"kind": "icecream", "meta": {"toppings": ["sprinkles"]}}` (though a clear requirement is that databuild does not parse partition refs); see the sketch after this list
- Should we consider the bazel config approach, where identity is the whole set of config, manifest as a hash? (not human-readable)
-
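A tiny sketch to make the trade-off concrete (illustrative only: `StructuredRef` and the example refs are made up, though serde/serde_json are already in use in the Rust code above). Either representation can serve as partition identity, because the orchestrator only ever compares refs for equality; a bazel-style scheme would additionally hash the canonical form.

```rust
use serde::Serialize;

// Hypothetical structured identity; databuild itself would only ever see the
// serialized form as an opaque ref and never parse it.
#[derive(Serialize)]
struct StructuredRef {
    dataset: String,
    date: String,
}

fn main() {
    // Hive-style: identity is the path string itself.
    let hive_ref = "s3://my/dataset/date=2025-01-01".to_string();

    // Structured: identity is a canonically serialized config value
    // (a bazel-style scheme would hash this form instead of exposing it).
    let structured_ref = serde_json::to_string(&StructuredRef {
        dataset: "s3://my/dataset".to_string(),
        date: "2025-01-01".to_string(),
    })
    .unwrap();

    // Either way, the orchestrator only needs ref equality, not ref structure.
    assert_eq!(
        structured_ref,
        r#"{"dataset":"s3://my/dataset","date":"2025-01-01"}"#
    );
    println!("hive: {hive_ref}\nstructured: {structured_ref}");
}
```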

View file

@@ -10,3 +10,9 @@
- How do you deal with DAG run identity under changing DAG definition?
- These questions are all red herrings. We don't care about the DAG definition - we care about the data we want to produce.
- We should instead declare what partitions we want, and iteratively propagate
- Inter-job invariants suck (simplify)
- What about sense-plan-act? Is the rebuttal "sense produces data"? How would launchpad work under this in a way that didn't suck?
- Is there a hot take to make about config? "customer X is targeting Y" is a reality of modern apps, bazel-esque config is
- Should this be under `#partition-identity` or something?

View file

@@ -1,2 +1,15 @@
Or "why pull"?
- Initially, when writing a single DAG or pipeline, "pull" and "push" concepts don't make sense. You just run the DAG that produces the data periodically via some automated trigger.
- As separate jobs get added to the workload, in many cases you can just add them to the same DAG. Without complex dependency relationships, you can keep composing different workloads this way.
- This simplicity is great, making it easy to understand system state even in the case of failures, and to develop incrementally and retry granularly with tools like Airflow, Dagster, and Prefect.
- "push" semantics arise from building upon the simple solution here: data build decisions push across data dep relationships, going from "input" data to "output" data. Said another way, data gets built because it can be built and its inputs are available.
- This breaks down for partitioned datasets, where dependencies are more complicated than 1:1 - e.g. if you need to aggregate revenue from events coming from different platforms and customers (see the sketch at the end of this list)
- This happens when other people deliver batched data to you, particularly when you do business on platforms that handle multiple customers' business for you
- Under push, even with extensive logging, propagating fixes and new features requires backfilling everything downstream of the version-bumped or fixed dataset.
- Under push, that orchestration code is distributed across teams and DAGs. It's tribal knowledge. Under pull, the dependency relationships are explicit and queryable.
- The systemd analogy:
- SysV init and init.d scripts (1980s-2000s): Services were started by numbered bash scripts (/etc/init.d/S10network, /etc/rc3.d/S20postgres) that explicitly called other scripts in sequence. Each script contained imperative startup logic and had to manually handle dependencies - if your app needed postgres, your init script had to know to start postgres first, wait for it, then start your app. Changing the dependency graph meant editing scripts across multiple services. Debugging startup failures required tracing through bash scripts to figure out which service failed to start its dependencies. The numbering system (S10, S20, S30) was a crude way to enforce ordering, but became unmaintainable as systems grew complex.
- The key problem: Orchestration logic was distributed across individual service scripts (push-based), requiring global knowledge to modify, rather than being centralized with declarative per-service dependencies (pull-based).
- In pull, data is built because something wants it to exist. No complex logic is needed to reason about what needs to run before a given workload can run. The system does it all for you based on the graph structure specified by your application/codebase.
- Cool thing you can do: estimate the cost of fixing an issue or backfilling data (because of the downstream work that needs to be done)
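A minimal sketch of what the pull declaration looks like against the in-memory `BuildState` introduced in this change, written as it would sit in the crate's own test modules (the test name, want id, and partition ref are made up; the upstream dep-miss propagation it relies on is still `todo!()` in the orchestrator tests above):

```rust
use crate::build_state::BuildState;
use crate::WantCreateEventV1;

#[test]
fn pull_declares_the_output_not_the_pipeline() {
    let mut state = BuildState::default();

    // Pull: declare which partition should exist, not which DAG to run.
    let mut e = WantCreateEventV1::default();
    e.want_id = "want_revenue_rollup".to_string();
    e.partitions = vec!["revenue_rollup/date=2025-01-01".into()];
    state.handle_event(&e.into());

    // The orchestrator, not this caller, is responsible for discovering the
    // upstream platform/customer partitions and creating wants for them.
    assert!(state.get_want("want_revenue_rollup").is_some());
}
```

The point of the shape: callers only ever state which partitions should exist; which jobs run, and in what order, falls out of the want graph.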