Compare commits: bel-refact...main (8 commits)

| Author | SHA1 | Date |
|---|---|---|
| | ad2cc7498b | |
| | 5c9c2a05cc | |
| | 8fd1c9b046 | |
| | dc622dd0ac | |
| | b3298e7213 | |
| | f92cfeb9b5 | |
| | 07d2a9faec | |
| | 952366ab66 | |

23 changed files with 772 additions and 8 deletions
@@ -17,7 +17,9 @@ Graphs and jobs are defined in [bazel](https://bazel.build), allowing graphs (an
- **Jobs** - Their `exec` entrypoint builds partitions from partitions, and their `config` entrypoint specifies which partitions are required to produce the requested partition(s), along with the specific config to run `exec` with to build those partitions (see the sketch after this list).
- **Graphs** - Compose jobs together to achieve multi-job orchestration, using a `lookup` mechanism to resolve a requested partition to the job that can build it. Together with their constituent jobs, graphs can fully plan the build of any set of partitions. Most interactions with a DataBuild app happen through a graph.
- **Build Event Log** - Encodes the state of the system, recording build requests, job activity, partition production, etc. to enable running DataBuild as a deployed application.
- **Bazel Targets** - Bazel is a fast, extensible, and hermetic build system. DataBuild uses bazel targets to describe graphs and jobs, making graphs themselves deployable applications. Implementing a DataBuild app is the process of integrating your data build jobs in `databuild_job` bazel targets and connecting them with a `databuild_graph` target.
- **Wants** - Partition wants can be registered with DataBuild, causing it to build the wanted partitions as soon as their graph-external dependencies are met.
- **Taints** - Taints mark a partition as invalid, indicating that readers should not use it, and that it should be rebuilt when requested or depended upon. If there is a still-active want for the tainted partition, it will be rebuilt immediately.
- **Bazel Targets** - Bazel is a fast, extensible, and hermetic build system. DataBuild uses bazel targets to describe graphs and jobs, making graphs themselves deployable applications. Implementing a DataBuild app is the process of integrating your data build jobs in `databuild_job` bazel targets and connecting them with a `databuild_graph` target.
- [**Graph Specification Strategies**](design/graph-specification.md) (coming soon) - Application libraries in Python/Rust/Scala that use language features to enable ergonomic and succinct specification of jobs and graphs.
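To make the Jobs contract above concrete, here is a minimal, hypothetical sketch of the CLI shape a job binary might expose. The `daily_report`/`daily_votes` partition scheme, the argument names, and the exact JSON envelope are illustrative assumptions, not DataBuild's actual interface:

```python
#!/usr/bin/env python3
"""Hypothetical sketch of a DataBuild job's two entrypoints.

Assumes a job binary is invoked as `job config <partition_ref>...` to plan,
and `job exec <args>...` to build; names and JSON shapes are illustrative.
"""
import json
import sys


def config(partition_refs: list[str]) -> dict:
    # For each requested output partition, declare the input partitions it
    # needs and the args `exec` should be run with to produce it.
    configs = []
    for ref in partition_refs:
        date = ref.rsplit("/", 1)[-1]  # e.g. "daily_report/2025-09-01"
        configs.append({
            "outputs": [ref],
            "inputs": [f"daily_votes/{date}"],
            "exec_args": ["--date", date],
        })
    return {"configs": configs}


def exec_(args: list[str]) -> None:
    # Build the partition(s) described by the args chosen in `config`.
    print(f"building with args: {args}")


if __name__ == "__main__":
    if len(sys.argv) < 2:
        raise SystemExit("usage: job <config|exec> [args...]")
    command, rest = sys.argv[1], sys.argv[2:]
    if command == "config":
        print(json.dumps(config(rest)))
    elif command == "exec":
        exec_(rest)
    else:
        raise SystemExit(f"unknown command: {command}")
```

Invoked this way, a graph can ask any job how a partition would be built without knowing anything about the job's internals.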
### Partition / Job Assumptions and Best Practices
@@ -65,7 +67,8 @@ You can also use cron-based triggers, which return partition refs that they want
# Key Insights

- Orchestration logic changes all the time - better not to write it at all.
- Orchestration decisions and application logic is innately coupled
- Orchestration decisions and application logic are innately coupled.
- "systemd for data platforms"

## Assumptions
README.md (22 changes)

@@ -1,5 +1,25 @@

# DataBuild
```
██████╗ ████╗ ███████████╗ ████╗
██╔═══██╗ ██╔██║ ╚═══██╔════╝ ██╔██║
██╔╝ ██║ ██╔╝██║ ██╔╝ ██╔╝██║
██╔╝ ██║ ██╔╝ ██║ ██╔╝ ██╔╝ ██║
██╔╝ ██╔╝ ██╔╝ ██║ ██╔╝ ██╔╝ ██║
██╔╝ ██╔═╝ █████████║ ██╔╝ █████████║
████████╔═╝ ██╔═════██║ ██╔╝ ██╔═════██║
╚═══════╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝
██████╗ ██╗ ██╗ ██╗ ██╗ █████╗
██╔═══██╗ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔══██╗
██╔╝ ██║ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██║
█████████╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██║
██╔═══██╔═╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝
██╔╝ ██║ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔═╝
█████████╔╝ ██████╔═╝ ██╔╝ ████████╗ ███████╔═╝
╚════════╝ ╚═════╝ ╚═╝ ╚═══════╝ ╚══════╝
- -- S Y S T E M O N L I N E -- -
```
DataBuild is a trivially-deployable, partition-oriented, declarative data build system.
databuild/ascii_logo.txt (19 lines, new normal file)

@@ -0,0 +1,19 @@
██████╗ ████╗ ███████████╗ ████╗
██╔═══██╗ ██╔██║ ╚═══██╔════╝ ██╔██║
██╔╝ ██║ ██╔╝██║ ██╔╝ ██╔╝██║
██╔╝ ██║ ██╔╝ ██║ ██╔╝ ██╔╝ ██║
██╔╝ ██╔╝ ██╔╝ ██║ ██╔╝ ██╔╝ ██║
██╔╝ ██╔═╝ █████████║ ██╔╝ █████████║
████████╔═╝ ██╔═════██║ ██╔╝ ██╔═════██║
╚═══════╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝
██████╗ ██╗ ██╗ ██╗ ██╗ █████╗
██╔═══██╗ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔══██╗
██╔╝ ██║ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██║
█████████╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██║
██╔═══██╔═╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝
██╔╝ ██║ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔═╝
█████████╔╝ ██████╔═╝ ██╔╝ ████████╗ ███████╔═╝
╚════════╝ ╚═════╝ ╚═╝ ╚═══════╝ ╚══════╝
- -- S Y S T E M O N L I N E -- -
@@ -462,6 +462,8 @@ export const BuildStatus: TypedComponent<BuildStatusAttrs> = {
      ...(build.completed_at ? [{stage: 'Build Completed', time: build.completed_at, icon: '✅'}] : []),
    ];

    let startedAt = build.started_at || build.requested_at;

    return m('div.container.mx-auto.p-4', [
      // Build Header
      m('.build-header.mb-6', [
@@ -485,8 +487,8 @@ export const BuildStatus: TypedComponent<BuildStatusAttrs> = {
        ]),
        m('.stat.bg-base-100.shadow.rounded-lg.p-4', [
          m('.stat-title', 'Duration'),
          m('.stat-value.text-2xl', (build.completed_at - build.started_at) ? formatDuration((build.completed_at - build.started_at)) : '—'),
          m('.stat-desc', build.started_at ? formatDateTime(build.started_at) : 'Not started')
          m('.stat-value.text-2xl', (build.completed_at - startedAt) ? formatDuration((build.completed_at - startedAt)) : '—'),
          m('.stat-desc', startedAt ? formatDateTime(startedAt) : 'Not started')
        ])
      ])
    ]),
@@ -462,6 +462,7 @@ export function formatDateTime(epochNanos: number): string {

export function formatDuration(durationNanos?: number | null): string {
  let durationMs = durationNanos ? durationNanos / 1000000 : null;
  console.warn('Formatting duration:', durationMs);
  if (!durationMs || durationMs <= 0) {
    return '—';
  }
@@ -120,7 +120,7 @@ class DataBuildGraph:
        import os

        # Get job classes from the lookup table
        job_classes = list(set(self.lookup.values()))
        job_classes = sorted(set(self.lookup.values()), key=lambda cls: cls.__name__)

        # Format deps for BUILD.bazel
        if deps:
@@ -172,6 +172,15 @@ databuild_graph(
    lookup = ":{name}_job_lookup",
    visibility = ["//visibility:public"],
)

# Create tar archive of generated files for testing
genrule(
    name = "existing_generated",
    srcs = glob(["*.py", "BUILD.bazel"]),
    outs = ["existing_generated.tar"],
    cmd = "mkdir -p temp && cp $(SRCS) temp/ && find temp -exec touch -t 197001010000 {{}} + && tar -cf $@ -C temp .",
    visibility = ["//visibility:public"],
)
'''

        with open(os.path.join(output_dir, "BUILD.bazel"), "w") as f:
@@ -1,9 +1,15 @@
py_library(
    name = "job_src",
    srcs = glob(["**/*.py"]),
    srcs = glob(["**/*.py"], exclude=["e2e_test_common.py"]),
    visibility = ["//visibility:public"],
    deps = [
        "//databuild:py_proto",
        "//databuild/dsl/python:dsl",
    ],
)

py_library(
    name = "e2e_test_common",
    srcs = ["e2e_test_common.py"],
    visibility = ["//visibility:public"],
)
@@ -65,6 +65,14 @@ py_test(
    ],
)

py_test(
    name = "test_e2e",
    srcs = ["test_e2e.py"],
    data = [":bazel_graph.build"],
    main = "test_e2e.py",
    deps = ["//databuild/test/app:e2e_test_common"],
)

# Bazel-defined
## Graph
databuild_graph(
@@ -4,7 +4,7 @@ from collections import defaultdict
import sys
import json

LABEL_BASE = "//databuild/test/app"
LABEL_BASE = "//databuild/test/app/bazel"


def lookup(raw_ref: str):
databuild/test/app/bazel/test_e2e.py (37 lines, new normal file)

@@ -0,0 +1,37 @@
#!/usr/bin/env python3
"""
End-to-end test for the bazel-defined test app.

Tests the full pipeline: build execution -> output verification -> JSON validation.
"""

import os
from databuild.test.app.e2e_test_common import DataBuildE2ETestBase


class BazelE2ETest(DataBuildE2ETestBase):
    """End-to-end test for the bazel-defined test app."""

    def test_end_to_end_execution(self):
        """Test full end-to-end execution of the bazel graph."""
        # Build possible paths for the bazel graph build binary
        possible_paths = self.get_standard_runfiles_paths(
            'databuild/test/app/bazel/bazel_graph.build'
        )

        # Add fallback paths for local testing
        possible_paths.extend([
            'bazel-bin/databuild/test/app/bazel/bazel_graph.build',
            './bazel_graph.build'
        ])

        # Find the graph build binary
        graph_build_path = self.find_graph_build_binary(possible_paths)

        # Execute and verify the graph build
        self.execute_and_verify_graph_build(graph_build_path)


if __name__ == '__main__':
    import unittest
    unittest.main()
@@ -22,3 +22,33 @@ databuild_dsl_generator(
    deps = [":dsl_src"],
    visibility = ["//visibility:public"],
)

# Generate fresh DSL output for comparison testing
genrule(
    name = "generate_fresh_dsl",
    outs = ["generated_fresh.tar"],
    cmd_bash = """
        # Create temporary directory for generation
        mkdir -p temp_workspace/databuild/test/app/dsl

        # Set environment to generate to temp directory
        export BUILD_WORKSPACE_DIRECTORY="temp_workspace"

        # Run the generator
        $(location :graph.generate)

        # Create tar archive of generated files
        if [ -d "temp_workspace/databuild/test/app/dsl/generated" ]; then
            find temp_workspace/databuild/test/app/dsl/generated -exec touch -t 197001010000 {} +
            tar -cf $@ -C temp_workspace/databuild/test/app/dsl/generated .
        else
            # Create empty tar if no files generated
            tar -cf $@ -T /dev/null
        fi

        # Clean up
        rm -rf temp_workspace
    """,
    tools = [":graph.generate"],
    visibility = ["//visibility:public"],
)
databuild/test/app/dsl/claude-generated-dsl-test.md (9 lines, new normal file)

@@ -0,0 +1,9 @@

We can't write a direct `bazel test` for the DSL-generated graph, because:

1. Bazel doesn't allow you to `bazel run graph.generate` to generate a BUILD.bazel that will be used in the same build.
2. We don't want to leak test generation into the graph generation code (since tests here are app-specific).

Instead, we need a two-phase process in which we rely on the graph to already be generated here, containing a test, so that `bazel test //...` covers the generated source as well. This implies that the generated source is checked into git (gasp, I know), and we need a mechanism to ensure it stays up to date. To achieve this, we'll create a test that asserts that the contents of the `generated` dir are exactly the same as the output of a fresh run of `graph.generate`.

Our task is to implement the test that asserts equality between the two, e.g. the target could depend on `graph.generate`, run it in the test, and md5 the results, comparing that to the md5 of the existing generated dir.
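A minimal sketch of that equality check, assuming both the existing and freshly generated trees have already been packed into deterministic tar archives (the file names `existing_generated.tar` and `generated_fresh.tar` are taken from the targets above; this helper is illustrative, not the checked-in test):

```python
import hashlib
from pathlib import Path


def md5_of(path: Path) -> str:
    """Hash a tar archive's bytes; with pinned mtimes the hash is stable."""
    return hashlib.md5(path.read_bytes()).hexdigest()


def assert_generated_up_to_date(existing: Path, fresh: Path) -> None:
    # If the archives differ, the checked-in generated code is stale.
    if md5_of(existing) != md5_of(fresh):
        raise AssertionError(
            "Generated DSL code is out of date; "
            "re-run `bazel run //databuild/test/app/dsl:graph.generate`"
        )
```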
databuild/test/app/dsl/generated/BUILD.bazel (71 lines, new normal file)

@@ -0,0 +1,71 @@
load("@databuild//databuild:rules.bzl", "databuild_job", "databuild_graph")

# Generated by DataBuild DSL - do not edit manually
# This file is generated in a subdirectory to avoid overwriting the original BUILD.bazel

py_binary(
    name = "aggregate_color_votes_binary",
    srcs = ["aggregate_color_votes.py"],
    main = "aggregate_color_votes.py",
    deps = ["@@//databuild/test/app/dsl:dsl_src"],
)

databuild_job(
    name = "aggregate_color_votes",
    binary = ":aggregate_color_votes_binary",
)
py_binary(
    name = "color_vote_report_calc_binary",
    srcs = ["color_vote_report_calc.py"],
    main = "color_vote_report_calc.py",
    deps = ["@@//databuild/test/app/dsl:dsl_src"],
)

databuild_job(
    name = "color_vote_report_calc",
    binary = ":color_vote_report_calc_binary",
)
py_binary(
    name = "ingest_color_votes_binary",
    srcs = ["ingest_color_votes.py"],
    main = "ingest_color_votes.py",
    deps = ["@@//databuild/test/app/dsl:dsl_src"],
)

databuild_job(
    name = "ingest_color_votes",
    binary = ":ingest_color_votes_binary",
)
py_binary(
    name = "trailing_color_votes_binary",
    srcs = ["trailing_color_votes.py"],
    main = "trailing_color_votes.py",
    deps = ["@@//databuild/test/app/dsl:dsl_src"],
)

databuild_job(
    name = "trailing_color_votes",
    binary = ":trailing_color_votes_binary",
)

py_binary(
    name = "dsl_job_lookup",
    srcs = ["dsl_job_lookup.py"],
    deps = ["@@//databuild/test/app/dsl:dsl_src"],
)

databuild_graph(
    name = "dsl_graph",
    jobs = ["aggregate_color_votes", "color_vote_report_calc", "ingest_color_votes", "trailing_color_votes"],
    lookup = ":dsl_job_lookup",
    visibility = ["//visibility:public"],
)

# Create tar archive of generated files for testing
genrule(
    name = "existing_generated",
    srcs = glob(["*.py", "BUILD.bazel"]),
    outs = ["existing_generated.tar"],
    cmd = "mkdir -p temp && cp $(SRCS) temp/ && find temp -exec touch -t 197001010000 {} + && tar -cf $@ -C temp .",
    visibility = ["//visibility:public"],
)
databuild/test/app/dsl/generated/aggregate_color_votes.py (58 lines, new executable file)

@@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
Generated job script for AggregateColorVotes.
"""

import sys
import json
from databuild.test.app.dsl.graph import AggregateColorVotes
from databuild.proto import PartitionRef, JobConfigureResponse, to_dict


def parse_outputs_from_args(args: list[str]) -> list:
    """Parse partition output references from command line arguments."""
    outputs = []
    for arg in args:
        # Find which output type can deserialize this partition reference
        for output_type in AggregateColorVotes.output_types:
            try:
                partition = output_type.deserialize(arg)
                outputs.append(partition)
                break
            except ValueError:
                continue
        else:
            raise ValueError(f"No output type in AggregateColorVotes can deserialize partition ref: {arg}")

    return outputs


if __name__ == "__main__":
    if len(sys.argv) < 2:
        raise Exception(f"Invalid command usage")

    command = sys.argv[1]
    job_instance = AggregateColorVotes()

    if command == "config":
        # Parse output partition references as PartitionRef objects (for Rust wrapper)
        output_refs = [PartitionRef(str=raw_ref) for raw_ref in sys.argv[2:]]

        # Also parse them into DSL partition objects (for DSL job.config())
        outputs = parse_outputs_from_args(sys.argv[2:])

        # Call job's config method - returns list[JobConfig]
        configs = job_instance.config(outputs)

        # Wrap in JobConfigureResponse and serialize using to_dict()
        response = JobConfigureResponse(configs=configs)
        print(json.dumps(to_dict(response)))

    elif command == "exec":
        # The exec method expects a JobConfig but the Rust wrapper passes args
        # For now, let the DSL job handle the args directly
        # TODO: This needs to be refined based on actual Rust wrapper interface
        job_instance.exec(*sys.argv[2:])

    else:
        raise Exception(f"Invalid command `{sys.argv[1]}`")
databuild/test/app/dsl/generated/color_vote_report_calc.py (58 lines, new executable file)

@@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
Generated job script for ColorVoteReportCalc.
"""

import sys
import json
from databuild.test.app.dsl.graph import ColorVoteReportCalc
from databuild.proto import PartitionRef, JobConfigureResponse, to_dict


def parse_outputs_from_args(args: list[str]) -> list:
    """Parse partition output references from command line arguments."""
    outputs = []
    for arg in args:
        # Find which output type can deserialize this partition reference
        for output_type in ColorVoteReportCalc.output_types:
            try:
                partition = output_type.deserialize(arg)
                outputs.append(partition)
                break
            except ValueError:
                continue
        else:
            raise ValueError(f"No output type in ColorVoteReportCalc can deserialize partition ref: {arg}")

    return outputs


if __name__ == "__main__":
    if len(sys.argv) < 2:
        raise Exception(f"Invalid command usage")

    command = sys.argv[1]
    job_instance = ColorVoteReportCalc()

    if command == "config":
        # Parse output partition references as PartitionRef objects (for Rust wrapper)
        output_refs = [PartitionRef(str=raw_ref) for raw_ref in sys.argv[2:]]

        # Also parse them into DSL partition objects (for DSL job.config())
        outputs = parse_outputs_from_args(sys.argv[2:])

        # Call job's config method - returns list[JobConfig]
        configs = job_instance.config(outputs)

        # Wrap in JobConfigureResponse and serialize using to_dict()
        response = JobConfigureResponse(configs=configs)
        print(json.dumps(to_dict(response)))

    elif command == "exec":
        # The exec method expects a JobConfig but the Rust wrapper passes args
        # For now, let the DSL job handle the args directly
        # TODO: This needs to be refined based on actual Rust wrapper interface
        job_instance.exec(*sys.argv[2:])

    else:
        raise Exception(f"Invalid command `{sys.argv[1]}`")
databuild/test/app/dsl/generated/dsl_job_lookup.py (53 lines, new executable file)

@@ -0,0 +1,53 @@
#!/usr/bin/env python3
"""
Generated job lookup for DataBuild DSL graph.
Maps partition patterns to job targets.
"""

import sys
import re
import json
from collections import defaultdict


# Mapping from partition patterns to job targets
JOB_MAPPINGS = {
    r"daily_color_votes/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": "//databuild/test/app/dsl/generated:ingest_color_votes",
    r"color_votes_1m/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": "//databuild/test/app/dsl/generated:trailing_color_votes",
    r"color_votes_1w/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": "//databuild/test/app/dsl/generated:trailing_color_votes",
    r"daily_votes/(?P<data_date>\d{4}-\d{2}-\d{2})": "//databuild/test/app/dsl/generated:aggregate_color_votes",
    r"votes_1w/(?P<data_date>\d{4}-\d{2}-\d{2})": "//databuild/test/app/dsl/generated:aggregate_color_votes",
    r"votes_1m/(?P<data_date>\d{4}-\d{2}-\d{2})": "//databuild/test/app/dsl/generated:aggregate_color_votes",
    r"color_vote_report/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": "//databuild/test/app/dsl/generated:color_vote_report_calc",
}


def lookup_job_for_partition(partition_ref: str) -> str:
    """Look up which job can build the given partition reference."""
    for pattern, job_target in JOB_MAPPINGS.items():
        if re.match(pattern, partition_ref):
            return job_target

    raise ValueError(f"No job found for partition: {partition_ref}")


def main():
    if len(sys.argv) < 2:
        print("Usage: job_lookup.py <partition_ref> [partition_ref...]", file=sys.stderr)
        sys.exit(1)

    results = defaultdict(list)
    try:
        for partition_ref in sys.argv[1:]:
            job_target = lookup_job_for_partition(partition_ref)
            results[job_target].append(partition_ref)

        # Output the results as JSON (matching existing lookup format)
        print(json.dumps(dict(results)))
    except ValueError as e:
        print(f"ERROR: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
databuild/test/app/dsl/generated/ingest_color_votes.py (58 lines, new executable file)

@@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
Generated job script for IngestColorVotes.
"""

import sys
import json
from databuild.test.app.dsl.graph import IngestColorVotes
from databuild.proto import PartitionRef, JobConfigureResponse, to_dict


def parse_outputs_from_args(args: list[str]) -> list:
    """Parse partition output references from command line arguments."""
    outputs = []
    for arg in args:
        # Find which output type can deserialize this partition reference
        for output_type in IngestColorVotes.output_types:
            try:
                partition = output_type.deserialize(arg)
                outputs.append(partition)
                break
            except ValueError:
                continue
        else:
            raise ValueError(f"No output type in IngestColorVotes can deserialize partition ref: {arg}")

    return outputs


if __name__ == "__main__":
    if len(sys.argv) < 2:
        raise Exception(f"Invalid command usage")

    command = sys.argv[1]
    job_instance = IngestColorVotes()

    if command == "config":
        # Parse output partition references as PartitionRef objects (for Rust wrapper)
        output_refs = [PartitionRef(str=raw_ref) for raw_ref in sys.argv[2:]]

        # Also parse them into DSL partition objects (for DSL job.config())
        outputs = parse_outputs_from_args(sys.argv[2:])

        # Call job's config method - returns list[JobConfig]
        configs = job_instance.config(outputs)

        # Wrap in JobConfigureResponse and serialize using to_dict()
        response = JobConfigureResponse(configs=configs)
        print(json.dumps(to_dict(response)))

    elif command == "exec":
        # The exec method expects a JobConfig but the Rust wrapper passes args
        # For now, let the DSL job handle the args directly
        # TODO: This needs to be refined based on actual Rust wrapper interface
        job_instance.exec(*sys.argv[2:])

    else:
        raise Exception(f"Invalid command `{sys.argv[1]}`")
databuild/test/app/dsl/generated/trailing_color_votes.py (58 lines, new executable file)

@@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
Generated job script for TrailingColorVotes.
"""

import sys
import json
from databuild.test.app.dsl.graph import TrailingColorVotes
from databuild.proto import PartitionRef, JobConfigureResponse, to_dict


def parse_outputs_from_args(args: list[str]) -> list:
    """Parse partition output references from command line arguments."""
    outputs = []
    for arg in args:
        # Find which output type can deserialize this partition reference
        for output_type in TrailingColorVotes.output_types:
            try:
                partition = output_type.deserialize(arg)
                outputs.append(partition)
                break
            except ValueError:
                continue
        else:
            raise ValueError(f"No output type in TrailingColorVotes can deserialize partition ref: {arg}")

    return outputs


if __name__ == "__main__":
    if len(sys.argv) < 2:
        raise Exception(f"Invalid command usage")

    command = sys.argv[1]
    job_instance = TrailingColorVotes()

    if command == "config":
        # Parse output partition references as PartitionRef objects (for Rust wrapper)
        output_refs = [PartitionRef(str=raw_ref) for raw_ref in sys.argv[2:]]

        # Also parse them into DSL partition objects (for DSL job.config())
        outputs = parse_outputs_from_args(sys.argv[2:])

        # Call job's config method - returns list[JobConfig]
        configs = job_instance.config(outputs)

        # Wrap in JobConfigureResponse and serialize using to_dict()
        response = JobConfigureResponse(configs=configs)
        print(json.dumps(to_dict(response)))

    elif command == "exec":
        # The exec method expects a JobConfig but the Rust wrapper passes args
        # For now, let the DSL job handle the args directly
        # TODO: This needs to be refined based on actual Rust wrapper interface
        job_instance.exec(*sys.argv[2:])

    else:
        raise Exception(f"Invalid command `{sys.argv[1]}`")
databuild/test/app/dsl/generated_test/BUILD.bazel (7 lines, new normal file)

@@ -0,0 +1,7 @@
py_test(
    name = "test_e2e",
    srcs = ["test_e2e.py"],
    data = ["//databuild/test/app/dsl/generated:dsl_graph.build"],
    main = "test_e2e.py",
    deps = ["//databuild/test/app:e2e_test_common"],
)
databuild/test/app/dsl/generated_test/test_e2e.py (37 lines, new normal file)

@@ -0,0 +1,37 @@
#!/usr/bin/env python3
"""
End-to-end test for the DSL-generated test app.

Tests the full pipeline: build execution -> output verification -> JSON validation.
"""

import os
from databuild.test.app.e2e_test_common import DataBuildE2ETestBase


class DSLGeneratedE2ETest(DataBuildE2ETestBase):
    """End-to-end test for the DSL-generated test app."""

    def test_end_to_end_execution(self):
        """Test full end-to-end execution of the DSL-generated graph."""
        # Build possible paths for the DSL-generated graph build binary
        possible_paths = self.get_standard_runfiles_paths(
            'databuild/test/app/dsl/generated/dsl_graph.build'
        )

        # Add fallback paths for local testing
        possible_paths.extend([
            'bazel-bin/databuild/test/app/dsl/generated/dsl_graph.build',
            './dsl_graph.build'
        ])

        # Find the graph build binary
        graph_build_path = self.find_graph_build_binary(possible_paths)

        # Execute and verify the graph build
        self.execute_and_verify_graph_build(graph_build_path)


if __name__ == '__main__':
    import unittest
    unittest.main()
@@ -73,3 +73,15 @@ py_test(
        "//databuild/test/app/dsl:dsl_src",
    ],
)

# DSL generation consistency test
py_test(
    name = "test_dsl_generation_consistency",
    srcs = ["test_dsl_generation_consistency.py"],
    main = "test_dsl_generation_consistency.py",
    data = [
        "//databuild/test/app/dsl:generate_fresh_dsl",
        "//databuild/test/app/dsl/generated:existing_generated",
    ],
    deps = [],
)
databuild/test/app/dsl/test/test_dsl_generation_consistency.py (105 lines, new normal file)

@@ -0,0 +1,105 @@
#!/usr/bin/env python3
"""
Test that verifies the generated DSL code is up-to-date.

This test ensures that the checked-in generated directory contents match
exactly what would be produced by a fresh run of graph.generate.
"""

import hashlib
import os
import subprocess
import tempfile
import unittest
from pathlib import Path


class TestDSLGenerationConsistency(unittest.TestCase):
    def setUp(self):
        # Find the test runfiles directory to locate tar files
        runfiles_dir = os.environ.get("RUNFILES_DIR")
        if runfiles_dir:
            self.runfiles_root = Path(runfiles_dir) / "_main"
        else:
            # Fallback for development - not expected to work in this case
            self.fail("RUNFILES_DIR not set - test must be run via bazel test")

    def _compute_tar_hash(self, tar_path: Path) -> str:
        """Compute MD5 hash of a tar file's contents."""
        if not tar_path.exists():
            self.fail(f"Tar file not found: {tar_path}")

        with open(tar_path, "rb") as f:
            content = f.read()
        return hashlib.md5(content).hexdigest()

    def _extract_and_list_tar(self, tar_path: Path) -> set:
        """Extract tar file and return set of file paths and their content hashes."""
        if not tar_path.exists():
            return set()

        result = subprocess.run([
            "tar", "-tf", str(tar_path)
        ], capture_output=True, text=True)

        if result.returncode != 0:
            self.fail(f"Failed to list tar contents: {result.stderr}")

        return set(result.stdout.strip().split('\n')) if result.stdout.strip() else set()

    def test_generated_code_is_up_to_date(self):
        """Test that the existing generated tar matches the fresh generated tar."""

        # Find the tar files from data dependencies
        existing_tar = self.runfiles_root / "databuild/test/app/dsl/generated/existing_generated.tar"
        fresh_tar = self.runfiles_root / "databuild/test/app/dsl/generated_fresh.tar"

        # Compute hashes of both tar files
        existing_hash = self._compute_tar_hash(existing_tar)
        fresh_hash = self._compute_tar_hash(fresh_tar)

        # Compare hashes
        if existing_hash != fresh_hash:
            # Provide detailed diff information
            existing_files = self._extract_and_list_tar(existing_tar)
            fresh_files = self._extract_and_list_tar(fresh_tar)

            only_in_existing = existing_files - fresh_files
            only_in_fresh = fresh_files - existing_files

            error_msg = [
                "Generated DSL code is out of date!",
                f"Existing tar hash: {existing_hash}",
                f"Fresh tar hash: {fresh_hash}",
                "",
                "To fix this, run:",
                "  bazel run //databuild/test/app/dsl:graph.generate",
                ""
            ]

            if only_in_existing:
                error_msg.extend([
                    "Files only in existing generated code:",
                    *[f"  - {f}" for f in sorted(only_in_existing)],
                    ""
                ])

            if only_in_fresh:
                error_msg.extend([
                    "Files only in fresh generated code:",
                    *[f"  + {f}" for f in sorted(only_in_fresh)],
                    ""
                ])

            common_files = existing_files & fresh_files
            if common_files:
                error_msg.extend([
                    f"Common files: {len(common_files)}",
                    "This suggests files have different contents.",
                ])

            self.fail("\n".join(error_msg))


if __name__ == "__main__":
    unittest.main()
databuild/test/app/e2e_test_common.py (103 lines, new normal file)

@@ -0,0 +1,103 @@
#!/usr/bin/env python3
"""
Common end-to-end test logic for DataBuild test apps.

Provides shared functionality for testing both bazel-defined and DSL-generated graphs.
"""

import json
import os
import shutil
import subprocess
import time
import unittest
from pathlib import Path
from typing import List, Optional


class DataBuildE2ETestBase(unittest.TestCase):
    """Base class for DataBuild end-to-end tests."""

    def setUp(self):
        """Set up test environment."""
        self.output_dir = Path("/tmp/data/color_votes_1w/2025-09-01/red")
        self.output_file = self.output_dir / "data.json"
        self.partition_ref = "color_votes_1w/2025-09-01/red"

        # Clean up any existing test data
        if self.output_dir.exists():
            shutil.rmtree(self.output_dir)

    def tearDown(self):
        """Clean up test environment."""
        if self.output_dir.exists():
            shutil.rmtree(self.output_dir)

    def find_graph_build_binary(self, possible_paths: List[str]) -> str:
        """Find the graph.build binary from a list of possible paths."""
        graph_build_path = None
        for path in possible_paths:
            if os.path.exists(path):
                graph_build_path = path
                break

        self.assertIsNotNone(graph_build_path,
                             f"Graph build binary not found in any of: {possible_paths}")
        return graph_build_path

    def execute_and_verify_graph_build(self, graph_build_path: str) -> None:
        """Execute the graph build and verify the results."""
        # Record start time for file modification check
        start_time = time.time()

        # Execute the graph build (shell script)
        result = subprocess.run(
            ["bash", graph_build_path, self.partition_ref],
            capture_output=True,
            text=True
        )

        # Verify execution succeeded
        self.assertEqual(result.returncode, 0,
                         f"Graph build failed with stderr: {result.stderr}")

        # Verify output file was created
        self.assertTrue(self.output_file.exists(),
                        f"Output file {self.output_file} was not created")

        # Verify file was created recently (within 60 seconds)
        file_mtime = os.path.getmtime(self.output_file)
        time_diff = file_mtime - start_time
        self.assertGreaterEqual(time_diff, -1,  # Allow 1 second clock skew
                                f"File appears to be too old: {time_diff} seconds")
        self.assertLessEqual(time_diff, 60,
                             f"File creation took too long: {time_diff} seconds")

        # Verify file contains valid JSON
        with open(self.output_file, 'r') as f:
            content = f.read()

        try:
            data = json.loads(content)
        except json.JSONDecodeError as e:
            self.fail(f"Output file does not contain valid JSON: {e}")

        # Basic sanity check on JSON structure
        self.assertIsInstance(data, (dict, list),
                              "JSON should be an object or array")

    def get_standard_runfiles_paths(self, relative_path: str) -> List[str]:
        """Get standard list of possible runfiles paths for a binary."""
        runfiles_dir = os.environ.get("RUNFILES_DIR")
        test_srcdir = os.environ.get("TEST_SRCDIR")

        possible_paths = []
        if runfiles_dir:
            possible_paths.append(os.path.join(runfiles_dir, '_main', relative_path))
            possible_paths.append(os.path.join(runfiles_dir, relative_path))

        if test_srcdir:
            possible_paths.append(os.path.join(test_srcdir, '_main', relative_path))
            possible_paths.append(os.path.join(test_srcdir, relative_path))

        return possible_paths