parent c07cf7cd81
commit ea83610d35
29 changed files with 539 additions and 6674 deletions
AGENTS.md (16 changed lines)
@@ -5,14 +5,14 @@ DataBuild is a bazel-based data build system. Key files:
 - [`DESIGN.md`](./DESIGN.md) - Overall design of databuild
 - [`databuild.proto`](databuild/databuild.proto) - System interfaces
 - Component designs - design docs for specific aspects or components of databuild:
-  - [Core build](./design/core-build.md) - How the core semantics of databuild works and are implemented
-  - [Build event log](./design/build-event-log.md) - How the build event log works and is accessed
-  - [Service](./design/service.md) - How the databuild HTTP service and web app are designed.
-  - [Glossary](./design/glossary.md) - Centralized description of key terms.
-  - [Graph specification](./design/graph-specification.md) - Describes the different libraries that enable more succinct declaration of databuild applications than the core bazel-based interface.
-  - [Deploy strategies](./design/deploy-strategies.md) - Different strategies for deploying databuild applications.
-  - [Wants](./design/wants.md) - How triggering works in databuild applications.
-  - [Why databuild?](./design/why-databuild.md) - Why to choose databuild instead of other better established orchestration solutions.
+  - [Core build](docs/design/core-build.md) - How the core semantics of databuild works and are implemented
+  - [Build event log](docs/design/build-event-log.md) - How the build event log works and is accessed
+  - [Service](docs/design/service.md) - How the databuild HTTP service and web app are designed.
+  - [Glossary](docs/design/glossary.md) - Centralized description of key terms.
+  - [Graph specification](docs/design/graph-specification.md) - Describes the different libraries that enable more succinct declaration of databuild applications than the core bazel-based interface.
+  - [Deploy strategies](docs/design/deploy-strategies.md) - Different strategies for deploying databuild applications.
+  - [Wants](docs/design/wants.md) - How triggering works in databuild applications.
+  - [Why databuild?](docs/design/why-databuild.md) - Why to choose databuild instead of other better established orchestration solutions.

 Please reference these for any related work, as they indicate key technical bias/direction of the project.
@@ -5,7 +5,7 @@ DataBuild is a trivially-deployable, partition-oriented, declarative build syste
 ## Philosophy

-Inspired by [these requirements.](./design/requirements.md)
+Inspired by [these requirements.](docs/design/requirements.md)

 Many large-scale systems for producing data leave the complexity of true orchestration to the user - even DAG-based systems for implementing dependencies leave the system as a collection of DAGs, requiring engineers to solve the same "why doesn't this data exist?" and "how do I build this data?"

@@ -22,7 +22,7 @@ Graphs and jobs are defined in [bazel](https://bazel.build), allowing graphs (an
 - **Wants** - Partition wants can be registered with DataBuild, enabling continuous data reconciliation and build of wanted partitions as soon as their graph-external dependencies are met.
 - **Taints** - Taints mark a partition as invalid, indicating that readers should not use it, and that it should be rebuilt when requested or depended upon. If there is a still-active want for the tainted partition, it will be rebuilt immediately.
 - **Bazel Targets** - Bazel is a fast, extensible, and hermetic build system. DataBuild uses bazel targets to describe graphs and jobs, making graphs themselves deployable application. Implementing a DataBuild app is the process of integrating your data build jobs in `databuild_job` bazel targets, and connecting them with a `databuild_graph` target.
-- [**Graph Definition Languages**](design/graph-specification.md) Application libraries in Python/Rust/Scala that use language features to enable ergonomic and succinct specification of jobs and graphs.
+- [**Graph Definition Languages**](docs/design/graph-specification.md) Application libraries in Python/Rust/Scala that use language features to enable ergonomic and succinct specification of jobs and graphs.

 ## Bazel Components

@@ -39,14 +39,14 @@ The `databuild_graph` rule expects two fields, `jobs`, and `lookup`:
 - The `lookup` binary target should return a JSON object with keys as job labels and values as the list of partitions that each job is responsible for producing. This enables graph planning by walking backwards in the data dependency graph.
 - The `jobs` list should just be a list of all jobs involved in the graph. The graph will recursively call config to resolve the full set of jobs to run.

-### [Build Event Log (BEL)](./design/build-event-log.md)
+### [Build Event Log (BEL)](docs/design/build-event-log.md)

 The BEL encodes all relevant build actions that occur, enabling distributed/concurrent builds. This includes submitted wants, job events (started, succeeded, partitions missing, etc)

 The BEL is similar to [event-sourced](https://martinfowler.com/eaaDev/EventSourcing.html) systems, as all application state is rendered from aggregations over the BEL. This enables the BEL to stay simple while also powering concurrent builds, the data catalog, and the DataBuild service.

 ### Wants and Taints
-["Wants"](./design/wants.md) are the main mechanism for eventually built partitions. In real world scenarios, it is standard for data to arrive late, or not at all. Wants cause the databuild graph to continually attempt to build the wanted partitions while they aren't live, and enabling it to list wants who are past SLA.
+["Wants"](docs/design/wants.md) are the main mechanism for eventually built partitions. In real world scenarios, it is standard for data to arrive late, or not at all. Wants cause the databuild graph to continually attempt to build the wanted partitions while they aren't live, and enabling it to list wants who are past SLA.

 Taints allow for manual/programmatic invalidation of built partitions. Partitions tainted since their last build are considered as non-existent, and will be rebuilt if any other wanted partition depends on them. This also opens the door for invalidating downstream partitions as well.
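As an illustrative aside on the `lookup` contract described above (job labels mapped to the partitions each job produces): a minimal sketch of the expected output shape, using hypothetical job labels and partition refs and the `serde_json` crate already declared in MODULE.bazel.

use serde_json::json;

fn main() {
    // Hypothetical lookup output: job label -> partitions that job produces.
    // Graph planning walks this map backwards from wanted partitions to jobs.
    let lookup = json!({
        "//jobs:ingest_clicks": ["dal://prod/clicks/region=4/date=2025-01-01"],
        "//jobs:daily_report": ["s3://companybkt/datasets/report/date=2025-01-01"]
    });
    println!("{}", serde_json::to_string_pretty(&lookup).unwrap());
}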
MODULE.bazel (93 changed lines)
@@ -22,34 +22,6 @@ crate.spec(
     package = "serde_json",
     version = "1.0",
 )
-crate.spec(
-    package = "log",
-    version = "0.4",
-)
-crate.spec(
-    features = ["stderr"],
-    package = "simple_logger",
-    version = "4.3",
-)
-crate.spec(
-    package = "crossbeam-channel",
-    version = "0.5",
-)
-crate.spec(
-    package = "num_cpus",
-    version = "1.16",
-)
-crate.spec(
-    default_features = False,
-    features = [
-        "macros",
-        "net",
-        "rt-multi-thread",
-        "sync",
-    ],
-    package = "tokio",
-    version = "1.38",
-)
 crate.spec(
     package = "prost",
     version = "0.13",
@@ -66,10 +38,6 @@ crate.spec(
     package = "tempfile",
     version = "3.0",
 )
-crate.spec(
-    package = "async-trait",
-    version = "0.1",
-)
 crate.spec(
     features = ["v4"],
     package = "uuid",
@@ -80,74 +48,13 @@ crate.spec(
     package = "rusqlite",
     version = "0.31",
 )
-crate.spec(
-    features = ["derive"],
-    package = "clap",
-    version = "4.0",
-)
-crate.spec(
-    features = ["json"],
-    package = "axum",
-    version = "0.7.2",
-)
-crate.spec(
-    package = "tower",
-    version = "0.4",
-)
-crate.spec(
-    features = ["cors"],
-    package = "tower-http",
-    version = "0.5",
-)
-crate.spec(
-    features = ["full"],
-    package = "hyper",
-    version = "1.0",
-)
-crate.spec(
-    features = ["axum"],
-    package = "aide",
-    version = "0.13.0",
-)
 crate.spec(
     features = [
-        "uuid1",
         "derive",
     ],
     package = "schemars",
     version = "0.8.16",
 )
-crate.spec(
-    features = ["aide"],
-    package = "axum-jsonschema",
-    version = "0.8.0",
-)
-crate.spec(
-    package = "thiserror",
-    version = "1.0",
-)
-crate.spec(
-    features = ["debug-embed"],
-    package = "rust-embed",
-    version = "8.0",
-)
-crate.spec(
-    package = "sysinfo",
-    version = "0.30",
-)
-crate.spec(
-    features = ["datafusion"],
-    package = "deltalake",
-    version = "0.27",
-)
-crate.spec(
-    package = "parquet",
-    version = "55.2",
-)
-crate.spec(
-    package = "chrono",
-    version = "0.4",
-)
 crate.spec(
     package = "regex",
     version = "1.10",
MODULE.bazel.lock (6468 changed lines)
File diff suppressed because one or more lines are too long
@@ -27,7 +27,7 @@ DataBuild is a trivially-deployable, partition-oriented, declarative data build
 DataBuild is for teams at data-driven orgs who need reliable, flexible, and correct data pipelines and are tired of manually orchestrating complex dependency graphs. You define Jobs (that take input data partitions and produce output partitions), compose them into Graphs (partition dependency networks), and DataBuild handles the rest. Just ask it to build a partition, and databuild handles resolving the jobs that need to run, planning execution order, running builds concurrently, and tracking and exposing build progress. Instead of writing orchestration code that breaks when dependencies change, you focus on the data transformations while DataBuild ensures your pipelines are correct, observable, and reliable.

-For important context, check out [DESIGN.md](./DESIGN.md), along with designs in [design/](./design/). Also, check out [`databuild.proto`](./databuild/databuild.proto) for key system interfaces. Key features:
+For important context, check out [DESIGN.md](./DESIGN.md), along with designs in [design/](docs/design/). Also, check out [`databuild.proto`](./databuild/databuild.proto) for key system interfaces. Key features:

 - **Declarative dependencies** - Ask for data, get data. Define partition dependencies and DataBuild automatically plans what jobs to run and when.
@@ -19,25 +19,12 @@ rust_binary(
 # DataBuild library using generated prost code
 rust_library(
     name = "databuild",
-    srcs = [
-        "build_event_log.rs",
-        "job.rs",
-        "job_run.rs",
-        "lib.rs",
-        "orchestrator.rs",
+    srcs = glob(["**/*.rs"]) + [
         ":generate_databuild_rust",
     ],
    edition = "2021",
-    proc_macro_deps = [
-        "@crates//:async-trait",
-    ],
     visibility = ["//visibility:public"],
     deps = [
-        "@crates//:aide",
-        "@crates//:axum",
-        "@crates//:axum-jsonschema",
-        "@crates//:chrono",
-        "@crates//:log",
         "@crates//:prost",
         "@crates//:prost-types",
         "@crates//:regex",
@@ -45,8 +32,6 @@ rust_library(
         "@crates//:schemars",
         "@crates//:serde",
         "@crates//:serde_json",
-        "@crates//:thiserror",
-        "@crates//:tokio",
         "@crates//:uuid",
     ],
 )
@@ -56,32 +41,6 @@ rust_test(
     crate = ":databuild",
 )

-# Build Graph Service binary
-rust_binary(
-    name = "build_graph_service",
-    srcs = ["service/main.rs"],
-    data = ["//databuild/dashboard:dist"],
-    edition = "2021",
-    visibility = ["//visibility:public"],
-    deps = [
-        ":databuild",
-        "@crates//:aide",
-        "@crates//:axum",
-        "@crates//:axum-jsonschema",
-        "@crates//:clap",
-        "@crates//:hyper",
-        "@crates//:log",
-        "@crates//:schemars",
-        "@crates//:serde",
-        "@crates//:serde_json",
-        "@crates//:simple_logger",
-        "@crates//:tokio",
-        "@crates//:tower",
-        "@crates//:tower-http",
-        "@crates//:uuid",
-    ],
-)
-
 # Legacy filegroup for backwards compatibility
 filegroup(
     name = "proto",
@@ -1,14 +1,15 @@
 use crate::data_build_event::Event;
-use crate::{BuildState, DataBuildEvent, WantDetail};
+use crate::{DataBuildEvent, WantDetail};
 use prost::Message;
 use rusqlite::Connection;
 use std::error::Error;
 use std::fmt::Debug;
-use std::sync::{Arc, RwLock};
 use std::time::{SystemTime, UNIX_EPOCH};
+use crate::build_state::BuildState;
+use crate::util::current_timestamp;

 pub trait BELStorage {
-    fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>>;
+    fn append_event(&mut self, event: &Event) -> Result<u64, Box<dyn Error>>;
     fn list_events(
         &self,
         since_idx: u64,
@@ -18,7 +19,7 @@ pub trait BELStorage {

 #[derive(Debug)]
 pub struct MemoryBELStorage {
-    events: Vec<DataBuildEvent>,
+    pub events: Vec<DataBuildEvent>,
 }

 impl Default for MemoryBELStorage {
@@ -34,16 +35,12 @@ impl MemoryBELStorage {
 }

 impl BELStorage for MemoryBELStorage {
-    fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>> {
-        let now = SystemTime::now();
-        let duration_since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
-
-        let timestamp = duration_since_epoch.as_nanos() as u64;
-
+    fn append_event(&mut self, event: &Event) -> Result<u64, Box<dyn Error>> {
+        let timestamp = current_timestamp();
         let dbe = DataBuildEvent {
             timestamp,
             event_id: self.events.len() as u64,
-            event: Some(event),
+            event: Some(event.clone()),
         };
         self.events.push(dbe);
         Ok(self.events.len() as u64)
@@ -88,7 +85,7 @@ impl SqliteBELStorage {
 }

 impl BELStorage for SqliteBELStorage {
-    fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>> {
+    fn append_event(&mut self, event: &Event) -> Result<u64, Box<dyn Error>> {
         let now = SystemTime::now();
         let duration_since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
         let timestamp = duration_since_epoch.as_nanos() as u64;
@@ -97,7 +94,7 @@ impl BELStorage for SqliteBELStorage {
         let dbe = DataBuildEvent {
             timestamp,
             event_id: 0, // Will be set by the database
-            event: Some(event),
+            event: Some(event.clone()),
         };

         let mut buf = Vec::new();
@@ -130,7 +127,7 @@ impl BELStorage for SqliteBELStorage {
         let event_data: Vec<u8> = row.get(2)?;

         // Deserialize the event using prost
-        let mut dbe = DataBuildEvent::decode(event_data.as_slice()).map_err(|e| {
+        let mut dbe = DataBuildEvent::decode(event_data.as_slice()).map_err(|_e| {
             rusqlite::Error::InvalidColumnType(
                 0,
                 "event_data".to_string(),
@@ -157,115 +154,45 @@ impl BELStorage for SqliteBELStorage {
 }

 #[derive(Debug)]
-pub struct BuildEventLog<B: BELStorage + Debug> {
-    pub storage: B,
-    pub state: Arc<RwLock<BuildState>>,
+pub struct BuildEventLog<S: BELStorage + Debug, B: BuildState + Default> {
+    pub storage: S,
+    pub state: B,
 }

-impl<B: BELStorage + Debug> BuildEventLog<B> {
-    pub fn new(storage: B) -> BuildEventLog<B> {
-        BuildEventLog {
-            storage,
-            state: Arc::new(Default::default()),
-        }
+impl<S: BELStorage + Debug, B: BuildState + Default> BuildEventLog<S, B> {
+    pub fn new(storage: S, state: B) -> BuildEventLog<S, B> {
+        BuildEventLog { storage, state }
     }

-    pub fn append_event(&mut self, event: Event) -> Result<u64, Box<dyn Error>> {
-        self.reduce(event.clone())?;
+    pub fn append_event(&mut self, event: &Event) -> Result<u64, Box<dyn Error>> {
+        self.state.handle_event(&event)?;
         let idx = self.storage.append_event(event)?;
         Ok(idx)
     }

-    fn reduce(&mut self, event: Event) -> Result<(), Box<dyn Error>> {
-        let mut state = self.state.write().expect("lock poisoned");
-        match event {
-            Event::JobRunBufferV1(e) => {}
-            Event::JobRunQueueV1(e) => {}
-            Event::JobRunHeartbeatV1(e) => {}
-            Event::JobRunSuccessV1(e) => {}
-            Event::JobRunFailureV1(e) => {}
-            Event::JobRunCancelV1(e) => {}
-            Event::JobRunMissingDepsV1(e) => {}
-            Event::WantCreateV1(e) => {
-                state.wants.insert(
-                    e.want_id.clone(),
-                    WantDetail {
-                        want_id: e.want_id,
-                        refs: e.partitions,
-                    },
-                );
-            }
-            Event::WantCancelV1(e) => {}
-            Event::TaintCreateV1(e) => {}
-            Event::TaintDeleteV1(e) => {}
-        }
-
-        Ok(())
-    }
-
     pub fn schedulable_wants(&self) -> Vec<WantDetail> {
         todo!()
     }
 }

 mod tests {
-    use crate::build_event_log::{BELStorage, BuildEventLog, MemoryBELStorage, SqliteBELStorage};
+    use uuid::Uuid;
+    use crate::build_event_log::{BELStorage, BuildEventLog, SqliteBELStorage};
     use crate::data_build_event::Event;
     use crate::{PartitionRef, WantCreateEventV1};
+    use crate::build_state::{BuildState, SqliteBuildState};

     #[test]
     fn test_hello() {
         assert_eq!(2 + 3, 5);
     }

-    #[test]
-    fn test_append_event() {
-        let storage = MemoryBELStorage::new();
-        let mut log = BuildEventLog::new(storage);
-        // Initial state
-        assert_eq!(log.storage.events.len(), 0);
-        let want_id = "1234".to_string();
-        {
-            let state = log.state.read().unwrap();
-            assert!(state.wants.get(&want_id).is_none());
-        }
-
-        // Given
-        log.append_event(Event::WantCreateV1(WantCreateEventV1 {
-            want_id: want_id.clone(),
-            root_want_id: "123".to_string(),
-            parent_want_id: "123".to_string(),
-            partitions: vec![PartitionRef {
-                r#ref: "1234".to_string(),
-            }],
-            data_timestamp: 0,
-            ttl_seconds: 1,
-            sla_seconds: 1,
-            source: None,
-            comment: None,
-        }))
-        .expect("append_event failed");
-
-        // Assert
-        assert_eq!(log.storage.events.len(), 1);
-        let state = log.state.read().expect("couldn't take read lock");
-        assert!(state.wants.get(&want_id).is_some(), "want_id not found");
-        assert_eq!(
-            state
-                .wants
-                .get(&want_id)
-                .map(|want| want.want_id.clone())
-                .expect("state.wants want_id not found"),
-            want_id,
-            "want_id not equal",
-        );
-    }
-
     #[test]
     fn test_sqlite_append_event() {
         let storage =
             SqliteBELStorage::create(":memory:").expect("Failed to create SQLite storage");
-        let mut log = BuildEventLog::new(storage);
+        let state = SqliteBuildState::default();
+        let mut log = BuildEventLog { storage, state };

         let want_id = "sqlite_test_1234".to_string();
@@ -277,26 +204,14 @@ mod tests {
         assert_eq!(events.len(), 0);

         // Verify want doesn't exist in state
-        {
-            let state = log.state.read().unwrap();
-            assert!(state.wants.get(&want_id).is_none());
-        }
-
+        assert!(log.state.get_want(&want_id).expect("query failed").is_none());
         // Append an event
+        let mut e = WantCreateEventV1::default();
+        e.want_id = want_id.clone();
+        e.partitions = vec!(PartitionRef { r#ref: "sqlite_partition_1234".to_string() });
         let event_id = log
-            .append_event(Event::WantCreateV1(WantCreateEventV1 {
-                want_id: want_id.clone(),
-                root_want_id: "sqlite_root_123".to_string(),
-                parent_want_id: "sqlite_parent_123".to_string(),
-                partitions: vec![PartitionRef {
-                    r#ref: "sqlite_partition_1234".to_string(),
-                }],
-                data_timestamp: 0,
-                ttl_seconds: 1,
-                sla_seconds: 1,
-                source: None,
-                comment: None,
-            }))
+            .append_event(&Event::WantCreateV1(e))
             .expect("append_event failed");

         // Verify event was stored
@@ -316,8 +231,6 @@ mod tests {
         // Verify the event content
         if let Some(Event::WantCreateV1(want_event)) = &stored_event.event {
             assert_eq!(want_event.want_id, want_id);
-            assert_eq!(want_event.root_want_id, "sqlite_root_123");
-            assert_eq!(want_event.parent_want_id, "sqlite_parent_123");
             assert_eq!(want_event.partitions.len(), 1);
             assert_eq!(want_event.partitions[0].r#ref, "sqlite_partition_1234");
         } else {
@@ -325,19 +238,33 @@ mod tests {
         }

         // Verify state was updated
-        let state = log.state.read().expect("couldn't take read lock");
         assert!(
-            state.wants.get(&want_id).is_some(),
+            log.state.get_want(&want_id).expect("query failed").is_some(),
             "want_id not found in state"
         );
         assert_eq!(
-            state
-                .wants
-                .get(&want_id)
+            log.state.get_want(&want_id)
+                .expect("Failed to get want from state")
                 .map(|want| want.want_id.clone())
                 .expect("state.wants want_id not found"),
             want_id,
             "want_id not equal in state",
         );

+        let mut e2 = WantCreateEventV1::default();
+        e2.want_id = Uuid::new_v4().into();
+        log.append_event(&Event::WantCreateV1(e2)).expect("append_event failed");
+        let mut e3 = WantCreateEventV1::default();
+        e3.want_id = Uuid::new_v4().into();
+        log.append_event(&Event::WantCreateV1(e3)).expect("append_event failed");
+        let mut e4 = WantCreateEventV1::default();
+        e4.want_id = Uuid::new_v4().into();
+        log.append_event(&Event::WantCreateV1(e4)).expect("append_event failed");
+
+        let events = log
+            .storage
+            .list_events(0, 100)
+            .expect("Failed to list events");
+        assert_eq!(events.len(), 4);
     }
 }
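Taken together, the refactor above splits raw event persistence (`BELStorage`) from state reduction (`BuildState`). A minimal usage sketch of the new shape, assuming it lives inside this crate and uses only the types and constructors shown in the diff; the want id and partition ref are hypothetical.

use crate::build_event_log::{BuildEventLog, MemoryBELStorage};
use crate::build_state::{BuildState, SqliteBuildState};
use crate::data_build_event::Event;
use crate::WantCreateEventV1;

fn demo() -> Result<(), Box<dyn std::error::Error>> {
    // Storage keeps the append-only event log; state is an aggregation updated per event.
    let mut log = BuildEventLog::new(MemoryBELStorage::new(), SqliteBuildState::default());

    let mut e = WantCreateEventV1::default();
    e.want_id = "demo-want".to_string();
    e.partitions = vec!["s3://bkt/data/date=2025-01-01".into()];

    // append_event now takes &Event: it reduces into state, then persists to storage.
    log.append_event(&Event::WantCreateV1(e))?;
    assert!(log.state.get_want("demo-want")?.is_some());
    Ok(())
}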
databuild/build_state.rs (new file, 187 lines)
@@ -0,0 +1,187 @@
use crate::data_build_event::Event;
use crate::util::current_timestamp;
use crate::{WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatusCode};
use rusqlite::types::{FromSql, FromSqlResult, ToSqlOutput, ValueRef};
use rusqlite::{Connection, ToSql};
use serde::{Deserialize, Serialize};
use std::error::Error;

pub trait BuildState {
    fn init(&mut self);
    fn handle_event(&mut self, event: &Event) -> Result<(), Box<dyn Error>>;

    fn get_want(&self, want_id: &str) -> Result<Option<WantDetail>, Box<dyn Error>>;
}

pub struct SqliteBuildState {
    conn: Connection,
}

impl Default for SqliteBuildState {
    fn default() -> Self {
        let conn = Connection::open_in_memory().unwrap();
        let mut build_state = Self { conn };
        build_state.init();
        build_state
    }
}

impl SqliteBuildState {
    fn want_create(&mut self, e: &WantCreateEventV1) -> Result<usize, Box<dyn Error>> {
        self.conn
            .execute(
                r#"
        INSERT INTO wants (id, data) values (?1, ?2)"#,
                (e.want_id.as_str(), Json(WantDetail::from(e))),
            )
            .map_err(|e| e.into())
    }

    fn want_update(&mut self, want: &WantDetail) -> Result<usize, Box<dyn Error>> {
        self.conn
            .execute(
                r#"
        UPDATE wants SET data = ?1 where id = ?2"#,
                (Json(want.clone()), want.want_id.as_str()),
            )
            .map_err(|e| e.into())
    }

    fn want_cancel(&mut self, e: &WantCancelEventV1) -> Result<usize, Box<dyn Error>> {
        let mut want = self.get_want(e.want_id.as_str())?.unwrap();
        want.status = Some(WantStatusCode::WantCanceled.into());
        want.last_updated_timestamp = current_timestamp();
        self.want_update(&want)
    }
}

impl BuildState for SqliteBuildState {
    fn init(&mut self) {
        self.conn
            .execute(
                r#"
        create table wants (id text primary key, data text not null);
        "#,
                [],
            )
            .unwrap();
    }

    fn handle_event(&mut self, event: &Event) -> Result<(), Box<dyn Error>> {
        match event {
            Event::WantCreateV1(e) => {
                self.want_create(e)?;
            }
            Event::WantCancelV1(e) => {
                self.want_cancel(e)?;
            }
            _ => (),
        }
        Ok(())
    }

    fn get_want(&self, want_id: &str) -> Result<Option<WantDetail>, Box<dyn Error>> {
        let mut stmt = self.conn.prepare(
            r#"
        select data from wants where id = ?1 limit 1
        "#,
        )?;

        let mut rows = stmt.query_map([want_id], |row| row.get(0))?;

        match rows.next() {
            Some(result) => result
                .map(|r: Json<WantDetail>| Some(r.0))
                .map_err(|e| e.into()),
            None => Ok(None),
        }
    }
}

pub struct Json<T>(T);

impl<T: Serialize> ToSql for Json<T> {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        Ok(ToSqlOutput::from(
            serde_json::to_string(&self.0).expect("invalid json"),
        ))
    }
}

impl<T: for<'de> Deserialize<'de>> FromSql for Json<T> {
    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
        let s = value.as_str()?;
        println!("{:?}", s);
        Ok(Json(serde_json::from_str(s).expect("invalid json")))
    }
}

#[cfg(test)]
mod tests {
    mod event_serde {
        use crate::build_state::Json;
        use crate::{PartitionRef, WantDetail};
        use rusqlite::types::ToSql;

        #[test]
        fn test_should_serialize_data() {
            let datum = WantDetail {
                want_id: "1234".to_string(),
                partitions: vec![PartitionRef {
                    r#ref: "myref".to_string(),
                }],
                data_timestamp: 0,
                ttl_seconds: 0,
                sla_seconds: 0,
                source: None,
                comment: None,
                status: None,
                last_updated_timestamp: 0,
            };
            let json = serde_json::to_string(&datum).unwrap();
            println!("{}", json);

            Json(datum).to_sql().expect("should serialize");
        }
    }

    mod sqlite_build_state {
        mod want {
            use crate::build_state::{BuildState, SqliteBuildState};
            use crate::{WantCancelEventV1, WantCreateEventV1, WantDetail};

            #[test]
            fn test_should_create_want() {
                let mut e = WantCreateEventV1::default();
                e.want_id = "1234".to_string();
                e.partitions = vec!["mypart".into()];

                let mut state = SqliteBuildState::default();
                state.handle_event(&e.clone().into()).unwrap();
                let want = state.get_want("1234").unwrap().unwrap();
                let mut expected: WantDetail = e.into();
                // Into will set this field as current timestamp
                expected.last_updated_timestamp = want.last_updated_timestamp;
                assert_eq!(want, expected);
            }

            #[test]
            fn test_should_cancel_want() {
                let mut e = WantCreateEventV1::default();
                e.want_id = "1234".to_string();
                e.partitions = vec!["mypart".into()];

                let mut state = SqliteBuildState::default();
                state.handle_event(&e.clone().into()).unwrap();

                // Should be able to cancel
                let mut e = WantCancelEventV1::default();
                e.want_id = "1234".to_string();
                state.handle_event(&e.clone().into()).unwrap();
                let want = state.get_want("1234").unwrap().unwrap();

                assert_eq!(want.status, Some(crate::WantStatusCode::WantCanceled.into()));
            }
        }
    }
}
@@ -32,26 +32,28 @@ message DataBuildEvent {

 // Source metadata for user-driven events
 message EventSource {
-  // Revisit - how do we model this? See this chat: https://claude.ai/share/76622c1c-7489-496e-be81-a64fef24e636
-  EventSourceType source_type = 1;
-  string source_name = 2;
+  oneof source {
+    ManuallyTriggeredEvent manually_triggered = 1;
+    JobTriggeredEvent job_triggered = 2;
+  }
 }
-message EventSourceType {
-  EventSourceCode code = 1;
-  string name = 2;
-}
-enum EventSourceCode{
-  Manual = 0;
-  Automated = 1;
-  Propagated = 2;
+message ManuallyTriggeredEvent {
+  string user = 1;
+}
+message JobTriggeredEvent {
+  string job_run_id = 1;
+}
+
+message WantAttributedPartitions {
+  string want_id = 1;
+  repeated PartitionRef partitions = 2;
 }

 // Indicates buffer state for job.
 message JobRunBufferEventV1 {
   string job_run_id = 1;
   string job_label = 2;
-  repeated string servicing_want_ids = 3;
-  repeated string producing_partition_refs = 4;
+  repeated WantAttributedPartitions want_attributed_partitions = 3;
   // TODO how do we handle buffer definition? Start simple, noop until we want something here?
 }
 // Just indicates that job has entered queue
@@ -91,15 +93,15 @@ message MissingDeps {


 message WantCreateEventV1 {
+  // The unique ID of this want
   string want_id = 1;
-  string root_want_id = 2;
-  string parent_want_id = 3;
-  repeated PartitionRef partitions = 4;
-  uint64 data_timestamp = 5;
-  uint64 ttl_seconds = 6;
-  uint64 sla_seconds = 7;
-  EventSource source = 8;
-  optional string comment = 9;
+  repeated PartitionRef partitions = 2;
+  uint64 data_timestamp = 3;
+  uint64 ttl_seconds = 4;
+  uint64 sla_seconds = 5;
+  // The source of the want. Can be from job, API, CLI, web app...
+  EventSource source = 6;
+  optional string comment = 7;
 }
 message WantCancelEventV1 {
   string want_id = 1;
@@ -124,16 +126,35 @@ message TaintDeleteEventV1 {
 // Build State

 // Represents the whole state of the system
-message BuildState {
-  map<string, WantDetail> wants = 1;
-  map<string, PartitionDetail> partitions = 2;
-  map<string, TaintDetail> taints = 3;
-  map<string, JobRunDetail> job_runs = 4;
+//message BuildState {
+//  map<string, WantDetail> wants = 1;
+//  map<string, PartitionDetail> partitions = 2;
+//  map<string, TaintDetail> taints = 3;
+//  map<string, JobRunDetail> job_runs = 4;
+//}
+
+message WantStatus {
+  WantStatusCode code = 1;
+  string name = 2;
+}
+enum WantStatusCode {
+  WantIdle = 0;
+  WantBuilding = 1;
+  WantFailed = 2;
+  WantSuccessful = 3;
+  WantCanceled = 4;
 }

 message WantDetail {
   string want_id = 1;
-  repeated PartitionRef refs = 2;
+  repeated PartitionRef partitions = 2;
+  uint64 data_timestamp = 3;
+  uint64 ttl_seconds = 4;
+  uint64 sla_seconds = 5;
+  EventSource source = 6;
+  optional string comment = 7;
+  WantStatus status = 8;
+  uint64 last_updated_timestamp = 9;
   // TODO
 }
@@ -143,7 +164,7 @@ message PartitionDetail {
   // The partitions current status
   PartitionStatus status = 2;
   // The latest update to the partition's status
-  optional uint64 last_updated_at = 3;
+  optional uint64 last_updated_timestamp = 3;
   // IDs that associate the partition with other objects
   repeated string job_run_ids = 4;
   repeated string want_ids = 5;
@@ -155,12 +176,12 @@ message PartitionStatus {
 }
 enum PartitionStatusCode {
   // TODO how do we avoid copying job states here?
-  Unknown = 0;
-  Wanted = 1;
-  Building = 2;
-  Live = 3;
-  Failed = 4;
-  Tainted = 5;
+  PartitionUnknown = 0;
+  PartitionWanted = 1;
+  PartitionBuilding = 2;
+  PartitionLive = 3;
+  PartitionFailed = 4;
+  PartitionTainted = 5;
 }

 message TaintDetail {
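As a hedged illustration of the new `EventSource` shape above: assuming prost's usual oneof codegen (the oneof becomes a nested `event_source::Source` enum on the generated struct), attributing a want to a manual trigger might look like the sketch below; the user name and want id are hypothetical.

use crate::{event_source, EventSource, ManuallyTriggeredEvent, WantCreateEventV1};

fn manual_source_example() -> WantCreateEventV1 {
    // A want created by hand, attributed to a (hypothetical) user.
    let source = EventSource {
        source: Some(event_source::Source::ManuallyTriggered(ManuallyTriggeredEvent {
            user: "jane.doe".to_string(),
        })),
    };
    let mut e = WantCreateEventV1::default();
    e.want_id = "manual-want".to_string();
    e.source = Some(source);
    e
}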
databuild/event_defaults.rs (new file, 2 lines)
@@ -0,0 +1,2 @@
use uuid::Uuid;
use crate::{PartitionRef, WantCreateEventV1};
databuild/event_transforms.rs (new file, 52 lines)
@@ -0,0 +1,52 @@
use crate::util::current_timestamp;
use crate::{PartitionRef, WantCancelEventV1, WantCreateEventV1, WantDetail, WantStatus, WantStatusCode};
use crate::data_build_event::Event;

impl From<&WantCreateEventV1> for WantDetail {
    fn from(e: &WantCreateEventV1) -> Self {
        e.clone().into()
    }
}
impl From<WantCreateEventV1> for WantDetail {
    fn from(e: WantCreateEventV1) -> Self {
        WantDetail {
            want_id: e.want_id,
            partitions: e.partitions,
            data_timestamp: e.data_timestamp,
            ttl_seconds: e.ttl_seconds,
            sla_seconds: e.sla_seconds,
            source: e.source,
            comment: e.comment,
            status: Default::default(),
            last_updated_timestamp: current_timestamp(),
        }
    }
}
impl From<WantCreateEventV1> for Event {
    fn from(value: WantCreateEventV1) -> Self {
        Event::WantCreateV1(value)
    }
}
impl From<WantCancelEventV1> for Event {
    fn from(value: WantCancelEventV1) -> Self {
        Event::WantCancelV1(value)
    }
}

impl From<WantStatusCode> for WantStatus {
    fn from(code: WantStatusCode) -> Self {
        WantStatus {
            code: code.into(),
            name: code.as_str_name().to_string(),
        }
    }
}

impl From<&str> for PartitionRef {
    fn from(value: &str) -> Self {
        Self {
            r#ref: value.to_string(),
        }
    }
}
@@ -14,7 +14,7 @@ impl JobConfiguration {
     /** Launch job to build the partitions specified by the provided wants. */
     pub fn spawn(&self, wants: Vec<WantDetail>) -> Result<JobRun, Box<dyn Error>> {
         let wanted_refs: Vec<PartitionRef> =
-            wants.iter().flat_map(|want| want.refs.clone()).collect();
+            wants.iter().flat_map(|want| want.partitions.clone()).collect();
         let args: Vec<String> = wanted_refs.iter().map(|pref| pref.r#ref.clone()).collect();
         JobRun::spawn(self.entrypoint.clone(), args)
     }
@@ -4,7 +4,6 @@ use crate::data_build_event::Event::{JobRunFailureV1, JobRunSuccessV1};
 use crate::{DataBuildEvent, JobRunFailureEventV1, JobRunHeartbeatEventV1, JobRunSuccessEventV1};
 use std::error::Error;
 use std::io::{BufRead, BufReader};
-use std::ops::{Deref, DerefMut};
 use std::process::{Child, Command, ExitStatus, Stdio};
 use uuid::Uuid;

@@ -69,7 +68,7 @@ impl JobRun {
         // Parse BEL events from child process
         let new_events = Self::process_lines(self.job_run_id, &self.unhandled_lines);
         for event in new_events {
-            self.events.append_event(event)?;
+            self.events.append_event(&event)?;
         }
         self.unhandled_lines.drain(..);

@@ -79,12 +78,12 @@ impl JobRun {
             Some(status) => {
                 if status.success() {
                     self.events
-                        .append_event(JobRunSuccessV1(JobRunSuccessEventV1 {
+                        .append_event(&JobRunSuccessV1(JobRunSuccessEventV1 {
                             job_run_id: self.job_run_id.into(),
                         }))?;
                 } else {
                     self.events
-                        .append_event(JobRunFailureV1(JobRunFailureEventV1 {
+                        .append_event(&JobRunFailureV1(JobRunFailureEventV1 {
                             job_run_id: self.job_run_id.into(),
                         }))?;
                 }
@@ -2,6 +2,11 @@ mod build_event_log;
 mod orchestrator;
 mod job_run;
 mod job;
+mod bel_reducers;
+mod util;
+mod build_state;
+mod event_transforms;
+mod event_defaults;

 // Include generated protobuf code
 include!("databuild.rs");
@@ -5,14 +5,15 @@ use crate::{PartitionRef, WantDetail};
 use std::collections::HashMap;
 use std::error::Error;
 use std::fmt::Debug;
+use crate::build_state::BuildState;

 /**
 Orchestrator turns wants, config, and BEL state into scheduled jobs. It uses lightweight threads +
 the visitor pattern to monitor job exec progress and liveness, and adds
 */

-struct Orchestrator<B: BELStorage + Debug> {
-    bel: BuildEventLog<B>,
+struct Orchestrator<S: BELStorage + Debug, B: BuildState + Default> {
+    bel: BuildEventLog<S, B>,
     job_runs: Vec<JobRunHandle>,
     config: OrchestratorConfig,
 }
@@ -58,10 +59,10 @@ struct GroupedWants {
     unhandled_wants: Vec<WantDetail>,
 }

-impl<B: BELStorage + Debug> Orchestrator<B> {
-    fn new(storage: B, config: OrchestratorConfig) -> Self {
+impl<S: BELStorage + Debug, B: BuildState + Default> Orchestrator<S, B> {
+    fn new(storage: S, config: OrchestratorConfig) -> Self {
         Self {
-            bel: BuildEventLog::new(storage),
+            bel: BuildEventLog::new(storage, Default::default()),
             job_runs: Vec::new(),
             config,
         }
@@ -81,7 +82,7 @@ impl<B: BELStorage + Debug> Orchestrator<B> {
             .filter_map(|event| event.event.clone())
             .for_each(|event| {
                 self.bel
-                    .append_event(event.clone())
+                    .append_event(&event)
                     .expect("Failed to append event");
             });
@@ -96,7 +97,7 @@ impl<B: BELStorage + Debug> Orchestrator<B> {
     fn poll_wants(&mut self) -> Result<(), Box<dyn Error>> {
         // Collect unhandled wants, group by job that handles each partition,
         let grouped_wants =
-            Orchestrator::<B>::group_wants(&self.config, &self.bel.schedulable_wants());
+            Orchestrator::<S, B>::group_wants(&self.config, &self.bel.schedulable_wants());

         if !grouped_wants.want_groups.is_empty() {
             // All wants must be mapped to jobs that can be handled
@@ -120,7 +121,7 @@ impl<B: BELStorage + Debug> Orchestrator<B> {
         let mut want_groups: HashMap<String, Vec<WantDetail>> = Default::default();
         let mut unhandled_wants: Vec<WantDetail> = Default::default();
         wants.iter().for_each(|want| {
-            want.refs.iter().for_each(|pref| {
+            want.partitions.iter().for_each(|pref| {
                 let matched_job = config.match_job_partition(pref);
                 match matched_job {
                     None => unhandled_wants.push(want.clone()),
@@ -194,6 +195,7 @@ mod tests {
     use super::super::*;
     use crate::build_event_log::MemoryBELStorage;
     use crate::{PartitionRef, WantDetail};
+    use crate::build_state::SqliteBuildState;

     fn create_job_config(label: &str, pattern: &str) -> JobConfiguration {
         JobConfiguration {
@@ -206,12 +208,19 @@ mod tests {
     fn create_want_detail(want_id: &str, partition_refs: Vec<&str>) -> WantDetail {
         WantDetail {
             want_id: want_id.to_string(),
-            refs: partition_refs
+            partitions: partition_refs
                 .iter()
                 .map(|r| PartitionRef {
                     r#ref: r.to_string(),
                 })
                 .collect(),
+            data_timestamp: 0,
+            ttl_seconds: 0,
+            sla_seconds: 0,
+            source: None,
+            comment: None,
+            status: None,
+            last_updated_timestamp: 0,
         }
     }
@@ -220,7 +229,7 @@ mod tests {
         let config = OrchestratorConfig { jobs: vec![] };
         let wants = vec![];

-        let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
+        let result = Orchestrator::<MemoryBELStorage, SqliteBuildState>::group_wants(&config, &wants);

         assert!(result.want_groups.is_empty());
         assert!(result.unhandled_wants.is_empty());
@@ -235,7 +244,7 @@ mod tests {
         let want = create_want_detail("want1", vec!["partition1"]);
         let wants = vec![want.clone()];

-        let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
+        let result = Orchestrator::<MemoryBELStorage, SqliteBuildState>::group_wants(&config, &wants);

         assert!(result.unhandled_wants.is_empty());
         assert_eq!(result.want_groups.len(), 1);
@@ -253,7 +262,7 @@ mod tests {
         let want = create_want_detail("want1", vec!["different_partition"]);
         let wants = vec![want.clone()];

-        let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
+        let result = Orchestrator::<MemoryBELStorage, SqliteBuildState>::group_wants(&config, &wants);

         assert_eq!(result.unhandled_wants.len(), 1);
         assert_eq!(result.unhandled_wants[0].want_id, "want1");
@@ -273,7 +282,7 @@ mod tests {
         let want3 = create_want_detail("want3", vec!["pattern2_partition"]);
         let wants = vec![want1, want2, want3];

-        let result = Orchestrator::<MemoryBELStorage>::group_wants(&config, &wants);
+        let result = Orchestrator::<MemoryBELStorage, SqliteBuildState>::group_wants(&config, &wants);

         assert!(result.unhandled_wants.is_empty());
         assert_eq!(result.want_groups.len(), 2);
databuild/util.rs (new file, 7 lines)
@@ -0,0 +1,7 @@
use std::time::{SystemTime, UNIX_EPOCH};

pub fn current_timestamp() -> u64 {
    let now = SystemTime::now();
    let duration_since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
    duration_since_epoch.as_nanos() as u64
}
@@ -5,10 +5,10 @@ Purpose: Enable continuous reconciliation of partition wants through distributed
 DataBuild uses a want-driven reconciliation model inspired by Kubernetes. Users declare wants (desired partitions), and the system continuously attempts to satisfy them through job execution.

 ### Key Components
-- [**Wants**](./wants.md): Declarations of desired partitions with TTLs and SLAs
+- [**Wants**](wants.md): Declarations of desired partitions with TTLs and SLAs
 - **Jobs**: Stateless executables that transform input partitions to outputs
 - **Graph**: Reconciliation runtime that monitors wants and dispatches jobs
-- [**Build Event Log (BEL)**](./build-event-log.md): Event-sourced ledger of all system activity
+- [**Build Event Log (BEL)**](build-event-log.md): Event-sourced ledger of all system activity

 ## Reconciliation Loop
 The graph continuously:

@@ -1,9 +1,9 @@

 # `Job`
-Atomic unit of work, producing and consuming specific partitions. See [jobs](./core-build.md#jobs).
+Atomic unit of work, producing and consuming specific partitions. See [jobs](core-build.md#jobs).

 # `Graph`
-Composes [jobs](#job) to build partitions. See [graphs](./core-build.md#graphs)
+Composes [jobs](#job) to build partitions. See [graphs](core-build.md#graphs)

 # `Partition`
 Partitions are atomic units of data, produced and depended on by jobs. A job can produce multiple partitions, but

@@ -13,7 +13,7 @@ multiple jobs cannot produce the same partition - e.g. job -> partition relation
 PartitionsRefs are strings that uniquely identify partitions. They can contain anything, but generally they are S3
 URIs, like `s3://companybkt/datasets/foo/date=2025-01-01`, or custom formats like
 `dal://prod/clicks/region=4/date=2025-01-01/`. PartitionRefs are used as dependency signals during
-[task graph analysis](./core-build.md#graphanalyze). To enable explicit coupling and ergonomics, there are generally
+[task graph analysis](core-build.md#graphanalyze). To enable explicit coupling and ergonomics, there are generally
 helper classes for creating, parsing, and accessing fields for PartitionRefs in [GDLs](#graph-specification-language-gsl).

 # `PartitionPattern`

@@ -22,5 +22,5 @@ expected output partition?)

 # Graph Definition Language (GDL)
 Language-specific libraries that make implementing databuild graphs and jobs more succinct and ergonomic.
-See [graph specification](./graph-specification.md).
+See [graph specification](graph-specification.md).

@@ -14,7 +14,7 @@ AKA the different ways databuild applications can be described.
 - Graph lookup binary (lookup)
 - Job target (with working exec binary)
 - Graph target
-- See [core build](./core-build.md) for details
+- See [core build](core-build.md) for details

 ## Python

@@ -11,7 +11,7 @@ All observability flows through the job wrapper:
 - **Jobs** emit application logs to stdout/stderr
 - **Wrapper** captures and enriches with structured metadata
 - **Graph** parses structured logs into metrics, events, and monitoring data
-- [**BEL**](./build-event-log.md) stores aggregated telemetry for historical analysis
+- [**BEL**](build-event-log.md) stores aggregated telemetry for historical analysis

 ### Communication Protocol
 Log-based telemetry using protobuf-defined structured messages:
docs/narrative/why-not-dags.md (new file, 12 lines)
@@ -0,0 +1,12 @@

- Airflow and Luigi are OG data orchestrators that inspired databuild
- Airflow uses explicit declaration of DAG structure
- Luigi uses implicit, discovered DAG structure
- Both use DAG runs as a top-level unit of execution
  - This is nice because you can see what's going to happen after the DAG run has launched
  - This is not nice because you have to deal with mid-execution DAG runs during deployments - what do you do?
    - Do you terminate existing dag runs and retrigger? (what if the workload is stateful? Don't do that!)
    - Do you let existing dag runs finish?
    - How do you deal with DAG run identity under changing DAG definition?
- These questions are all red herrings. We don't care about the DAG definition - we care about the data we want to produce.
- We should instead declare what partitions we want, and iteratively propagate
docs/narrative/why-not-push.md (new file, 2 lines)
@@ -0,0 +1,2 @@
Or "why pull"?