Update docs and protos

Stuart Axelbrooke 2025-05-04 18:52:34 -07:00
parent d7fd136ef3
commit fc4b09b007
14 changed files with 322 additions and 115 deletions


@@ -3,4 +3,4 @@
A bazel-based data build system.
For important context, check out [the manifesto](./manifesto.md) and [core concepts](./core-concepts.md). Also check out [`databuild.proto`](./databuild.proto) for key system interfaces.


@@ -3,95 +3,80 @@
- Only local dependency knowledge to develop
- Not a framework (what does this mean?)
# Verbs / Phases
- What do engineers describe?
  - Jobs - unit of data processing
    - Configure executable (rule impl)
    - Run executable (generated file contents)
  - Graphs - data service
    - Plan (resolving partition refs to jobs that build them)
# Organizing Philosophy
Many large-scale systems for producing data leave the complexity of true orchestration to the user - even DAG-based systems for implementing dependencies leave the system as a collection of DAGs, requiring engineers to solve the same "why doesn't this data exist?" and "how do I build this data?" questions over and over.
DataBuild takes inspiration from modern data orchestration and build systems to fully internalize this complexity, using the Job concept to localize all decisions of turning upstream data into output data (and making all dependencies explicit), and the Graph concept to handle composition of jobs, answering what sequence of jobs must be run to build a specific partition of data. With Jobs and Graphs, DataBuild takes complete responsibility for the data build process, allowing engineers to consider only the concerns local to the jobs relevant to their feature.
Jobs
1. `job.configure(refs) -> Seq[JobConfig]` - Produces a `JobConfig` for producing a set of partitions
2. `job.execute(refs) -> Seq[PartitionManifest]` - Produces a job config and immediately runs the job run executable with it
Graphs
1. `graph.plan(refs) -> JobGraph` - Produces a `JobGraph` that fully describes the building of requested partition refs (with preconditions)
2. `graph.execute(refs) -> Seq[PartitionManifest]` - Executes the `JobGraph` eagerly, produces the partition manifests of the underlying finished jobs (emits build events?)
3. `graph.lookup(refs) -> Map[JobLabel, ref]` - Maps each requested partition ref to the job that produces it
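Sketched as Scala interfaces (a minimal sketch: trait names are hypothetical, the case classes are the ones defined below, and `lookup` is shown set-valued to cover multi-ref requests):
```scala
// Minimal sketch of the Job and Graph interfaces described above.
trait Job {
  // Fully parameterize job runs that would produce the requested partitions
  def configure(refs: Set[PartitionRef]): Seq[JobConfig]
  // Configure, then immediately run the job run executable
  def execute(refs: Set[PartitionRef]): Seq[PartitionManifest]
}

trait Graph {
  // Resolve refs to the JobGraph that would build them
  def plan(refs: Set[PartitionRef]): JobGraph
  // Execute the planned JobGraph eagerly
  def execute(refs: Set[PartitionRef]): Seq[PartitionManifest]
  // Map each requested ref to the job that produces it
  def lookup(refs: Set[PartitionRef]): Map[JobLabel, Set[PartitionRef]]
}
```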
## Job Configuration
The process of fully parameterizing a job run based on the desired partitions. The resulting `JobConfig` fully parameterizes the job run executable.
```
case class JobConfig(
  // The partitions that this parameterization produces
  outputs: Set[PartitionRef],
  // Upstream data dependencies required before the job can run
  inputs: Set[DataDep],
  // Args passed to the job run executable
  args: Seq[String],
  // Env vars set for the job run executable
  env: Map[String, String],
  // Path to executable that will be run
  executable: String,
)

case class DataDep(
  // E.g. query, materialize
  depType: DepType,
  ref: PartitionRef,
  // Excluded for now
  // timeoutSeconds: int,
)
```
Graphs and jobs are defined in [bazel](https://bazel.build), allowing graphs (and their constituent jobs) to be built and deployed trivially.
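For concreteness, a hypothetical config for one day of a reviews dataset (all paths, args, and env keys are illustrative):
```scala
// Hypothetical example only - every path and name here is illustrative.
val config = JobConfig(
  outputs = Set(PartitionRef("/datasets/reviews/v1/date=2025-05-04/country=usa")),
  inputs = Set(
    DataDep(DepType.QUERY, PartitionRef("/datasets/raw_reviews/v1/date=2025-05-04"))
  ),
  args = Seq("--date", "2025-05-04", "--country", "usa"),
  env = Map("REVIEWS_VERSION" -> "v1"),
  executable = "bazel-bin/jobs/reviews/build_reviews",
)
```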
## Job Execution
Jobs produce partition manifests:
```
case class PartitionManifest(
  // The partitions produced by this run
  outputs: Set[PartitionRef],
  // Manifests of the input partitions consumed to produce the outputs
  inputs: Set[PartitionManifest],
  // Job run start/end times
  startTime: Long,
  endTime: Long,
  // The config this run was executed with
  config: JobConfig,
)
```
## Graph Planning
The set of job configs that, if run in topo-sorted order, will produce the requested `outputs`.
```
case class JobGraph(
  outputs: Set[PartitionRef],
  // Needs the executable too? How do we reference here?
  nodes: Set[JobConfig],
)
```
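To make the topo-sorted execution semantics concrete, a minimal sketch (where `run` is a hypothetical helper that invokes a configured job's executable and returns its manifest):
```scala
// Sketch: execute a JobGraph by repeatedly running every job whose
// declared input partitions already exist, until all nodes have run.
def executeGraph(
    graph: JobGraph,
    alreadyPresent: Set[PartitionRef],
    run: JobConfig => PartitionManifest // hypothetical: invokes the job run executable
): Seq[PartitionManifest] = {
  var available = alreadyPresent
  var pending = graph.nodes
  val manifests = Seq.newBuilder[PartitionManifest]
  while (pending.nonEmpty) {
    // A job is ready once every declared input partition exists
    val ready = pending.filter(_.inputs.forall(dep => available.contains(dep.ref)))
    require(ready.nonEmpty, "cycle or unsatisfiable dependency in JobGraph")
    for (config <- ready) {
      manifests += run(config)
      available = available ++ config.outputs
    }
    pending = pending -- ready
  }
  manifests.result()
}
```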
# Nouns / Verbs / Phases
The databuild graph needs to:
- Analyze: use the provided partition refs to determine all involved jobs and their configs in a job graph
- Plan: Determine the literal jobs to execute (skipping/pruning valid cached partitions)
- Compile: compile the graph into an artifact that runs the materialize process <-- sounds like an application consideration?
## Partitions
DataBuild is fundamentally about composing graphs of jobs and partitions of data, where partitions are the things we want to produce, or are the nodes between jobs. E.g., in a machine learning pipeline, a partition would be the specific training dataset produced for a given date, model version, etc., that would in turn be read by the model training job, which would in turn produce a partition representing the trained model.
Partitions are assumed to be atomic and final (for final input partitions), such that it is unambiguous in what cases a partition must be (re)calculated.
Perhaps these are different capabilities - e.g. producing a bash script that runs the build process is a fundamentally separate thing from the smarter stateful thing that manages these builds over time, pruning cached builds, etc. And we could make the **partition catalog pluggable**!
## Partition References
A partition reference (or partition ref) is a serialized reference to a literal partition of data. This can be anything, so long as it uniquely identifies its partition, but something path-like or URI-like is generally advisable for ergonomics; e.g. `/datasets/reviews/v1/date=2025-05-04/country=usa` or `dal://ranker/features/return_stats/2025/05/04/`.
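Since refs are opaque to DataBuild itself, jobs can parse them however they like; a hypothetical helper for path-like refs:
```scala
// Hypothetical helper: extract key=value segments from a path-like ref, e.g.
// "/datasets/reviews/v1/date=2025-05-04/country=usa" -> Map("date" -> "2025-05-04", "country" -> "usa")
def partitionKeys(ref: PartitionRef): Map[String, String] =
  ref.str
    .split('/')
    .filter(_.contains('='))
    .map { segment =>
      val Array(key, value) = segment.split("=", 2)
      key -> value
    }
    .toMap
```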
## Jobs
In DataBuild, `Job`s are the atomic unit of data processing, representing the mapping of upstream partitions into output partitions. A job is defined by two capabilities: 1) it exposes an executable that runs the job and produces the desired partitions of data (configured via env vars and args), returning manifests that describe the produced partitions; and 2) it exposes a configuration executable that turns references to desired partitions into a job config that fully configures the job executable to produce those partitions.
```mermaid
flowchart LR
    upstream_a[(Upstream Partition A)]
    upstream_b[(Upstream Partition B)]
    job[Job]
    output_c[(Output Partition C)]
    output_d[(Output Partition D)]
    upstream_a & upstream_b --> job --> output_c & output_d
```
Jobs are assumed to be idempotent and independent, such that two jobs configured to produce separate partitions can run without interaction. These assumptions allow jobs to state only their immediate upstream and output data dependencies (the partitions they consume and produce), leaving no ambiguity in a graph about what must be done to produce a desired partition.
Jobs are implemented via the [`databuild_job`](./rules.bzl) bazel rule. An extremely basic job definition can be found in the [basic_job example](./examples/basic_job/).
## Graphs
A `Graph` is the composition of jobs and partitions via their data dependencies. Graphs answer "what partitions does a job require to produce its outputs?" and "what job must be run to produce a given partition?" Defining a graph requires only the list of involved jobs and a lookup executable that transforms desired partitions into the job(s) that produce them.
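A lookup can be as simple as routing on ref prefixes; a hypothetical sketch (labels and prefixes are illustrative):
```scala
// Hypothetical lookup: route each requested ref to the job that produces it
// by matching on path prefixes.
def lookup(refs: Set[PartitionRef]): Map[JobLabel, Set[PartitionRef]] =
  refs.groupBy { ref =>
    if (ref.str.startsWith("/datasets/reviews/")) JobLabel("//jobs:build_reviews")
    else if (ref.str.startsWith("/models/ranker/")) JobLabel("//jobs:train_ranker")
    else sys.error(s"no job produces ${ref.str}")
  }
```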
Graphs expose two entrypoints: `graph.analyze`, which produces the literal `JobGraph` specifying the structure of the build graph to be executed to build a specific set of partitions (enabling visualization, planning, precondition checking, etc.); and `graph.build`, which runs the build process for a set of requested partitions (relying on `graph.analyze` to plan). Other entrypoints are described in the [graph README](./graph/README.md).
Graphs are implemented via the [`databuild_graph`](./rules.bzl) bazel rule. A basic graph definition can be found in the [basic_graph example](./examples/basic_graph/).
### Implementing a Graph
To make a fully described graph, engineers must define:
- `databuild_job`s
- Implementing the exec and config targets for each
- A `databuild_graph` (referencing a `lookup` binary to resolve jobs)
And that's it!
## Catalog
A catalog is a database of partition manifests and past/in-progress graph builds and job runs. When run with a catalog, graphs can:
- Skip jobs whose outputs are already present and up to date.
- Safely run data builds in parallel, delegating overlapping partition requests to already scheduled/running jobs.
TODO - plan and implement this functionality.
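A minimal sketch of the first capability, assuming a hypothetical `Catalog.presentPartitions` presence check:
```scala
// Sketch: skip jobs whose outputs are already cataloged. `Catalog` and
// `presentPartitions` are hypothetical; this is a bare presence check only.
def prune(graph: JobGraph, catalog: Catalog): JobGraph = {
  val present = catalog.presentPartitions(graph.nodes.flatMap(_.outputs))
  graph.copy(nodes = graph.nodes.filterNot(node => node.outputs.subsetOf(present)))
}
```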
---
# Appendix
## Future
- Partition versions - e.g. how to not invalidate prior produced data with every code change?
- merkle tree + semver as implementation?
- mask upstream changes that aren't major
- content addressable storage based on action keys that point to merkle tree
- compile to set of build files? (thrash with action graph?)
- catalog of partition manifests + code artifacts enables this
- start with basic presence check?
- side effects expected; partition manifests as output artifact?
- cache invalidation is an orchestration-layer concern, because `configure` needs to be able to invalidate the cache
## Questions
- How does partition overlap work? Can it be pruned? Or throw during configure? This sounds like a very common case
  - Answer: this is a responsibility of a live service backed by a datastore. If jobs are in fact independent, then refs requested by another build can be "delegated" to the jobs already running for those refs.
- How do we implement job lookup for graphs? Is this a job catalog thing?
@@ -136,8 +121,8 @@ This needs to be deployable trivially from day one because:
1. `databuild_graph.analyze`
2. `databuild_graph` provider ✅
3. `databuild_graph.exec`
4. `databuild_graph.build`
5. `databuild_graph.mermaid`
6. podcast reviews example
7. Reflect (data versioning/caching/partition manifests, partition overlap, ...?)
@@ -170,7 +155,7 @@ Implementation:
- Bazel to describe jobs/graphs
- Whatever you want to implement jobs and graphs (need solid interfaces)
```starlark
databuild_graph(
name = "my_graph",
jobs = [":my_job", ...],

databuild.proto

@@ -1,9 +1,22 @@
syntax = "proto3";
import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";
package databuild.v1;
message PartitionRef {
string str = 1;
}
///////////////////////////////////////////////////////////////////////////////////////////////
// Jobs
///////////////////////////////////////////////////////////////////////////////////////////////
//
// Job Config
//
// The type of dependency
enum DepType {
QUERY = 0; // Default
@@ -13,13 +26,13 @@ enum DepType {
// Represents a data dependency
message DataDep {
DepType dep_type = 1;
PartitionRef partition_ref = 2;
}
// Configuration for a job
message JobConfig {
// The partitions that this parameterization produces
repeated PartitionRef outputs = 1;
// Required data dependencies
repeated DataDep inputs = 2;
@@ -31,10 +44,26 @@ message JobConfig {
map<string, string> env = 4;
}
// Request message for job configuration service
message JobConfigureRequest { repeated PartitionRef outputs = 1; }
// Response message for job configuration service
message JobConfigureResponse { repeated JobConfig configs = 1; }
// Implemented by the job.cfg bazel rule
service JobConfigure {
rpc Configure(JobConfigureRequest) returns (JobConfigureResponse);
}
//
// Job Exec
//
// Manifest that records the literal partitions consumed (and their manifests) in order to
// produce the specified partitions
message PartitionManifest {
// The refs of the partitions produced by this job
repeated PartitionRef outputs = 1;
// Input partition manifests
repeated PartitionManifest inputs = 2;
@@ -49,52 +78,231 @@ message PartitionManifest {
Task task = 5;
}
message JobExecuteRequest { repeated PartitionRef outputs = 1; }
// Metadata for the complete set of partitions produced by this job
message JobExecuteResponse { repeated PartitionManifest manifests = 1; }
// Implemented by the job.exec bazel rule
service JobExecute {
rpc Execute(JobExecuteRequest) returns (JobExecuteResponse);
}
///////////////////////////////////////////////////////////////////////////////////////////////
// Graphs
///////////////////////////////////////////////////////////////////////////////////////////////
//
// GraphLookup
//
message JobLabel {
// The bazel label that references the job target
string label = 1;
}
message GraphLookupRequest { repeated PartitionRef outputs = 1; }
// Represents a not-yet configured task
message TaskRef {
// The job whose configure/exec targets will be used
JobLabel job = 1;
// The partition refs this task is responsible for producing, and with which the configure
// target will be invoked
repeated PartitionRef outputs = 2;
}
// Represents the complete set of tasks needed to produce the requested partitions
message GraphLookupResponse { repeated TaskRef task_refs = 1; }
// Implemented per graph
service GraphLookup {
rpc Lookup(GraphLookupRequest) returns (GraphLookupResponse);
}
// Request message for graph analyze service
message GraphAnalyzeRequest { repeated PartitionRef outputs = 1; }
//
// JobGraph
//
message Task {
// The bazel label uniquely identifying the job
JobLabel job = 1;
// The configuration for the job
JobConfig config = 2;
}
// The bazel label referencing the graph
message GraphLabel { string label = 1; }
// Represents a job graph
message JobGraph {
// The bazel label of the graph to be executed
GraphLabel label = 1;
// The output partitions to be produced by this graph
repeated PartitionRef outputs = 2;
// The job configurations that make up this graph
repeated Task nodes = 3;
}
message GraphAnalyzeResponse { JobGraph graph = 1; }
message GraphExecuteResponse { repeated PartitionManifest manifests = 1; }
message GraphBuildRequest { repeated PartitionRef outputs = 1; }
message GraphBuildResponse { repeated PartitionManifest manifests = 1; }
// Service for graph analysis, execution, and builds
service DataBuildService {
// // Get job configurations for the specified output references
// rpc GetJobConfigs(JobConfigureRequest) returns (JobConfigureResponse) {}
// Analyze and get the job graph for the specified output references
rpc AnalyzeGraph(GraphAnalyzeRequest) returns (GraphAnalyzeResponse);
// Execute the specified job graph (implemented by databuild)
rpc Execute(JobGraph) returns (GraphExecuteResponse);
// User-facing: build the desired partitions
rpc Build(GraphBuildRequest) returns (GraphBuildResponse);
}
///////////////////////////////////////////////////////////////////////////////////////////////
// Catalog
///////////////////////////////////////////////////////////////////////////////////////////////
//
// Job Run Events
//
// The complete lifecycle that each job goes through
enum JobRunStatus {
// Initial state
NOT_SCHEDULED = 0;
// Job preconditions met, job scheduled
JOB_SCHEDULED = 1;
// Job execution has begun
JOB_RUNNING = 2;
// Job execution finished successfully
JOB_SUCCESS = 3;
// Job execution failed
JOB_FAILED = 4;
}
message JobConfigEvent { JobConfig config = 1; }
message JobLogsEvent { repeated string logs = 1; }
//
message JobEvent {
// IDs
// Unique ID for this job graph run
string job_graph_run_id = 1;
// Unique ID for this job run
string job_run_id = 2;
// Unique ID for this event
string job_graph_run_event_id = 3;
// Sequence number for this event, such that the ordering within job_graph_run_id is correct
int64 sequence = 4;
// Metadata
// Status of the job run as of this event
JobRunStatus status = 10;
// Hash key of (label, outputs) to associate different runs over time
string job_key = 11;
JobLabel label = 12;
repeated PartitionRef outputs = 13;
// Sum type of the below events
JobConfigEvent config = 20;
JobLogsEvent logs = 21;
}
//
// Job Graph Run Events
//
message GraphAnalyzeEvent { JobGraph graph = 1; }
// Represents a change in status for a datadep
message GraphDataDepEvent {}
// The complete lifecycle that each job graph run goes through
enum JobGraphRunStatus {
// Initial state - graph will be analyzed before executing
GRAPH_STARTED = 0;
// Analysis completed
GRAPH_ANALYZED = 1;
// Graph is in this state until deps are satisfied for at least 1 job
AWAITING_DEPS = 2;
// Graph is executing at least 1 job (if this drops to 0, state goes back to AWAITING_DEPS)
GRAPH_RUNNING = 3;
// Graph execution finished successfully
GRAPH_SUCCESS = 4;
// Graph execution failed
GRAPH_FAILED = 5;
}
message GraphEvent {
// IDs
// Unique ID for this job graph run
string job_graph_run_id = 1;
// Unique ID for this event
string job_graph_run_event_id = 2;
// Sequence number for this event, such that the ordering within job_graph_run_id is correct
int64 sequence = 3;
// Metadata
// Current status of the job graph run as of this event
JobGraphRunStatus status = 10;
GraphLabel label = 11;
// Sum type of below events
GraphAnalyzeEvent analysis = 20;
}
// The sequence of events that completely describes progress of the job graph build
message JobGraphRunEvent {
string job_graph_run_event_id = 1;
google.protobuf.Timestamp timestamp = 2;
// Sum type for potential events
JobEvent job_event = 10;
GraphEvent graph_event = 11;
}
message JobGraphRun {
string job_graph_run_id = 1;
JobGraph graph = 2;
repeated JobGraphRunEvent events = 3;
}
message JobGraphRunQuery {
// TODO
}
message ListJobGraphRunsRequest { JobGraphRunQuery query = 1; }
message ListJobGraphRunsResponse { repeated JobGraphRun runs = 1; }
message PartitionManifestsQuery {
// TODO
}
message ListPartitionManifestsRequest { PartitionManifestsQuery query = 1; }
message ListPartitionManifestsResponse { repeated PartitionManifest manifests = 1; }
service Catalog {
// JTBDs:
// Enables lookup of job graph runs (current or past)
rpc ListJobGraphRuns(ListJobGraphRunsRequest) returns (ListJobGraphRunsResponse);
// Enables lookup of partition manifests produced as part of prior job runs
rpc ListPartitions(ListPartitionManifestsRequest) returns (ListPartitionManifestsResponse);
}

graph/README.md

@@ -0,0 +1,10 @@
# DataBuild Graph
## Entrypoints
- `graph.build` - Build the requested partitions.
- `graph.analyze` - Calculate the `JobGraph` that would produce the requested partitions.
- `graph.mermaid` - Calculate a [mermaid](https://mermaid.js.org/syntax/flowchart.html) diagram describing the `JobGraph`.
- `graph.serve` - Run the databuild server for this graph.
- `graph.image` / `graph.load` - Build a deployable graph artifact and wrap it in a container. `load` registers the container locally.

job/README.md

@@ -0,0 +1,4 @@
# DataBuild Jobs
Contains wrappers and tools for implementing DataBuild jobs.