Compare commits

...

8 commits

SHA1        Message                                              Date                         Checks
ad2cc7498b  add sick ass logo                                    2025-08-20 19:39:01 -07:00   setup (push) cancelled
5c9c2a05cc  Add systemd comment                                  2025-08-18 22:04:53 -07:00
8fd1c9b046  Describe wants and taints in readme                  2025-08-18 20:54:31 -07:00   setup (push) cancelled
dc622dd0ac  Minor timestamp fix                                  2025-08-16 16:21:43 -07:00   setup (push) cancelled
b3298e7213  Add test app e2e test coverage for generated graph   2025-08-16 15:53:26 -07:00
f92cfeb9b5  Add test app generated package                       2025-08-16 15:37:47 -07:00
07d2a9faec  Detect out of date generated source                  2025-08-16 15:37:07 -07:00
952366ab66  Add e2e test for test app bazel impl                 2025-08-16 09:39:56 -07:00
23 changed files with 772 additions and 8 deletions

View file

@ -17,7 +17,9 @@ Graphs and jobs are defined in [bazel](https://bazel.build), allowing graphs (an
- **Jobs** - Their `exec` entrypoint builds partitions from partitions, and their `config` entrypoint specifies which partitions are required to produce the requested partition(s), along with the specific config to run `exec` with to build those partitions (see the sketch after this list).
- **Graphs** - Compose jobs together to achieve multi-job orchestration, using a `lookup` mechanism to resolve a requested partition to the job that can build it. Together with its constituent jobs, a graph can fully plan the build of any set of partitions. Most interactions with a DataBuild app happen through a graph.
- **Build Event Log** - Encodes the state of the system, recording build requests, job activity, partition production, etc., to enable running DataBuild as a deployed application.
- **Bazel Targets** - Bazel is a fast, extensible, and hermetic build system. DataBuild uses bazel targets to describe graphs and jobs, making graphs themselves deployable applications. Implementing a DataBuild app is the process of integrating your data build jobs into `databuild_job` bazel targets and connecting them with a `databuild_graph` target.
- **Wants** - Partition wants can be registered with DataBuild, causing it to build the wanted partitions as soon as their graph-external dependencies are met.
- **Taints** - Taints mark a partition as invalid, indicating that readers should not use it and that it should be rebuilt when next requested or depended upon. If there is a still-active want for the tainted partition, it is rebuilt immediately.
- **Bazel Targets** - Bazel is a fast, extensible, and hermetic build system. DataBuild uses bazel targets to describe graphs and jobs, making graphs themselves deployable applications. Implementing a DataBuild app is the process of integrating your data build jobs into `databuild_job` bazel targets and connecting them with a `databuild_graph` target.
- [**Graph Specification Strategies**](design/graph-specification.md) (coming soon): Application libraries in Python/Rust/Scala that use language features to enable ergonomic and succinct specification of jobs and graphs.
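To make the `config`/`exec` contract above concrete, here is a minimal, hypothetical sketch of a job entrypoint in Python. The two-command CLI shape mirrors the generated job scripts later in this diff; the partition-ref names and the config payload fields are illustrative assumptions, not the actual DataBuild API.
```
#!/usr/bin/env python3
# Hypothetical job entrypoint sketch; partition-ref names and config fields are illustrative.
import json
import sys

def config(output_refs: list[str]) -> dict:
    # Declare which upstream partitions each requested output needs,
    # plus the args that `exec` should later be invoked with.
    return {"configs": [{
        "outputs": output_refs,
        "inputs": [ref.replace("daily_report", "raw_events") for ref in output_refs],
        "args": output_refs,
    }]}

def run_exec(args: list[str]) -> None:
    # Build the requested partitions from their inputs.
    for ref in args:
        print(f"built {ref}")

if __name__ == "__main__":
    if len(sys.argv) < 2:
        raise SystemExit("usage: job.py {config|exec} <partition_ref>...")
    command, refs = sys.argv[1], sys.argv[2:]
    if command == "config":
        print(json.dumps(config(refs)))
    elif command == "exec":
        run_exec(refs)
    else:
        raise SystemExit(f"Invalid command `{command}`")
```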
### Partition / Job Assumptions and Best Practices
@ -65,7 +67,8 @@ You can also use cron-based triggers, which return partition refs that they want
# Key Insights
- Orchestration logic changes all the time - better to not write it at all.
- Orchestration decisions and application logic are innately coupled
- Orchestration decisions and application logic are innately coupled.
- "systemd for data platforms"
## Assumptions

View file

@ -1,5 +1,25 @@
# DataBuild
```
██████╗ ████╗ ███████████╗ ████╗
██╔═══██╗ ██╔██║ ╚═══██╔════╝ ██╔██║
██╔╝ ██║ ██╔╝██║ ██╔╝ ██╔╝██║
██╔╝ ██║ ██╔╝ ██║ ██╔╝ ██╔╝ ██║
██╔╝ ██╔╝ ██╔╝ ██║ ██╔╝ ██╔╝ ██║
██╔╝ ██╔═╝ █████████║ ██╔╝ █████████║
████████╔═╝ ██╔═════██║ ██╔╝ ██╔═════██║
╚═══════╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝
██████╗ ██╗ ██╗ ██╗ ██╗ █████╗
██╔═══██╗ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔══██╗
██╔╝ ██║ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██║
█████████╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██║
██╔═══██╔═╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝
██╔╝ ██║ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔═╝
█████████╔╝ ██████╔═╝ ██╔╝ ████████╗ ███████╔═╝
╚════════╝ ╚═════╝ ╚═╝ ╚═══════╝ ╚══════╝
- -- S Y S T E M O N L I N E -- -
```
DataBuild is a trivially-deployable, partition-oriented, declarative data build system.

databuild/ascii_logo.txt (new file, 19 lines)
View file

@ -0,0 +1,19 @@
██████╗ ████╗ ███████████╗ ████╗
██╔═══██╗ ██╔██║ ╚═══██╔════╝ ██╔██║
██╔╝ ██║ ██╔╝██║ ██╔╝ ██╔╝██║
██╔╝ ██║ ██╔╝ ██║ ██╔╝ ██╔╝ ██║
██╔╝ ██╔╝ ██╔╝ ██║ ██╔╝ ██╔╝ ██║
██╔╝ ██╔═╝ █████████║ ██╔╝ █████████║
████████╔═╝ ██╔═════██║ ██╔╝ ██╔═════██║
╚═══════╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝
██████╗ ██╗ ██╗ ██╗ ██╗ █████╗
██╔═══██╗ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔══██╗
██╔╝ ██║ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██║
█████████╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██║
██╔═══██╔═╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝
██╔╝ ██║ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔═╝
█████████╔╝ ██████╔═╝ ██╔╝ ████████╗ ███████╔═╝
╚════════╝ ╚═════╝ ╚═╝ ╚═══════╝ ╚══════╝
- -- S Y S T E M O N L I N E -- -

View file

@ -462,6 +462,8 @@ export const BuildStatus: TypedComponent<BuildStatusAttrs> = {
...(build.completed_at ? [{stage: 'Build Completed', time: build.completed_at, icon: '✅'}] : []),
];
let startedAt = build.started_at || build.requested_at;
return m('div.container.mx-auto.p-4', [
// Build Header
m('.build-header.mb-6', [
@ -485,8 +487,8 @@ export const BuildStatus: TypedComponent<BuildStatusAttrs> = {
]),
m('.stat.bg-base-100.shadow.rounded-lg.p-4', [
m('.stat-title', 'Duration'),
m('.stat-value.text-2xl', (build.completed_at - build.started_at) ? formatDuration((build.completed_at - build.started_at)) : '—'),
m('.stat-desc', build.started_at ? formatDateTime(build.started_at) : 'Not started')
m('.stat-value.text-2xl', (build.completed_at - startedAt) ? formatDuration((build.completed_at - startedAt)) : '—'),
m('.stat-desc', startedAt ? formatDateTime(startedAt) : 'Not started')
])
])
]),

View file

@ -462,6 +462,7 @@ export function formatDateTime(epochNanos: number): string {
export function formatDuration(durationNanos?: number | null): string {
let durationMs = durationNanos ? durationNanos / 1000000 : null;
console.warn('Formatting duration:', durationMs);
if (!durationMs || durationMs <= 0) {
return '—';
}

View file

@ -120,7 +120,7 @@ class DataBuildGraph:
import os
# Get job classes from the lookup table
job_classes = list(set(self.lookup.values()))
job_classes = sorted(set(self.lookup.values()), key=lambda cls: cls.__name__)
# Format deps for BUILD.bazel
if deps:
@ -172,6 +172,15 @@ databuild_graph(
lookup = ":{name}_job_lookup",
visibility = ["//visibility:public"],
)
# Create tar archive of generated files for testing
genrule(
name = "existing_generated",
srcs = glob(["*.py", "BUILD.bazel"]),
outs = ["existing_generated.tar"],
cmd = "mkdir -p temp && cp $(SRCS) temp/ && find temp -exec touch -t 197001010000 {{}} + && tar -cf $@ -C temp .",
visibility = ["//visibility:public"],
)
'''
with open(os.path.join(output_dir, "BUILD.bazel"), "w") as f:

View file

@ -1,9 +1,15 @@
py_library(
name = "job_src",
srcs = glob(["**/*.py"]),
srcs = glob(["**/*.py"], exclude=["e2e_test_common.py"]),
visibility = ["//visibility:public"],
deps = [
"//databuild:py_proto",
"//databuild/dsl/python:dsl",
],
)
py_library(
name = "e2e_test_common",
srcs = ["e2e_test_common.py"],
visibility = ["//visibility:public"],
)

View file

@ -65,6 +65,14 @@ py_test(
],
)
py_test(
name = "test_e2e",
srcs = ["test_e2e.py"],
data = [":bazel_graph.build"],
main = "test_e2e.py",
deps = ["//databuild/test/app:e2e_test_common"],
)
# Bazel-defined
## Graph
databuild_graph(

View file

@ -4,7 +4,7 @@ from collections import defaultdict
import sys
import json
LABEL_BASE = "//databuild/test/app"
LABEL_BASE = "//databuild/test/app/bazel"
def lookup(raw_ref: str):

View file

@ -0,0 +1,37 @@
#!/usr/bin/env python3
"""
End-to-end test for the bazel-defined test app.
Tests the full pipeline: build execution -> output verification -> JSON validation.
"""
import os
from databuild.test.app.e2e_test_common import DataBuildE2ETestBase
class BazelE2ETest(DataBuildE2ETestBase):
"""End-to-end test for the bazel-defined test app."""
def test_end_to_end_execution(self):
"""Test full end-to-end execution of the bazel graph."""
# Build possible paths for the bazel graph build binary
possible_paths = self.get_standard_runfiles_paths(
'databuild/test/app/bazel/bazel_graph.build'
)
# Add fallback paths for local testing
possible_paths.extend([
'bazel-bin/databuild/test/app/bazel/bazel_graph.build',
'./bazel_graph.build'
])
# Find the graph build binary
graph_build_path = self.find_graph_build_binary(possible_paths)
# Execute and verify the graph build
self.execute_and_verify_graph_build(graph_build_path)
if __name__ == '__main__':
import unittest
unittest.main()

View file

@ -22,3 +22,33 @@ databuild_dsl_generator(
deps = [":dsl_src"],
visibility = ["//visibility:public"],
)
# Generate fresh DSL output for comparison testing
genrule(
name = "generate_fresh_dsl",
outs = ["generated_fresh.tar"],
cmd_bash = """
# Create temporary directory for generation
mkdir -p temp_workspace/databuild/test/app/dsl
# Set environment to generate to temp directory
export BUILD_WORKSPACE_DIRECTORY="temp_workspace"
# Run the generator
$(location :graph.generate)
# Create tar archive of generated files
if [ -d "temp_workspace/databuild/test/app/dsl/generated" ]; then
find temp_workspace/databuild/test/app/dsl/generated -exec touch -t 197001010000 {} +
tar -cf $@ -C temp_workspace/databuild/test/app/dsl/generated .
else
# Create empty tar if no files generated
tar -cf $@ -T /dev/null
fi
# Clean up
rm -rf temp_workspace
""",
tools = [":graph.generate"],
visibility = ["//visibility:public"],
)

View file

@ -0,0 +1,9 @@
We can't write a direct `bazel test` for the DSL-generated graph, because:
1. Bazel doesn't allow you to `bazel run graph.generate` to generate a BUILD.bazel that will be used in the same build.
2. We don't want to leak test generation into the graph generation code (since tests here are app-specific).
Instead, we use a two-phase process: we rely on the graph having already been generated here, and the generated package contains a test, so that `bazel test //...` covers the generated source as well. This implies that the generated source is checked in to git (gasp, I know), and we need a mechanism to ensure it stays up to date. To achieve this, we create a test that asserts that the contents of the `generated` dir are exactly the same as the output of a fresh run of `graph.generate`.
Our task is to implement this test that asserts equality between the two, e.g. the target could depend on `graph.generate`, run it during the test, and md5 the results, comparing them to the md5 of the existing generated dir.
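As a rough illustration of that equality check (the full test appears later in this diff), a sketch assuming both directories have been packed into deterministic tar archives by the genrules:
```
# Sketch of the consistency check; assumes both trees were packed into
# byte-stable tars (fixed mtimes, same entry order) so hashing the archives is meaningful.
import hashlib
from pathlib import Path

def tar_md5(tar_path: Path) -> str:
    return hashlib.md5(tar_path.read_bytes()).hexdigest()

def assert_generated_up_to_date(existing_tar: Path, fresh_tar: Path) -> None:
    if tar_md5(existing_tar) != tar_md5(fresh_tar):
        raise AssertionError(
            "Generated DSL code is out of date; "
            "re-run `bazel run //databuild/test/app/dsl:graph.generate`"
        )
```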

View file

@ -0,0 +1,71 @@
load("@databuild//databuild:rules.bzl", "databuild_job", "databuild_graph")
# Generated by DataBuild DSL - do not edit manually
# This file is generated in a subdirectory to avoid overwriting the original BUILD.bazel
py_binary(
name = "aggregate_color_votes_binary",
srcs = ["aggregate_color_votes.py"],
main = "aggregate_color_votes.py",
deps = ["@@//databuild/test/app/dsl:dsl_src"],
)
databuild_job(
name = "aggregate_color_votes",
binary = ":aggregate_color_votes_binary",
)
py_binary(
name = "color_vote_report_calc_binary",
srcs = ["color_vote_report_calc.py"],
main = "color_vote_report_calc.py",
deps = ["@@//databuild/test/app/dsl:dsl_src"],
)
databuild_job(
name = "color_vote_report_calc",
binary = ":color_vote_report_calc_binary",
)
py_binary(
name = "ingest_color_votes_binary",
srcs = ["ingest_color_votes.py"],
main = "ingest_color_votes.py",
deps = ["@@//databuild/test/app/dsl:dsl_src"],
)
databuild_job(
name = "ingest_color_votes",
binary = ":ingest_color_votes_binary",
)
py_binary(
name = "trailing_color_votes_binary",
srcs = ["trailing_color_votes.py"],
main = "trailing_color_votes.py",
deps = ["@@//databuild/test/app/dsl:dsl_src"],
)
databuild_job(
name = "trailing_color_votes",
binary = ":trailing_color_votes_binary",
)
py_binary(
name = "dsl_job_lookup",
srcs = ["dsl_job_lookup.py"],
deps = ["@@//databuild/test/app/dsl:dsl_src"],
)
databuild_graph(
name = "dsl_graph",
jobs = ["aggregate_color_votes", "color_vote_report_calc", "ingest_color_votes", "trailing_color_votes"],
lookup = ":dsl_job_lookup",
visibility = ["//visibility:public"],
)
# Create tar archive of generated files for testing
genrule(
name = "existing_generated",
srcs = glob(["*.py", "BUILD.bazel"]),
outs = ["existing_generated.tar"],
cmd = "mkdir -p temp && cp $(SRCS) temp/ && find temp -exec touch -t 197001010000 {} + && tar -cf $@ -C temp .",
visibility = ["//visibility:public"],
)

View file

@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
Generated job script for AggregateColorVotes.
"""
import sys
import json
from databuild.test.app.dsl.graph import AggregateColorVotes
from databuild.proto import PartitionRef, JobConfigureResponse, to_dict
def parse_outputs_from_args(args: list[str]) -> list:
"""Parse partition output references from command line arguments."""
outputs = []
for arg in args:
# Find which output type can deserialize this partition reference
for output_type in AggregateColorVotes.output_types:
try:
partition = output_type.deserialize(arg)
outputs.append(partition)
break
except ValueError:
continue
else:
raise ValueError(f"No output type in AggregateColorVotes can deserialize partition ref: {arg}")
return outputs
if __name__ == "__main__":
if len(sys.argv) < 2:
raise Exception(f"Invalid command usage")
command = sys.argv[1]
job_instance = AggregateColorVotes()
if command == "config":
# Parse output partition references as PartitionRef objects (for Rust wrapper)
output_refs = [PartitionRef(str=raw_ref) for raw_ref in sys.argv[2:]]
# Also parse them into DSL partition objects (for DSL job.config())
outputs = parse_outputs_from_args(sys.argv[2:])
# Call job's config method - returns list[JobConfig]
configs = job_instance.config(outputs)
# Wrap in JobConfigureResponse and serialize using to_dict()
response = JobConfigureResponse(configs=configs)
print(json.dumps(to_dict(response)))
elif command == "exec":
# The exec method expects a JobConfig but the Rust wrapper passes args
# For now, let the DSL job handle the args directly
# TODO: This needs to be refined based on actual Rust wrapper interface
job_instance.exec(*sys.argv[2:])
else:
raise Exception(f"Invalid command `{sys.argv[1]}`")

View file

@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
Generated job script for ColorVoteReportCalc.
"""
import sys
import json
from databuild.test.app.dsl.graph import ColorVoteReportCalc
from databuild.proto import PartitionRef, JobConfigureResponse, to_dict
def parse_outputs_from_args(args: list[str]) -> list:
"""Parse partition output references from command line arguments."""
outputs = []
for arg in args:
# Find which output type can deserialize this partition reference
for output_type in ColorVoteReportCalc.output_types:
try:
partition = output_type.deserialize(arg)
outputs.append(partition)
break
except ValueError:
continue
else:
raise ValueError(f"No output type in ColorVoteReportCalc can deserialize partition ref: {arg}")
return outputs
if __name__ == "__main__":
if len(sys.argv) < 2:
raise Exception(f"Invalid command usage")
command = sys.argv[1]
job_instance = ColorVoteReportCalc()
if command == "config":
# Parse output partition references as PartitionRef objects (for Rust wrapper)
output_refs = [PartitionRef(str=raw_ref) for raw_ref in sys.argv[2:]]
# Also parse them into DSL partition objects (for DSL job.config())
outputs = parse_outputs_from_args(sys.argv[2:])
# Call job's config method - returns list[JobConfig]
configs = job_instance.config(outputs)
# Wrap in JobConfigureResponse and serialize using to_dict()
response = JobConfigureResponse(configs=configs)
print(json.dumps(to_dict(response)))
elif command == "exec":
# The exec method expects a JobConfig but the Rust wrapper passes args
# For now, let the DSL job handle the args directly
# TODO: This needs to be refined based on actual Rust wrapper interface
job_instance.exec(*sys.argv[2:])
else:
raise Exception(f"Invalid command `{sys.argv[1]}`")

View file

@ -0,0 +1,53 @@
#!/usr/bin/env python3
"""
Generated job lookup for DataBuild DSL graph.
Maps partition patterns to job targets.
"""
import sys
import re
import json
from collections import defaultdict
# Mapping from partition patterns to job targets
JOB_MAPPINGS = {
r"daily_color_votes/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": "//databuild/test/app/dsl/generated:ingest_color_votes",
r"color_votes_1m/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": "//databuild/test/app/dsl/generated:trailing_color_votes",
r"color_votes_1w/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": "//databuild/test/app/dsl/generated:trailing_color_votes",
r"daily_votes/(?P<data_date>\d{4}-\d{2}-\d{2})": "//databuild/test/app/dsl/generated:aggregate_color_votes",
r"votes_1w/(?P<data_date>\d{4}-\d{2}-\d{2})": "//databuild/test/app/dsl/generated:aggregate_color_votes",
r"votes_1m/(?P<data_date>\d{4}-\d{2}-\d{2})": "//databuild/test/app/dsl/generated:aggregate_color_votes",
r"color_vote_report/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": "//databuild/test/app/dsl/generated:color_vote_report_calc",
}
def lookup_job_for_partition(partition_ref: str) -> str:
"""Look up which job can build the given partition reference."""
for pattern, job_target in JOB_MAPPINGS.items():
if re.match(pattern, partition_ref):
return job_target
raise ValueError(f"No job found for partition: {partition_ref}")
def main():
if len(sys.argv) < 2:
print("Usage: job_lookup.py <partition_ref> [partition_ref...]", file=sys.stderr)
sys.exit(1)
results = defaultdict(list)
try:
for partition_ref in sys.argv[1:]:
job_target = lookup_job_for_partition(partition_ref)
results[job_target].append(partition_ref)
# Output the results as JSON (matching existing lookup format)
print(json.dumps(dict(results)))
except ValueError as e:
print(f"ERROR: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
Generated job script for IngestColorVotes.
"""
import sys
import json
from databuild.test.app.dsl.graph import IngestColorVotes
from databuild.proto import PartitionRef, JobConfigureResponse, to_dict
def parse_outputs_from_args(args: list[str]) -> list:
"""Parse partition output references from command line arguments."""
outputs = []
for arg in args:
# Find which output type can deserialize this partition reference
for output_type in IngestColorVotes.output_types:
try:
partition = output_type.deserialize(arg)
outputs.append(partition)
break
except ValueError:
continue
else:
raise ValueError(f"No output type in IngestColorVotes can deserialize partition ref: {arg}")
return outputs
if __name__ == "__main__":
if len(sys.argv) < 2:
raise Exception(f"Invalid command usage")
command = sys.argv[1]
job_instance = IngestColorVotes()
if command == "config":
# Parse output partition references as PartitionRef objects (for Rust wrapper)
output_refs = [PartitionRef(str=raw_ref) for raw_ref in sys.argv[2:]]
# Also parse them into DSL partition objects (for DSL job.config())
outputs = parse_outputs_from_args(sys.argv[2:])
# Call job's config method - returns list[JobConfig]
configs = job_instance.config(outputs)
# Wrap in JobConfigureResponse and serialize using to_dict()
response = JobConfigureResponse(configs=configs)
print(json.dumps(to_dict(response)))
elif command == "exec":
# The exec method expects a JobConfig but the Rust wrapper passes args
# For now, let the DSL job handle the args directly
# TODO: This needs to be refined based on actual Rust wrapper interface
job_instance.exec(*sys.argv[2:])
else:
raise Exception(f"Invalid command `{sys.argv[1]}`")

View file

@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
Generated job script for TrailingColorVotes.
"""
import sys
import json
from databuild.test.app.dsl.graph import TrailingColorVotes
from databuild.proto import PartitionRef, JobConfigureResponse, to_dict
def parse_outputs_from_args(args: list[str]) -> list:
"""Parse partition output references from command line arguments."""
outputs = []
for arg in args:
# Find which output type can deserialize this partition reference
for output_type in TrailingColorVotes.output_types:
try:
partition = output_type.deserialize(arg)
outputs.append(partition)
break
except ValueError:
continue
else:
raise ValueError(f"No output type in TrailingColorVotes can deserialize partition ref: {arg}")
return outputs
if __name__ == "__main__":
if len(sys.argv) < 2:
raise Exception(f"Invalid command usage")
command = sys.argv[1]
job_instance = TrailingColorVotes()
if command == "config":
# Parse output partition references as PartitionRef objects (for Rust wrapper)
output_refs = [PartitionRef(str=raw_ref) for raw_ref in sys.argv[2:]]
# Also parse them into DSL partition objects (for DSL job.config())
outputs = parse_outputs_from_args(sys.argv[2:])
# Call job's config method - returns list[JobConfig]
configs = job_instance.config(outputs)
# Wrap in JobConfigureResponse and serialize using to_dict()
response = JobConfigureResponse(configs=configs)
print(json.dumps(to_dict(response)))
elif command == "exec":
# The exec method expects a JobConfig but the Rust wrapper passes args
# For now, let the DSL job handle the args directly
# TODO: This needs to be refined based on actual Rust wrapper interface
job_instance.exec(*sys.argv[2:])
else:
raise Exception(f"Invalid command `{sys.argv[1]}`")

View file

@ -0,0 +1,7 @@
py_test(
name = "test_e2e",
srcs = ["test_e2e.py"],
data = ["//databuild/test/app/dsl/generated:dsl_graph.build"],
main = "test_e2e.py",
deps = ["//databuild/test/app:e2e_test_common"],
)

View file

@ -0,0 +1,37 @@
#!/usr/bin/env python3
"""
End-to-end test for the DSL-generated test app.
Tests the full pipeline: build execution -> output verification -> JSON validation.
"""
import os
from databuild.test.app.e2e_test_common import DataBuildE2ETestBase
class DSLGeneratedE2ETest(DataBuildE2ETestBase):
"""End-to-end test for the DSL-generated test app."""
def test_end_to_end_execution(self):
"""Test full end-to-end execution of the DSL-generated graph."""
# Build possible paths for the DSL-generated graph build binary
possible_paths = self.get_standard_runfiles_paths(
'databuild/test/app/dsl/generated/dsl_graph.build'
)
# Add fallback paths for local testing
possible_paths.extend([
'bazel-bin/databuild/test/app/dsl/generated/dsl_graph.build',
'./dsl_graph.build'
])
# Find the graph build binary
graph_build_path = self.find_graph_build_binary(possible_paths)
# Execute and verify the graph build
self.execute_and_verify_graph_build(graph_build_path)
if __name__ == '__main__':
import unittest
unittest.main()

View file

@ -73,3 +73,15 @@ py_test(
"//databuild/test/app/dsl:dsl_src",
],
)
# DSL generation consistency test
py_test(
name = "test_dsl_generation_consistency",
srcs = ["test_dsl_generation_consistency.py"],
main = "test_dsl_generation_consistency.py",
data = [
"//databuild/test/app/dsl:generate_fresh_dsl",
"//databuild/test/app/dsl/generated:existing_generated",
],
deps = [],
)

View file

@ -0,0 +1,105 @@
#!/usr/bin/env python3
"""
Test that verifies the generated DSL code is up-to-date.
This test ensures that the checked-in generated directory contents match
exactly what would be produced by a fresh run of graph.generate.
"""
import hashlib
import os
import subprocess
import tempfile
import unittest
from pathlib import Path
class TestDSLGenerationConsistency(unittest.TestCase):
def setUp(self):
# Find the test runfiles directory to locate tar files
runfiles_dir = os.environ.get("RUNFILES_DIR")
if runfiles_dir:
self.runfiles_root = Path(runfiles_dir) / "_main"
else:
# Fallback for development - not expected to work in this case
self.fail("RUNFILES_DIR not set - test must be run via bazel test")
def _compute_tar_hash(self, tar_path: Path) -> str:
"""Compute MD5 hash of a tar file's contents."""
if not tar_path.exists():
self.fail(f"Tar file not found: {tar_path}")
with open(tar_path, "rb") as f:
content = f.read()
return hashlib.md5(content).hexdigest()
def _extract_and_list_tar(self, tar_path: Path) -> set:
"""Extract tar file and return set of file paths and their content hashes."""
if not tar_path.exists():
return set()
result = subprocess.run([
"tar", "-tf", str(tar_path)
], capture_output=True, text=True)
if result.returncode != 0:
self.fail(f"Failed to list tar contents: {result.stderr}")
return set(result.stdout.strip().split('\n')) if result.stdout.strip() else set()
def test_generated_code_is_up_to_date(self):
"""Test that the existing generated tar matches the fresh generated tar."""
# Find the tar files from data dependencies
existing_tar = self.runfiles_root / "databuild/test/app/dsl/generated/existing_generated.tar"
fresh_tar = self.runfiles_root / "databuild/test/app/dsl/generated_fresh.tar"
# Compute hashes of both tar files
existing_hash = self._compute_tar_hash(existing_tar)
fresh_hash = self._compute_tar_hash(fresh_tar)
# Compare hashes
if existing_hash != fresh_hash:
# Provide detailed diff information
existing_files = self._extract_and_list_tar(existing_tar)
fresh_files = self._extract_and_list_tar(fresh_tar)
only_in_existing = existing_files - fresh_files
only_in_fresh = fresh_files - existing_files
error_msg = [
"Generated DSL code is out of date!",
f"Existing tar hash: {existing_hash}",
f"Fresh tar hash: {fresh_hash}",
"",
"To fix this, run:",
" bazel run //databuild/test/app/dsl:graph.generate",
""
]
if only_in_existing:
error_msg.extend([
"Files only in existing generated code:",
*[f" - {f}" for f in sorted(only_in_existing)],
""
])
if only_in_fresh:
error_msg.extend([
"Files only in fresh generated code:",
*[f" + {f}" for f in sorted(only_in_fresh)],
""
])
common_files = existing_files & fresh_files
if common_files:
error_msg.extend([
f"Common files: {len(common_files)}",
"This suggests files have different contents.",
])
self.fail("\n".join(error_msg))
if __name__ == "__main__":
unittest.main()

View file

@ -0,0 +1,103 @@
#!/usr/bin/env python3
"""
Common end-to-end test logic for DataBuild test apps.
Provides shared functionality for testing both bazel-defined and DSL-generated graphs.
"""
import json
import os
import shutil
import subprocess
import time
import unittest
from pathlib import Path
from typing import List, Optional
class DataBuildE2ETestBase(unittest.TestCase):
"""Base class for DataBuild end-to-end tests."""
def setUp(self):
"""Set up test environment."""
self.output_dir = Path("/tmp/data/color_votes_1w/2025-09-01/red")
self.output_file = self.output_dir / "data.json"
self.partition_ref = "color_votes_1w/2025-09-01/red"
# Clean up any existing test data
if self.output_dir.exists():
shutil.rmtree(self.output_dir)
def tearDown(self):
"""Clean up test environment."""
if self.output_dir.exists():
shutil.rmtree(self.output_dir)
def find_graph_build_binary(self, possible_paths: List[str]) -> str:
"""Find the graph.build binary from a list of possible paths."""
graph_build_path = None
for path in possible_paths:
if os.path.exists(path):
graph_build_path = path
break
self.assertIsNotNone(graph_build_path,
f"Graph build binary not found in any of: {possible_paths}")
return graph_build_path
def execute_and_verify_graph_build(self, graph_build_path: str) -> None:
"""Execute the graph build and verify the results."""
# Record start time for file modification check
start_time = time.time()
# Execute the graph build (shell script)
result = subprocess.run(
["bash", graph_build_path, self.partition_ref],
capture_output=True,
text=True
)
# Verify execution succeeded
self.assertEqual(result.returncode, 0,
f"Graph build failed with stderr: {result.stderr}")
# Verify output file was created
self.assertTrue(self.output_file.exists(),
f"Output file {self.output_file} was not created")
# Verify file was created recently (within 60 seconds)
file_mtime = os.path.getmtime(self.output_file)
time_diff = file_mtime - start_time
self.assertGreaterEqual(time_diff, -1, # Allow 1 second clock skew
f"File appears to be too old: {time_diff} seconds")
self.assertLessEqual(time_diff, 60,
f"File creation took too long: {time_diff} seconds")
# Verify file contains valid JSON
with open(self.output_file, 'r') as f:
content = f.read()
try:
data = json.loads(content)
except json.JSONDecodeError as e:
self.fail(f"Output file does not contain valid JSON: {e}")
# Basic sanity check on JSON structure
self.assertIsInstance(data, (dict, list),
"JSON should be an object or array")
def get_standard_runfiles_paths(self, relative_path: str) -> List[str]:
"""Get standard list of possible runfiles paths for a binary."""
runfiles_dir = os.environ.get("RUNFILES_DIR")
test_srcdir = os.environ.get("TEST_SRCDIR")
possible_paths = []
if runfiles_dir:
possible_paths.append(os.path.join(runfiles_dir, '_main', relative_path))
possible_paths.append(os.path.join(runfiles_dir, relative_path))
if test_srcdir:
possible_paths.append(os.path.join(test_srcdir, '_main', relative_path))
possible_paths.append(os.path.join(test_srcdir, relative_path))
return possible_paths