Compare commits: bel-refact...main
8 commits

| SHA1 |
|---|
| ad2cc7498b |
| 5c9c2a05cc |
| 8fd1c9b046 |
| dc622dd0ac |
| b3298e7213 |
| f92cfeb9b5 |
| 07d2a9faec |
| 952366ab66 |

23 changed files with 772 additions and 8 deletions
@@ -17,7 +17,9 @@ Graphs and jobs are defined in [bazel](https://bazel.build), allowing graphs (an
 - **Jobs** - Their `exec` entrypoint builds partitions from partitions, and their `config` entrypoint specifies what partitions are required to produce the requested partition(s), along with the specific config to run `exec` with to build said partitions.
 - **Graphs** - Composes jobs together to achieve multi-job orchestration, using a `lookup` mechanism to resolve a requested partition to the job that can build it. Together with its constituent jobs, Graphs can fully plan the build of any set of partitions. Most interactions with a DataBuild app happen with a graph.
 - **Build Event Log** - Encodes the state of the system, recording build requests, job activity, partition production, etc to enable running databuild as a deployed application.
+- **Wants** - Partition wants can be registered with DataBuild, causing it to build the wanted partitions as soon as its graph-external dependencies are met.
+- **Taints** - Taints mark a partition as invalid, indicating that readers should not use it, and that it should be rebuilt when requested or depended upon. If there is a still-active want for the tainted partition, it will be rebuilt immediately.
 - **Bazel Targets** - Bazel is a fast, extensible, and hermetic build system. DataBuild uses bazel targets to describe graphs and jobs, making graphs themselves deployable application. Implementing a DataBuild app is the process of integrating your data build jobs in `databuild_job` bazel targets, and connecting them with a `databuild_graph` target.
 - [**Graph Specification Strategies**](design/graph-specification.md) (coming soon) Application libraries in Python/Rust/Scala that use language features to enable ergonomic and succinct specification of jobs and graphs.

 ### Partition / Job Assumptions and Best Practices
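The `config`/`exec` split described in the **Jobs** bullet above is the contract every job binary implements. As a rough, hypothetical sketch (the `daily_votes`/`daily_color_votes` names, the partition-ref format, and the config fields are illustrative assumptions, not taken from this diff), a hand-written job entrypoint looks something like:

```python
#!/usr/bin/env python3
# Illustrative sketch of a DataBuild job entrypoint; names and config fields
# are assumptions for the example, not part of this diff.
import json
import sys


def config(output_refs: list[str]) -> list[dict]:
    """Given requested output partitions, declare required inputs and the args to run `exec` with."""
    configs = []
    for ref in output_refs:
        date = ref.split("/")[-1]  # e.g. "daily_votes/2025-09-01" -> "2025-09-01"
        configs.append({
            "outputs": [ref],
            "inputs": [f"daily_color_votes/{date}/red", f"daily_color_votes/{date}/blue"],
            "args": [date],
        })
    return configs


def build(args: list[str]) -> None:
    """Build the partition(s) for one config; this is what the `exec` entrypoint does."""
    date = args[0]
    print(f"building daily_votes/{date}")


if __name__ == "__main__":
    if len(sys.argv) < 2:
        raise SystemExit("usage: job.py config|exec <partition refs or args>")
    command, rest = sys.argv[1], sys.argv[2:]
    if command == "config":
        print(json.dumps({"configs": config(rest)}))
    elif command == "exec":
        build(rest)
    else:
        raise SystemExit(f"unknown command {command!r}")
```

The generated `*_binary` targets and `dsl_job_lookup.py` later in this diff follow the same `config`/`exec` dispatch, with the lookup script mapping partition refs to the `databuild_job` targets that can build them.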

@@ -65,7 +67,8 @@ You can also use cron-based triggers, which return partition refs that they want
 # Key Insights

 - Orchestration logic changes all the time - better to not write it at all.
-- Orchestration decisions and application logic is innately coupled
+- Orchestration decisions and application logic is innately coupled.
+- "systemd for data platforms"

 ## Assumptions

README.md (22)

@@ -1,5 +1,25 @@
-# DataBuild
+```
+██████╗ ████╗ ███████████╗ ████╗
+██╔═══██╗ ██╔██║ ╚═══██╔════╝ ██╔██║
+██╔╝ ██║ ██╔╝██║ ██╔╝ ██╔╝██║
+██╔╝ ██║ ██╔╝ ██║ ██╔╝ ██╔╝ ██║
+██╔╝ ██╔╝ ██╔╝ ██║ ██╔╝ ██╔╝ ██║
+██╔╝ ██╔═╝ █████████║ ██╔╝ █████████║
+████████╔═╝ ██╔═════██║ ██╔╝ ██╔═════██║
+╚═══════╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝
+
+██████╗ ██╗ ██╗ ██╗ ██╗ █████╗
+██╔═══██╗ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔══██╗
+██╔╝ ██║ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██║
+█████████╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██║
+██╔═══██╔═╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝
+██╔╝ ██║ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔═╝
+█████████╔╝ ██████╔═╝ ██╔╝ ████████╗ ███████╔═╝
+╚════════╝ ╚═════╝ ╚═╝ ╚═══════╝ ╚══════╝
+
+- -- S Y S T E M O N L I N E -- -
+```

 DataBuild is a trivially-deployable, partition-oriented, declarative data build system.

databuild/ascii_logo.txt (19 lines, new file)
@@ -0,0 +1,19 @@
██████╗ ████╗ ███████████╗ ████╗
██╔═══██╗ ██╔██║ ╚═══██╔════╝ ██╔██║
██╔╝ ██║ ██╔╝██║ ██╔╝ ██╔╝██║
██╔╝ ██║ ██╔╝ ██║ ██╔╝ ██╔╝ ██║
██╔╝ ██╔╝ ██╔╝ ██║ ██╔╝ ██╔╝ ██║
██╔╝ ██╔═╝ █████████║ ██╔╝ █████████║
████████╔═╝ ██╔═════██║ ██╔╝ ██╔═════██║
╚═══════╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝

██████╗ ██╗ ██╗ ██╗ ██╗ █████╗
██╔═══██╗ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔══██╗
██╔╝ ██║ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██║
█████████╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██║
██╔═══██╔═╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝
██╔╝ ██║ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔╝ ██╔═╝
█████████╔╝ ██████╔═╝ ██╔╝ ████████╗ ███████╔═╝
╚════════╝ ╚═════╝ ╚═╝ ╚═══════╝ ╚══════╝

- -- S Y S T E M O N L I N E -- -

@@ -462,6 +462,8 @@ export const BuildStatus: TypedComponent<BuildStatusAttrs> = {
       ...(build.completed_at ? [{stage: 'Build Completed', time: build.completed_at, icon: '✅'}] : []),
     ];

+    let startedAt = build.started_at || build.requested_at;
+
     return m('div.container.mx-auto.p-4', [
       // Build Header
       m('.build-header.mb-6', [

@@ -485,8 +487,8 @@ export const BuildStatus: TypedComponent<BuildStatusAttrs> = {
         ]),
         m('.stat.bg-base-100.shadow.rounded-lg.p-4', [
           m('.stat-title', 'Duration'),
-          m('.stat-value.text-2xl', (build.completed_at - build.started_at) ? formatDuration((build.completed_at - build.started_at)) : '—'),
-          m('.stat-desc', build.started_at ? formatDateTime(build.started_at) : 'Not started')
+          m('.stat-value.text-2xl', (build.completed_at - startedAt) ? formatDuration((build.completed_at - startedAt)) : '—'),
+          m('.stat-desc', startedAt ? formatDateTime(startedAt) : 'Not started')
         ])
       ])
     ]),

@@ -462,6 +462,7 @@ export function formatDateTime(epochNanos: number): string {

 export function formatDuration(durationNanos?: number | null): string {
   let durationMs = durationNanos ? durationNanos / 1000000 : null;
+  console.warn('Formatting duration:', durationMs);
   if (!durationMs || durationMs <= 0) {
     return '—';
   }

@@ -120,7 +120,7 @@ class DataBuildGraph:
         import os

         # Get job classes from the lookup table
-        job_classes = list(set(self.lookup.values()))
+        job_classes = sorted(set(self.lookup.values()), key=lambda cls: cls.__name__)

         # Format deps for BUILD.bazel
         if deps:

@@ -172,6 +172,15 @@ databuild_graph(
     lookup = ":{name}_job_lookup",
     visibility = ["//visibility:public"],
 )
+
+# Create tar archive of generated files for testing
+genrule(
+    name = "existing_generated",
+    srcs = glob(["*.py", "BUILD.bazel"]),
+    outs = ["existing_generated.tar"],
+    cmd = "mkdir -p temp && cp $(SRCS) temp/ && find temp -exec touch -t 197001010000 {{}} + && tar -cf $@ -C temp .",
+    visibility = ["//visibility:public"],
+)
 '''

         with open(os.path.join(output_dir, "BUILD.bazel"), "w") as f:

@@ -1,9 +1,15 @@
 py_library(
     name = "job_src",
-    srcs = glob(["**/*.py"]),
+    srcs = glob(["**/*.py"], exclude=["e2e_test_common.py"]),
     visibility = ["//visibility:public"],
     deps = [
         "//databuild:py_proto",
         "//databuild/dsl/python:dsl",
     ],
 )
+
+py_library(
+    name = "e2e_test_common",
+    srcs = ["e2e_test_common.py"],
+    visibility = ["//visibility:public"],
+)

@@ -65,6 +65,14 @@ py_test(
     ],
 )

+py_test(
+    name = "test_e2e",
+    srcs = ["test_e2e.py"],
+    data = [":bazel_graph.build"],
+    main = "test_e2e.py",
+    deps = ["//databuild/test/app:e2e_test_common"],
+)
+
 # Bazel-defined
 ## Graph
 databuild_graph(

@@ -4,7 +4,7 @@ from collections import defaultdict
 import sys
 import json

-LABEL_BASE = "//databuild/test/app"
+LABEL_BASE = "//databuild/test/app/bazel"


 def lookup(raw_ref: str):

databuild/test/app/bazel/test_e2e.py (37 lines, new file)
@@ -0,0 +1,37 @@
#!/usr/bin/env python3
"""
End-to-end test for the bazel-defined test app.

Tests the full pipeline: build execution -> output verification -> JSON validation.
"""

import os
from databuild.test.app.e2e_test_common import DataBuildE2ETestBase


class BazelE2ETest(DataBuildE2ETestBase):
    """End-to-end test for the bazel-defined test app."""

    def test_end_to_end_execution(self):
        """Test full end-to-end execution of the bazel graph."""
        # Build possible paths for the bazel graph build binary
        possible_paths = self.get_standard_runfiles_paths(
            'databuild/test/app/bazel/bazel_graph.build'
        )

        # Add fallback paths for local testing
        possible_paths.extend([
            'bazel-bin/databuild/test/app/bazel/bazel_graph.build',
            './bazel_graph.build'
        ])

        # Find the graph build binary
        graph_build_path = self.find_graph_build_binary(possible_paths)

        # Execute and verify the graph build
        self.execute_and_verify_graph_build(graph_build_path)


if __name__ == '__main__':
    import unittest
    unittest.main()

@@ -22,3 +22,33 @@ databuild_dsl_generator(
     deps = [":dsl_src"],
     visibility = ["//visibility:public"],
 )
+
+# Generate fresh DSL output for comparison testing
+genrule(
+    name = "generate_fresh_dsl",
+    outs = ["generated_fresh.tar"],
+    cmd_bash = """
+        # Create temporary directory for generation
+        mkdir -p temp_workspace/databuild/test/app/dsl
+
+        # Set environment to generate to temp directory
+        export BUILD_WORKSPACE_DIRECTORY="temp_workspace"
+
+        # Run the generator
+        $(location :graph.generate)
+
+        # Create tar archive of generated files
+        if [ -d "temp_workspace/databuild/test/app/dsl/generated" ]; then
+            find temp_workspace/databuild/test/app/dsl/generated -exec touch -t 197001010000 {} +
+            tar -cf $@ -C temp_workspace/databuild/test/app/dsl/generated .
+        else
+            # Create empty tar if no files generated
+            tar -cf $@ -T /dev/null
+        fi
+
+        # Clean up
+        rm -rf temp_workspace
+    """,
+    tools = [":graph.generate"],
+    visibility = ["//visibility:public"],
+)

databuild/test/app/dsl/claude-generated-dsl-test.md (9 lines, new file)
@@ -0,0 +1,9 @@

We can't write a direct `bazel test` for the DSL generated graph, because:

1. Bazel doesn't allow you to `bazel run graph.generate` to generate a BUILD.bazel that will be used in the same build.
2. We don't want to leak test generation into the graph generation code (since tests here are app specific)

Instead, we need to use a two phase process, where we rely on the graph to already be generated here, which will contain a test, such that `bazel test //...` will give us recall over generated source as well. This implies that this generated source is going to be checked in to git (gasp, I know), and we need a mechanism to ensure it stays up to date. To achieve this, we'll create a test that asserts that the contents of the `generated` dir is the exact same as the output of a new run of `graph.generate`.

Our task is to implement this test that asserts equality between the two, e.g. the target could depend on `graph.generate`, and in the test run it and md5 the results, comparing it to the md5 of the existing generated dir.
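The note above describes the check in prose; later in this diff it is implemented by building two deterministic tar archives (`existing_generated.tar` and `generated_fresh.tar`) and comparing their md5 digests. A minimal sketch of that core comparison, using plain relative paths for illustration (the real test resolves them under `RUNFILES_DIR`):

```python
import hashlib
from pathlib import Path


def md5_of(path: Path) -> str:
    # Both tars are built with mtimes pinned to 1970-01-01, so a byte-level md5 is stable.
    return hashlib.md5(path.read_bytes()).hexdigest()


# Illustrative relative paths; the checked-in tar vs. the freshly generated one.
existing = Path("databuild/test/app/dsl/generated/existing_generated.tar")
fresh = Path("databuild/test/app/dsl/generated_fresh.tar")

if md5_of(existing) != md5_of(fresh):
    raise SystemExit("Generated DSL code is out of date; re-run graph.generate")
```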

databuild/test/app/dsl/generated/BUILD.bazel (71 lines, new file)
@@ -0,0 +1,71 @@
load("@databuild//databuild:rules.bzl", "databuild_job", "databuild_graph")

# Generated by DataBuild DSL - do not edit manually
# This file is generated in a subdirectory to avoid overwriting the original BUILD.bazel

py_binary(
    name = "aggregate_color_votes_binary",
    srcs = ["aggregate_color_votes.py"],
    main = "aggregate_color_votes.py",
    deps = ["@@//databuild/test/app/dsl:dsl_src"],
)

databuild_job(
    name = "aggregate_color_votes",
    binary = ":aggregate_color_votes_binary",
)

py_binary(
    name = "color_vote_report_calc_binary",
    srcs = ["color_vote_report_calc.py"],
    main = "color_vote_report_calc.py",
    deps = ["@@//databuild/test/app/dsl:dsl_src"],
)

databuild_job(
    name = "color_vote_report_calc",
    binary = ":color_vote_report_calc_binary",
)

py_binary(
    name = "ingest_color_votes_binary",
    srcs = ["ingest_color_votes.py"],
    main = "ingest_color_votes.py",
    deps = ["@@//databuild/test/app/dsl:dsl_src"],
)

databuild_job(
    name = "ingest_color_votes",
    binary = ":ingest_color_votes_binary",
)

py_binary(
    name = "trailing_color_votes_binary",
    srcs = ["trailing_color_votes.py"],
    main = "trailing_color_votes.py",
    deps = ["@@//databuild/test/app/dsl:dsl_src"],
)

databuild_job(
    name = "trailing_color_votes",
    binary = ":trailing_color_votes_binary",
)

py_binary(
    name = "dsl_job_lookup",
    srcs = ["dsl_job_lookup.py"],
    deps = ["@@//databuild/test/app/dsl:dsl_src"],
)

databuild_graph(
    name = "dsl_graph",
    jobs = ["aggregate_color_votes", "color_vote_report_calc", "ingest_color_votes", "trailing_color_votes"],
    lookup = ":dsl_job_lookup",
    visibility = ["//visibility:public"],
)

# Create tar archive of generated files for testing
genrule(
    name = "existing_generated",
    srcs = glob(["*.py", "BUILD.bazel"]),
    outs = ["existing_generated.tar"],
    cmd = "mkdir -p temp && cp $(SRCS) temp/ && find temp -exec touch -t 197001010000 {} + && tar -cf $@ -C temp .",
    visibility = ["//visibility:public"],
)

databuild/test/app/dsl/generated/aggregate_color_votes.py (58 lines, new executable file)
@@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
Generated job script for AggregateColorVotes.
"""

import sys
import json
from databuild.test.app.dsl.graph import AggregateColorVotes
from databuild.proto import PartitionRef, JobConfigureResponse, to_dict


def parse_outputs_from_args(args: list[str]) -> list:
    """Parse partition output references from command line arguments."""
    outputs = []
    for arg in args:
        # Find which output type can deserialize this partition reference
        for output_type in AggregateColorVotes.output_types:
            try:
                partition = output_type.deserialize(arg)
                outputs.append(partition)
                break
            except ValueError:
                continue
        else:
            raise ValueError(f"No output type in AggregateColorVotes can deserialize partition ref: {arg}")

    return outputs


if __name__ == "__main__":
    if len(sys.argv) < 2:
        raise Exception(f"Invalid command usage")

    command = sys.argv[1]
    job_instance = AggregateColorVotes()

    if command == "config":
        # Parse output partition references as PartitionRef objects (for Rust wrapper)
        output_refs = [PartitionRef(str=raw_ref) for raw_ref in sys.argv[2:]]

        # Also parse them into DSL partition objects (for DSL job.config())
        outputs = parse_outputs_from_args(sys.argv[2:])

        # Call job's config method - returns list[JobConfig]
        configs = job_instance.config(outputs)

        # Wrap in JobConfigureResponse and serialize using to_dict()
        response = JobConfigureResponse(configs=configs)
        print(json.dumps(to_dict(response)))

    elif command == "exec":
        # The exec method expects a JobConfig but the Rust wrapper passes args
        # For now, let the DSL job handle the args directly
        # TODO: This needs to be refined based on actual Rust wrapper interface
        job_instance.exec(*sys.argv[2:])

    else:
        raise Exception(f"Invalid command `{sys.argv[1]}`")

databuild/test/app/dsl/generated/color_vote_report_calc.py (58 lines, new executable file)
@@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
Generated job script for ColorVoteReportCalc.
"""

import sys
import json
from databuild.test.app.dsl.graph import ColorVoteReportCalc
from databuild.proto import PartitionRef, JobConfigureResponse, to_dict


def parse_outputs_from_args(args: list[str]) -> list:
    """Parse partition output references from command line arguments."""
    outputs = []
    for arg in args:
        # Find which output type can deserialize this partition reference
        for output_type in ColorVoteReportCalc.output_types:
            try:
                partition = output_type.deserialize(arg)
                outputs.append(partition)
                break
            except ValueError:
                continue
        else:
            raise ValueError(f"No output type in ColorVoteReportCalc can deserialize partition ref: {arg}")

    return outputs


if __name__ == "__main__":
    if len(sys.argv) < 2:
        raise Exception(f"Invalid command usage")

    command = sys.argv[1]
    job_instance = ColorVoteReportCalc()

    if command == "config":
        # Parse output partition references as PartitionRef objects (for Rust wrapper)
        output_refs = [PartitionRef(str=raw_ref) for raw_ref in sys.argv[2:]]

        # Also parse them into DSL partition objects (for DSL job.config())
        outputs = parse_outputs_from_args(sys.argv[2:])

        # Call job's config method - returns list[JobConfig]
        configs = job_instance.config(outputs)

        # Wrap in JobConfigureResponse and serialize using to_dict()
        response = JobConfigureResponse(configs=configs)
        print(json.dumps(to_dict(response)))

    elif command == "exec":
        # The exec method expects a JobConfig but the Rust wrapper passes args
        # For now, let the DSL job handle the args directly
        # TODO: This needs to be refined based on actual Rust wrapper interface
        job_instance.exec(*sys.argv[2:])

    else:
        raise Exception(f"Invalid command `{sys.argv[1]}`")

databuild/test/app/dsl/generated/dsl_job_lookup.py (53 lines, new executable file)
@@ -0,0 +1,53 @@
#!/usr/bin/env python3
"""
Generated job lookup for DataBuild DSL graph.
Maps partition patterns to job targets.
"""

import sys
import re
import json
from collections import defaultdict


# Mapping from partition patterns to job targets
JOB_MAPPINGS = {
    r"daily_color_votes/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": "//databuild/test/app/dsl/generated:ingest_color_votes",
    r"color_votes_1m/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": "//databuild/test/app/dsl/generated:trailing_color_votes",
    r"color_votes_1w/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": "//databuild/test/app/dsl/generated:trailing_color_votes",
    r"daily_votes/(?P<data_date>\d{4}-\d{2}-\d{2})": "//databuild/test/app/dsl/generated:aggregate_color_votes",
    r"votes_1w/(?P<data_date>\d{4}-\d{2}-\d{2})": "//databuild/test/app/dsl/generated:aggregate_color_votes",
    r"votes_1m/(?P<data_date>\d{4}-\d{2}-\d{2})": "//databuild/test/app/dsl/generated:aggregate_color_votes",
    r"color_vote_report/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)": "//databuild/test/app/dsl/generated:color_vote_report_calc",
}


def lookup_job_for_partition(partition_ref: str) -> str:
    """Look up which job can build the given partition reference."""
    for pattern, job_target in JOB_MAPPINGS.items():
        if re.match(pattern, partition_ref):
            return job_target

    raise ValueError(f"No job found for partition: {partition_ref}")


def main():
    if len(sys.argv) < 2:
        print("Usage: job_lookup.py <partition_ref> [partition_ref...]", file=sys.stderr)
        sys.exit(1)

    results = defaultdict(list)
    try:
        for partition_ref in sys.argv[1:]:
            job_target = lookup_job_for_partition(partition_ref)
            results[job_target].append(partition_ref)

        # Output the results as JSON (matching existing lookup format)
        print(json.dumps(dict(results)))
    except ValueError as e:
        print(f"ERROR: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()

databuild/test/app/dsl/generated/ingest_color_votes.py (58 lines, new executable file)
@@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
Generated job script for IngestColorVotes.
"""

import sys
import json
from databuild.test.app.dsl.graph import IngestColorVotes
from databuild.proto import PartitionRef, JobConfigureResponse, to_dict


def parse_outputs_from_args(args: list[str]) -> list:
    """Parse partition output references from command line arguments."""
    outputs = []
    for arg in args:
        # Find which output type can deserialize this partition reference
        for output_type in IngestColorVotes.output_types:
            try:
                partition = output_type.deserialize(arg)
                outputs.append(partition)
                break
            except ValueError:
                continue
        else:
            raise ValueError(f"No output type in IngestColorVotes can deserialize partition ref: {arg}")

    return outputs


if __name__ == "__main__":
    if len(sys.argv) < 2:
        raise Exception(f"Invalid command usage")

    command = sys.argv[1]
    job_instance = IngestColorVotes()

    if command == "config":
        # Parse output partition references as PartitionRef objects (for Rust wrapper)
        output_refs = [PartitionRef(str=raw_ref) for raw_ref in sys.argv[2:]]

        # Also parse them into DSL partition objects (for DSL job.config())
        outputs = parse_outputs_from_args(sys.argv[2:])

        # Call job's config method - returns list[JobConfig]
        configs = job_instance.config(outputs)

        # Wrap in JobConfigureResponse and serialize using to_dict()
        response = JobConfigureResponse(configs=configs)
        print(json.dumps(to_dict(response)))

    elif command == "exec":
        # The exec method expects a JobConfig but the Rust wrapper passes args
        # For now, let the DSL job handle the args directly
        # TODO: This needs to be refined based on actual Rust wrapper interface
        job_instance.exec(*sys.argv[2:])

    else:
        raise Exception(f"Invalid command `{sys.argv[1]}`")

databuild/test/app/dsl/generated/trailing_color_votes.py (58 lines, new executable file)
@@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
Generated job script for TrailingColorVotes.
"""

import sys
import json
from databuild.test.app.dsl.graph import TrailingColorVotes
from databuild.proto import PartitionRef, JobConfigureResponse, to_dict


def parse_outputs_from_args(args: list[str]) -> list:
    """Parse partition output references from command line arguments."""
    outputs = []
    for arg in args:
        # Find which output type can deserialize this partition reference
        for output_type in TrailingColorVotes.output_types:
            try:
                partition = output_type.deserialize(arg)
                outputs.append(partition)
                break
            except ValueError:
                continue
        else:
            raise ValueError(f"No output type in TrailingColorVotes can deserialize partition ref: {arg}")

    return outputs


if __name__ == "__main__":
    if len(sys.argv) < 2:
        raise Exception(f"Invalid command usage")

    command = sys.argv[1]
    job_instance = TrailingColorVotes()

    if command == "config":
        # Parse output partition references as PartitionRef objects (for Rust wrapper)
        output_refs = [PartitionRef(str=raw_ref) for raw_ref in sys.argv[2:]]

        # Also parse them into DSL partition objects (for DSL job.config())
        outputs = parse_outputs_from_args(sys.argv[2:])

        # Call job's config method - returns list[JobConfig]
        configs = job_instance.config(outputs)

        # Wrap in JobConfigureResponse and serialize using to_dict()
        response = JobConfigureResponse(configs=configs)
        print(json.dumps(to_dict(response)))

    elif command == "exec":
        # The exec method expects a JobConfig but the Rust wrapper passes args
        # For now, let the DSL job handle the args directly
        # TODO: This needs to be refined based on actual Rust wrapper interface
        job_instance.exec(*sys.argv[2:])

    else:
        raise Exception(f"Invalid command `{sys.argv[1]}`")

databuild/test/app/dsl/generated_test/BUILD.bazel (7 lines, new file)
@@ -0,0 +1,7 @@
py_test(
    name = "test_e2e",
    srcs = ["test_e2e.py"],
    data = ["//databuild/test/app/dsl/generated:dsl_graph.build"],
    main = "test_e2e.py",
    deps = ["//databuild/test/app:e2e_test_common"],
)

databuild/test/app/dsl/generated_test/test_e2e.py (37 lines, new file)
@@ -0,0 +1,37 @@
#!/usr/bin/env python3
"""
End-to-end test for the DSL-generated test app.

Tests the full pipeline: build execution -> output verification -> JSON validation.
"""

import os
from databuild.test.app.e2e_test_common import DataBuildE2ETestBase


class DSLGeneratedE2ETest(DataBuildE2ETestBase):
    """End-to-end test for the DSL-generated test app."""

    def test_end_to_end_execution(self):
        """Test full end-to-end execution of the DSL-generated graph."""
        # Build possible paths for the DSL-generated graph build binary
        possible_paths = self.get_standard_runfiles_paths(
            'databuild/test/app/dsl/generated/dsl_graph.build'
        )

        # Add fallback paths for local testing
        possible_paths.extend([
            'bazel-bin/databuild/test/app/dsl/generated/dsl_graph.build',
            './dsl_graph.build'
        ])

        # Find the graph build binary
        graph_build_path = self.find_graph_build_binary(possible_paths)

        # Execute and verify the graph build
        self.execute_and_verify_graph_build(graph_build_path)


if __name__ == '__main__':
    import unittest
    unittest.main()

@@ -73,3 +73,15 @@ py_test(
         "//databuild/test/app/dsl:dsl_src",
     ],
 )
+
+# DSL generation consistency test
+py_test(
+    name = "test_dsl_generation_consistency",
+    srcs = ["test_dsl_generation_consistency.py"],
+    main = "test_dsl_generation_consistency.py",
+    data = [
+        "//databuild/test/app/dsl:generate_fresh_dsl",
+        "//databuild/test/app/dsl/generated:existing_generated",
+    ],
+    deps = [],
+)

databuild/test/app/dsl/test/test_dsl_generation_consistency.py (105 lines, new file)
@@ -0,0 +1,105 @@
#!/usr/bin/env python3
"""
Test that verifies the generated DSL code is up-to-date.

This test ensures that the checked-in generated directory contents match
exactly what would be produced by a fresh run of graph.generate.
"""

import hashlib
import os
import subprocess
import tempfile
import unittest
from pathlib import Path


class TestDSLGenerationConsistency(unittest.TestCase):
    def setUp(self):
        # Find the test runfiles directory to locate tar files
        runfiles_dir = os.environ.get("RUNFILES_DIR")
        if runfiles_dir:
            self.runfiles_root = Path(runfiles_dir) / "_main"
        else:
            # Fallback for development - not expected to work in this case
            self.fail("RUNFILES_DIR not set - test must be run via bazel test")

    def _compute_tar_hash(self, tar_path: Path) -> str:
        """Compute MD5 hash of a tar file's contents."""
        if not tar_path.exists():
            self.fail(f"Tar file not found: {tar_path}")

        with open(tar_path, "rb") as f:
            content = f.read()
        return hashlib.md5(content).hexdigest()

    def _extract_and_list_tar(self, tar_path: Path) -> set:
        """Extract tar file and return set of file paths and their content hashes."""
        if not tar_path.exists():
            return set()

        result = subprocess.run([
            "tar", "-tf", str(tar_path)
        ], capture_output=True, text=True)

        if result.returncode != 0:
            self.fail(f"Failed to list tar contents: {result.stderr}")

        return set(result.stdout.strip().split('\n')) if result.stdout.strip() else set()

    def test_generated_code_is_up_to_date(self):
        """Test that the existing generated tar matches the fresh generated tar."""

        # Find the tar files from data dependencies
        existing_tar = self.runfiles_root / "databuild/test/app/dsl/generated/existing_generated.tar"
        fresh_tar = self.runfiles_root / "databuild/test/app/dsl/generated_fresh.tar"

        # Compute hashes of both tar files
        existing_hash = self._compute_tar_hash(existing_tar)
        fresh_hash = self._compute_tar_hash(fresh_tar)

        # Compare hashes
        if existing_hash != fresh_hash:
            # Provide detailed diff information
            existing_files = self._extract_and_list_tar(existing_tar)
            fresh_files = self._extract_and_list_tar(fresh_tar)

            only_in_existing = existing_files - fresh_files
            only_in_fresh = fresh_files - existing_files

            error_msg = [
                "Generated DSL code is out of date!",
                f"Existing tar hash: {existing_hash}",
                f"Fresh tar hash: {fresh_hash}",
                "",
                "To fix this, run:",
                "  bazel run //databuild/test/app/dsl:graph.generate",
                ""
            ]

            if only_in_existing:
                error_msg.extend([
                    "Files only in existing generated code:",
                    *[f"  - {f}" for f in sorted(only_in_existing)],
                    ""
                ])

            if only_in_fresh:
                error_msg.extend([
                    "Files only in fresh generated code:",
                    *[f"  + {f}" for f in sorted(only_in_fresh)],
                    ""
                ])

            common_files = existing_files & fresh_files
            if common_files:
                error_msg.extend([
                    f"Common files: {len(common_files)}",
                    "This suggests files have different contents.",
                ])

            self.fail("\n".join(error_msg))


if __name__ == "__main__":
    unittest.main()

databuild/test/app/e2e_test_common.py (103 lines, new file)
@@ -0,0 +1,103 @@
#!/usr/bin/env python3
"""
Common end-to-end test logic for DataBuild test apps.

Provides shared functionality for testing both bazel-defined and DSL-generated graphs.
"""

import json
import os
import shutil
import subprocess
import time
import unittest
from pathlib import Path
from typing import List, Optional


class DataBuildE2ETestBase(unittest.TestCase):
    """Base class for DataBuild end-to-end tests."""

    def setUp(self):
        """Set up test environment."""
        self.output_dir = Path("/tmp/data/color_votes_1w/2025-09-01/red")
        self.output_file = self.output_dir / "data.json"
        self.partition_ref = "color_votes_1w/2025-09-01/red"

        # Clean up any existing test data
        if self.output_dir.exists():
            shutil.rmtree(self.output_dir)

    def tearDown(self):
        """Clean up test environment."""
        if self.output_dir.exists():
            shutil.rmtree(self.output_dir)

    def find_graph_build_binary(self, possible_paths: List[str]) -> str:
        """Find the graph.build binary from a list of possible paths."""
        graph_build_path = None
        for path in possible_paths:
            if os.path.exists(path):
                graph_build_path = path
                break

        self.assertIsNotNone(graph_build_path,
                             f"Graph build binary not found in any of: {possible_paths}")
        return graph_build_path

    def execute_and_verify_graph_build(self, graph_build_path: str) -> None:
        """Execute the graph build and verify the results."""
        # Record start time for file modification check
        start_time = time.time()

        # Execute the graph build (shell script)
        result = subprocess.run(
            ["bash", graph_build_path, self.partition_ref],
            capture_output=True,
            text=True
        )

        # Verify execution succeeded
        self.assertEqual(result.returncode, 0,
                         f"Graph build failed with stderr: {result.stderr}")

        # Verify output file was created
        self.assertTrue(self.output_file.exists(),
                        f"Output file {self.output_file} was not created")

        # Verify file was created recently (within 60 seconds)
        file_mtime = os.path.getmtime(self.output_file)
        time_diff = file_mtime - start_time
        self.assertGreaterEqual(time_diff, -1,  # Allow 1 second clock skew
                                f"File appears to be too old: {time_diff} seconds")
        self.assertLessEqual(time_diff, 60,
                             f"File creation took too long: {time_diff} seconds")

        # Verify file contains valid JSON
        with open(self.output_file, 'r') as f:
            content = f.read()

        try:
            data = json.loads(content)
        except json.JSONDecodeError as e:
            self.fail(f"Output file does not contain valid JSON: {e}")

        # Basic sanity check on JSON structure
        self.assertIsInstance(data, (dict, list),
                              "JSON should be an object or array")

    def get_standard_runfiles_paths(self, relative_path: str) -> List[str]:
        """Get standard list of possible runfiles paths for a binary."""
        runfiles_dir = os.environ.get("RUNFILES_DIR")
        test_srcdir = os.environ.get("TEST_SRCDIR")

        possible_paths = []
        if runfiles_dir:
            possible_paths.append(os.path.join(runfiles_dir, '_main', relative_path))
            possible_paths.append(os.path.join(runfiles_dir, relative_path))

        if test_srcdir:
            possible_paths.append(os.path.join(test_srcdir, '_main', relative_path))
            possible_paths.append(os.path.join(test_srcdir, relative_path))

        return possible_paths