Compare commits

...

6 commits

Author SHA1 Message Date
40d42e03dd Add plan for dsl graph generation 2025-08-01 20:17:56 -07:00
2ad4ae6d3c Add tests for dsl test app 2025-07-31 23:55:58 -07:00
ae5147cb36 Implement test app in python DSL 2025-07-31 22:42:07 -07:00
82e1d0eb26 Big bump 2025-07-31 19:59:13 -07:00
6d55d54267 Implement remaining test app jobs 2025-07-30 22:53:52 -07:00
63f9518486 Add graph and trailing job 2025-07-30 22:26:32 -07:00
51 changed files with 2490 additions and 2922 deletions

View file

@ -209,11 +209,29 @@ python.toolchain(
pip = use_extension("@rules_python//python/extensions:pip.bzl", "pip")
pip.parse(
hub_name = "pypi",
hub_name = "databuild_pypi",
python_version = "3.13",
requirements_lock = "//:requirements_lock.txt",
)
use_repo(pip, "pypi")
use_repo(pip, "databuild_pypi")
# OCI (Docker images)
oci = use_extension("@rules_oci//oci:extensions.bzl", "oci")
# Declare external images you need to pull
oci.pull(
name = "debian",
image = "docker.io/library/python",
platforms = [
"linux/arm64/v8",
"linux/amd64",
],
# Using a pinned version for reproducibility
tag = "3.12-bookworm",
)
# For each oci.pull call, repeat the "name" here to expose them as dependencies
use_repo(oci, "debian", "debian_linux_amd64", "debian_linux_arm64_v8")
# Ruff
# macOS ARM64 (Apple Silicon)
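
For reference, a pulled base like `@debian` above would typically be consumed by `oci_image` in a BUILD file. A minimal sketch (target name and entrypoint are illustrative, not part of this diff):

load("@rules_oci//oci:defs.bzl", "oci_image")

oci_image(
    name = "app_image",
    # Resolves to the platform-specific repo via the aliases declared above
    base = "@debian",
    entrypoint = ["/usr/local/bin/python3"],
)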

View file

@ -567,11 +567,54 @@
"@@rules_oci+//oci:extensions.bzl%oci": {
"general": {
"bzlTransitiveDigest": "KHcdN2ovRQGX1MKsH0nGoGPFd/84U43tssN2jImCeJU=",
"usagesDigest": "/O1PwnnkqSBmI9Oe08ZYYqjM4IS8JR+/9rjgzVTNDaQ=",
"usagesDigest": "Y6oSW43ZgWvZTMtL3eDjcxyo58BCPzyiFhH+D+xVgwM=",
"recordedFileInputs": {},
"recordedDirentsInputs": {},
"envVariables": {},
"generatedRepoSpecs": {
"debian_linux_arm64_v8": {
"repoRuleId": "@@rules_oci+//oci/private:pull.bzl%oci_pull",
"attributes": {
"www_authenticate_challenges": {},
"scheme": "https",
"registry": "index.docker.io",
"repository": "library/python",
"identifier": "3.12-bookworm",
"platform": "linux/arm64/v8",
"target_name": "debian_linux_arm64_v8",
"bazel_tags": []
}
},
"debian_linux_amd64": {
"repoRuleId": "@@rules_oci+//oci/private:pull.bzl%oci_pull",
"attributes": {
"www_authenticate_challenges": {},
"scheme": "https",
"registry": "index.docker.io",
"repository": "library/python",
"identifier": "3.12-bookworm",
"platform": "linux/amd64",
"target_name": "debian_linux_amd64",
"bazel_tags": []
}
},
"debian": {
"repoRuleId": "@@rules_oci+//oci/private:pull.bzl%oci_alias",
"attributes": {
"target_name": "debian",
"www_authenticate_challenges": {},
"scheme": "https",
"registry": "index.docker.io",
"repository": "library/python",
"identifier": "3.12-bookworm",
"platforms": {
"@@platforms//cpu:arm64": "@debian_linux_arm64_v8",
"@@platforms//cpu:x86_64": "@debian_linux_amd64"
},
"bzlmod_repository": "debian",
"reproducible": true
}
},
"oci_crane_darwin_amd64": {
"repoRuleId": "@@rules_oci+//oci:repositories.bzl%crane_repositories",
"attributes": {
@ -687,7 +730,11 @@
}
},
"moduleExtensionMetadata": {
"explicitRootModuleDirectDeps": [],
"explicitRootModuleDirectDeps": [
"debian",
"debian_linux_arm64_v8",
"debian_linux_amd64"
],
"explicitRootModuleDirectDevDeps": [],
"useAllRepos": "NO",
"reproducible": false

View file

@ -150,7 +150,7 @@ py_binary(
srcs = ["proto_wrapper.py"],
main = "proto_wrapper.py",
deps = [
"@pypi//betterproto2_compiler",
"@databuild_pypi//betterproto2_compiler",
],
)
@ -175,7 +175,7 @@ $(location @com_google_protobuf//:protoc) --python_betterproto2_out=$(GENDIR)/da
":protoc-gen-python_betterproto2",
"//:ruff_binary",
"@com_google_protobuf//:protoc",
"@pypi//betterproto2_compiler",
"@databuild_pypi//betterproto2_compiler",
],
)
@ -187,8 +187,8 @@ py_library(
],
visibility = ["//visibility:public"],
deps = [
"@pypi//betterproto2_compiler",
"@pypi//grpcio",
"@pypi//pytest",
"@databuild_pypi//betterproto2_compiler",
"@databuild_pypi//grpcio",
"@databuild_pypi//pytest",
],
)

View file

@ -3,5 +3,6 @@ py_library(
srcs = ["dsl.py"],
visibility = ["//visibility:public"],
deps = [
"//databuild:py_proto",
],
)

View file

@ -1,6 +1,7 @@
from databuild.proto import JobConfig, PartitionRef, DataDep, DepType
from typing import Self, Protocol, get_type_hints, get_origin, get_args
from dataclasses import fields, is_dataclass
from dataclasses import fields, is_dataclass, dataclass, field
import re
@ -58,21 +59,13 @@ class PartitionPattern:
return result
class JobConfig:
"""TODO need to generate this from databuild.proto"""
class PartitionManifest:
"""TODO need to generate this from databuild.proto"""
class DataBuildJob(Protocol):
# The types of partitions that this job produces
output_types: list[type[PartitionPattern]]
def config(self, outputs: list[PartitionPattern]) -> list[JobConfig]: ...
def exec(self, config: JobConfig) -> PartitionManifest: ...
def exec(self, config: JobConfig) -> None: ...
class DataBuildGraph:
@ -85,7 +78,54 @@ class DataBuildGraph:
for partition in cls.output_types:
assert partition not in self.lookup, f"Partition `{partition}` already registered"
self.lookup[partition] = cls
return cls
def generate_bazel_module(self):
"""Generates a complete databuild application, packaging up referenced jobs and this graph via bazel targets"""
raise NotImplementedError
@dataclass
class JobConfigBuilder:
outputs: list[PartitionRef] = field(default_factory=list)
inputs: list[DataDep] = field(default_factory=list)
args: list[str] = field(default_factory=list)
env: dict[str, str] = field(default_factory=dict)
def build(self) -> JobConfig:
return JobConfig(
outputs=self.outputs,
inputs=self.inputs,
args=self.args,
env=self.env,
)
def add_inputs(self, *partitions: PartitionPattern, dep_type: DepType=DepType.MATERIALIZE) -> Self:
for p in partitions:
dep_type_name = "materialize" if dep_type == DepType.MATERIALIZE else "query"
self.inputs.append(DataDep(dep_type_code=dep_type, dep_type_name=dep_type_name, partition_ref=PartitionRef(str=p.serialize())))
return self
def add_outputs(self, *partitions: PartitionPattern) -> Self:
for p in partitions:
self.outputs.append(PartitionRef(str=p.serialize()))
return self
def add_args(self, *args: str) -> Self:
self.args.extend(args)
return self
def set_args(self, args: list[str]) -> Self:
self.args = args
return self
def set_env(self, env: dict[str, str]) -> Self:
self.env = env
return self
def add_env(self, **kwargs) -> Self:
for k, v in kwargs.items():
assert isinstance(k, str), f"Expected a string key, got `{k}`"
assert isinstance(v, str), f"Expected a string key, got `{v}`"
self.env[k] = v
return self
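
A minimal usage sketch of the builder (here `report` and `upstream` are placeholders standing in for any concrete `PartitionPattern` instances; argument values are illustrative):

# `report` and `upstream` stand in for PartitionPattern subclass instances
config = (
    JobConfigBuilder()
    .add_outputs(report)                  # serialized into PartitionRef outputs
    .add_inputs(upstream)                 # wrapped as DataDep, DepType.MATERIALIZE by default
    .add_args("--dry-run")
    .add_env(DATA_DATE="2025-01-15")
    .build()
)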

View file

@ -3,6 +3,6 @@ py_test(
srcs = glob(["*.py"]),
deps = [
"//databuild/dsl/python:dsl",
"@pypi//pytest",
"@databuild_pypi//pytest",
],
)

View file

@ -1,5 +1,6 @@
from databuild.dsl.python.dsl import PartitionPattern, DataBuildGraph, DataBuildJob, JobConfig, PartitionManifest
from databuild.dsl.python.dsl import PartitionPattern, DataBuildGraph, DataBuildJob
from databuild.proto import JobConfig, PartitionManifest
from dataclasses import dataclass
import pytest
@ -45,7 +46,7 @@ def test_basic_graph_definition():
@graph.job
class TestJob(DataBuildJob):
output_types = [CategoryAnalysisPartition]
def exec(self, config: JobConfig) -> PartitionManifest: ...
def exec(self, config: JobConfig) -> None: ...
def config(self, outputs: list[PartitionPattern]) -> list[JobConfig]: ...
assert len(graph.lookup) == 1
@ -58,14 +59,15 @@ def test_graph_collision():
@graph.job
class TestJob1(DataBuildJob):
output_types = [CategoryAnalysisPartition]
def exec(self, config: JobConfig) -> PartitionManifest: ...
def exec(self, config: JobConfig) -> None: ...
def config(self, outputs: list[PartitionPattern]) -> list[JobConfig]: ...
with pytest.raises(AssertionError):
# Outputs the same partition, so should raise
@graph.job
class TestJob2(DataBuildJob):
output_types = [CategoryAnalysisPartition]
def exec(self, config: JobConfig) -> PartitionManifest: ...
def exec(self, config: JobConfig) -> None: ...
def config(self, outputs: list[PartitionPattern]) -> list[JobConfig]: ...

View file

@ -79,8 +79,11 @@ fn resolve(output_refs: &[String]) -> Result<HashMap<String, Vec<String>>, Strin
.map_err(|e| format!("Failed to execute job lookup: {}", e))?;
if !output.status.success() {
error!("Job lookup failed: {}", output.status);
let stderr = String::from_utf8_lossy(&output.stderr);
error!("Job lookup failed: {}", stderr);
error!("stderr: {}", stderr);
let stdout = String::from_utf8_lossy(&output.stdout);
error!("stdout: {}", stdout);
return Err(format!("Failed to run job lookup: {}", stderr));
}

View file

@ -1 +1,11 @@
from databuild.py_proto_out.databuild.v1 import *
from betterproto2 import Casing, OutputFormat
def to_dict(d) -> dict:
"""Helper for creating proper dicts from protobuf derived dataclasses."""
return d.to_dict(
casing=Casing.SNAKE,
output_format=OutputFormat.PYTHON,
include_default_values=True
)

View file

@ -4,6 +4,8 @@ load("@rules_oci//oci:defs.bzl", "oci_image", "oci_load")
RUNFILES_PREFIX = """
# ================= BEGIN RUNFILES INIT =================
SCRIPT_PATH="$(realpath "$0")"
# TODO should this be extracted to a shared init script?
# Get the directory where the script is located
if [[ -z "${RUNFILES_DIR:-}" ]]; then
@ -71,6 +73,7 @@ def _databuild_job_cfg_impl(ctx):
output = script,
substitutions = {
"%{EXECUTABLE_PATH}": configure_path,
"%{EXECUTABLE_SHORT_PATH}": ctx.attr.configure.files_to_run.executable.short_path,
"%{RUNFILES_PREFIX}": RUNFILES_PREFIX,
"%{PREFIX}": "EXECUTABLE_SUBCOMMAND=\"config\"\n",
},
@ -331,6 +334,7 @@ def _databuild_graph_lookup_impl(ctx):
"%{RUNFILES_PREFIX}": RUNFILES_PREFIX,
"%{PREFIX}": "",
"%{EXECUTABLE_PATH}": ctx.attr.lookup.files_to_run.executable.path,
"%{EXECUTABLE_SHORT_PATH}": ctx.attr.lookup.files_to_run.executable.short_path,
},
is_executable = True,
)
@ -399,6 +403,7 @@ export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path})
output = script,
substitutions = {
"%{EXECUTABLE_PATH}": ctx.attr._analyze.files_to_run.executable.path,
"%{EXECUTABLE_SHORT_PATH}": ctx.attr._analyze.files_to_run.executable.short_path,
"%{RUNFILES_PREFIX}": RUNFILES_PREFIX,
"%{PREFIX}": script_prefix,
},

View file

@ -5,7 +5,32 @@ set -e
%{PREFIX}
EXECUTABLE_BINARY="$(rlocation "_main/$(basename "%{EXECUTABLE_PATH}")")"
# Check if rlocation function is available
if ! type rlocation >/dev/null 2>&1; then
echo "Error: rlocation function not available. Runfiles may not be properly initialized." >&2
exit 1
fi
# Resolve the executable using rlocation
EXECUTABLE_BINARY="$(rlocation "_main/%{EXECUTABLE_SHORT_PATH}")"
# Check if rlocation returned something
if [[ -z "${EXECUTABLE_BINARY}" ]]; then
echo "Error: rlocation returned empty result for '_main/%{EXECUTABLE_SHORT_PATH}'" >&2
exit 1
fi
# Check if the resolved binary exists
if [[ ! -f "${EXECUTABLE_BINARY}" ]]; then
echo "Error: Resolved executable '${EXECUTABLE_BINARY}' does not exist" >&2
exit 1
fi
# Check if the resolved binary is executable
if [[ ! -x "${EXECUTABLE_BINARY}" ]]; then
echo "Error: Resolved executable '${EXECUTABLE_BINARY}' is not executable" >&2
exit 1
fi
# Run the configuration
if [[ -n "${EXECUTABLE_SUBCOMMAND:-}" ]]; then

View file

@ -1,25 +1,9 @@
load("//databuild:rules.bzl", "databuild_graph", "databuild_job")
py_library(
name = "job_src",
srcs = glob(["**/*.py"]),
visibility = ["//visibility:public"],
deps = ["//databuild:py_proto"],
deps = [
"//databuild:py_proto",
"//databuild/dsl/python:dsl",
],
)
# Tests
py_test(
name = "test",
srcs = glob(["**/test.py"]),
deps = [":job_src"],
)
# Bazel-defined
#databuild_job(
# name = "ingest_color_votes",
#)
# Python-DSL-defined
# TODO

View file

@ -0,0 +1,149 @@
load("//databuild:rules.bzl", "databuild_graph", "databuild_job")
py_library(
name = "job_src",
srcs = glob(["**/*.py"]),
visibility = ["//visibility:public"],
deps = [
"//databuild:py_proto",
"//databuild/dsl/python:dsl",
],
)
# Tests
py_test(
name = "test_trailing_color_votes",
srcs = ["jobs/trailing_color_votes/test.py"],
main = "jobs/trailing_color_votes/test.py",
deps = [
":job_src",
"//databuild/test/app:job_src",
],
)
py_test(
name = "test_ingest_color_votes",
srcs = ["jobs/ingest_color_votes/test.py"],
main = "jobs/ingest_color_votes/test.py",
deps = [
":job_src",
"//databuild/test/app:job_src",
],
)
py_test(
name = "test_aggregate_color_votes",
srcs = ["jobs/aggregate_color_votes/test.py"],
main = "jobs/aggregate_color_votes/test.py",
deps = [
":job_src",
"//databuild/test/app:job_src",
],
)
py_test(
name = "test_color_vote_report_calc",
srcs = ["jobs/color_vote_report_calc/test.py"],
main = "jobs/color_vote_report_calc/test.py",
deps = [
":job_src",
"//databuild/test/app:job_src",
],
)
py_test(
name = "test_graph_analysis",
srcs = ["graph/graph_test.py"],
data = [
":bazel_graph.analyze",
":bazel_graph_lookup",
],
main = "graph/graph_test.py",
deps = [
":job_src",
"//databuild/test/app:job_src",
],
)
# Bazel-defined
## Graph
databuild_graph(
name = "bazel_graph",
jobs = [
":ingest_color_votes",
":trailing_color_votes",
":aggregate_color_votes",
":color_vote_report_calc",
],
lookup = ":bazel_graph_lookup",
)
py_binary(
name = "bazel_graph_lookup",
srcs = ["graph/lookup.py"],
main = "graph/lookup.py",
)
## Ingest Color Votes
databuild_job(
name = "ingest_color_votes",
binary = ":ingest_color_votes_binary",
)
py_binary(
name = "ingest_color_votes_binary",
srcs = ["jobs/ingest_color_votes/main.py"],
main = "jobs/ingest_color_votes/main.py",
deps = [
":job_src",
"//databuild/test/app:job_src",
],
)
## Trailing Color Votes
databuild_job(
name = "trailing_color_votes",
binary = ":trailing_color_votes_binary",
)
py_binary(
name = "trailing_color_votes_binary",
srcs = ["jobs/trailing_color_votes/main.py"],
main = "jobs/trailing_color_votes/main.py",
deps = [
":job_src",
"//databuild/test/app:job_src",
],
)
## Aggregate Color Votes
databuild_job(
name = "aggregate_color_votes",
binary = ":aggregate_color_votes_binary",
)
py_binary(
name = "aggregate_color_votes_binary",
srcs = ["jobs/aggregate_color_votes/main.py"],
main = "jobs/aggregate_color_votes/main.py",
deps = [
":job_src",
"//databuild/test/app:job_src",
],
)
## Color Vote Report Calc
databuild_job(
name = "color_vote_report_calc",
binary = ":color_vote_report_calc_binary",
)
py_binary(
name = "color_vote_report_calc_binary",
srcs = ["jobs/color_vote_report_calc/main.py"],
main = "jobs/color_vote_report_calc/main.py",
deps = [
":job_src",
"//databuild/test/app:job_src",
],
)

View file

@ -0,0 +1,4 @@
# Bazel-Based Graph Definition
The Bazel-based graph definition relies on declaring `databuild_job` and `databuild_graph` targets that reference job binaries, as sketched below.
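
A minimal sketch of those declarations, following the pattern used in the test app's BUILD file in this diff (target names are illustrative):

load("//databuild:rules.bzl", "databuild_graph", "databuild_job")

databuild_job(
    name = "my_job",
    binary = ":my_job_binary",
)

databuild_graph(
    name = "my_graph",
    jobs = [":my_job"],
    lookup = ":my_graph_lookup",
)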

View file

@ -0,0 +1,91 @@
#!/usr/bin/env python3
"""
Integration test for the databuild graph analysis.
This test verifies that when we request color vote reports, the graph analyzer
correctly identifies all upstream dependencies and jobs required.
"""
import subprocess
import json
import unittest
import os
from pathlib import Path
class GraphAnalysisTest(unittest.TestCase):
def setUp(self):
# Determine the path to bazel_graph.analyze
# In bazel test, we need to find the executable in the runfiles
runfiles_dir = os.environ.get('RUNFILES_DIR')
test_srcdir = os.environ.get('TEST_SRCDIR')
possible_paths = []
if runfiles_dir:
possible_paths.append(os.path.join(runfiles_dir, '_main', 'databuild', 'test', 'app', 'bazel_graph.analyze'))
possible_paths.append(os.path.join(runfiles_dir, 'databuild', 'test', 'app', 'bazel_graph.analyze'))
if test_srcdir:
possible_paths.append(os.path.join(test_srcdir, '_main', 'databuild', 'test', 'app', 'bazel_graph.analyze'))
possible_paths.append(os.path.join(test_srcdir, 'databuild', 'test', 'app', 'bazel_graph.analyze'))
# Fallback for local testing
possible_paths.extend([
'bazel-bin/databuild/test/app/bazel_graph.analyze',
'./bazel_graph.analyze'
])
self.graph_analyze = None
for path in possible_paths:
if os.path.exists(path):
self.graph_analyze = path
break
# Ensure the executable exists
if not self.graph_analyze:
self.skipTest(f"Graph analyze executable not found in any of these paths: {possible_paths}")
def run_graph_analyze(self, partition_refs):
"""Run graph.analyze with the given partition references."""
cmd = [self.graph_analyze] + partition_refs
result = subprocess.run(cmd, capture_output=True, text=True, cwd=os.getcwd())
if result.returncode != 0:
self.fail(f"Graph analyze failed with return code {result.returncode}.\nStdout: {result.stdout}\nStderr: {result.stderr}")
# Parse the JSON output
try:
return json.loads(result.stdout)
except json.JSONDecodeError as e:
self.fail(f"Failed to parse JSON output: {e}\nOutput: {result.stdout}")
def test_single_color_report_dependencies(self):
"""Test dependencies for a single color vote report."""
partition_refs = ["color_vote_report/2024-01-15/red"]
result = self.run_graph_analyze(partition_refs)
self.assertIn('nodes', result)
# TODO expand
def test_multiple_color_reports_same_date(self):
"""Test dependencies when requesting multiple colors for the same date."""
partition_refs = [
"color_vote_report/2024-01-15/red",
"color_vote_report/2024-01-15/blue"
]
result = self.run_graph_analyze(partition_refs)
self.assertIn('nodes', result)
# TODO expand
def test_multiple_dates_dependencies(self):
"""Test dependencies when requesting reports for different dates."""
partition_refs = [
"color_vote_report/2024-01-15/red",
"color_vote_report/2024-01-16/red"
]
result = self.run_graph_analyze(partition_refs)
self.assertIn('nodes', result)
# TODO expand
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,29 @@
#!/usr/bin/env python3
from collections import defaultdict
import sys
import json
LABEL_BASE = "//databuild/test/app"
def lookup(raw_ref: str):
if raw_ref.startswith("daily_color_votes"):
return LABEL_BASE + ":ingest_color_votes"
elif raw_ref.startswith("color_votes_1"):
return LABEL_BASE + ":trailing_color_votes"
elif raw_ref.startswith("daily_votes") or raw_ref.startswith("votes_1w") or raw_ref.startswith("votes_1m"):
return LABEL_BASE + ":aggregate_color_votes"
elif raw_ref.startswith("color_vote_report"):
return LABEL_BASE + ":color_vote_report_calc"
else:
raise ValueError(f"Unable to resolve job for partition: `{raw_ref}`")
if __name__ == "__main__":
results = defaultdict(list)
for raw_ref in sys.argv[1:]:
results[lookup(raw_ref)].append(raw_ref)
# Output the results as JSON
print(json.dumps(dict(results)))
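
Given the prefix rules above, an invocation with mixed refs groups them by resolved job label. An illustrative run (the binary path depends on how the target is built):

$ bazel_graph_lookup daily_color_votes/2024-01-15/red color_vote_report/2024-01-15/red
{"//databuild/test/app:ingest_color_votes": ["daily_color_votes/2024-01-15/red"], "//databuild/test/app:color_vote_report_calc": ["color_vote_report/2024-01-15/red"]}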

View file

@ -0,0 +1 @@
jobs/aggregate_color_votes/README.md

View file

@ -0,0 +1,42 @@
from databuild.proto import PartitionRef, JobConfigureResponse, JobConfig, DepType, DataDep
from databuild.test.app.colors import COLORS
from datetime import date
def configure(outputs: list[PartitionRef]) -> JobConfigureResponse:
configs = []
for output in outputs:
parts = output.str.split("/")
if len(parts) == 2:
output_type, data_date = parts
date.fromisoformat(data_date) # Validate date format
# Determine input type based on output type
if output_type == "daily_votes":
input_prefix = "daily_color_votes"
elif output_type == "votes_1w":
input_prefix = "color_votes_1w"
elif output_type == "votes_1m":
input_prefix = "color_votes_1m"
else:
raise ValueError(f"Unknown output type: {output_type}")
# Create inputs for all colors
inputs = []
for color in COLORS:
input_ref = PartitionRef(str=f"{input_prefix}/{data_date}/{color}")
inputs.append(input_ref)
configs.append(JobConfig(
outputs=[output],
inputs=[DataDep(dep_type_code=DepType.MATERIALIZE, dep_type_name="materialize", partition_ref=ref) for ref in inputs],
args=[],
env={
"DATA_DATE": data_date,
"AGGREGATE_TYPE": output_type
}
))
else:
raise ValueError(f"Invalid output partition format: {output.str}")
return JobConfigureResponse(configs=configs)

View file

@ -0,0 +1,20 @@
"""Main entrypoint for the aggregate_color_votes job for use with bazel-defined graph."""
import sys
import os
import json
from databuild.proto import PartitionRef, to_dict
from databuild.test.app.bazel.jobs.aggregate_color_votes.config import configure
from databuild.test.app.jobs.aggregate_color_votes.execute import execute
if __name__ == "__main__":
if sys.argv[1] == "config":
response = configure([
PartitionRef(str=raw_ref)
for raw_ref in sys.argv[2:]
])
print(json.dumps(to_dict(response)))
elif sys.argv[1] == "exec":
execute(os.environ["DATA_DATE"], os.environ["AGGREGATE_TYPE"])
else:
raise Exception(f"Invalid command `{sys.argv[1]}`")

View file

@ -0,0 +1,59 @@
import unittest
from databuild.proto import PartitionRef
from databuild.test.app.bazel.jobs.aggregate_color_votes.config import configure
from databuild.test.app.colors import COLORS
class TestAggregateColorVotesConfig(unittest.TestCase):
def test_configure_daily_votes(self):
outputs = [PartitionRef(str="daily_votes/2024-01-15")]
response = configure(outputs)
self.assertEqual(len(response.configs), 1)
config = response.configs[0]
self.assertEqual(len(config.outputs), 1)
self.assertEqual(len(config.inputs), len(COLORS)) # One input per color
self.assertEqual(config.env["AGGREGATE_TYPE"], "daily_votes")
self.assertEqual(config.env["DATA_DATE"], "2024-01-15")
# Check that inputs are from daily_color_votes
for i, color in enumerate(COLORS):
expected_input = f"daily_color_votes/2024-01-15/{color}"
self.assertEqual(config.inputs[i].partition_ref.str, expected_input)
def test_configure_weekly_votes(self):
outputs = [PartitionRef(str="votes_1w/2024-01-21")]
response = configure(outputs)
self.assertEqual(len(response.configs), 1)
config = response.configs[0]
self.assertEqual(config.env["AGGREGATE_TYPE"], "votes_1w")
# Check that inputs are from color_votes_1w
for i, color in enumerate(COLORS):
expected_input = f"color_votes_1w/2024-01-21/{color}"
self.assertEqual(config.inputs[i].partition_ref.str, expected_input)
def test_configure_monthly_votes(self):
outputs = [PartitionRef(str="votes_1m/2024-01-31")]
response = configure(outputs)
self.assertEqual(len(response.configs), 1)
config = response.configs[0]
self.assertEqual(config.env["AGGREGATE_TYPE"], "votes_1m")
# Check that inputs are from color_votes_1m
for i, color in enumerate(COLORS):
expected_input = f"color_votes_1m/2024-01-31/{color}"
self.assertEqual(config.inputs[i].partition_ref.str, expected_input)
def test_configure_multiple_outputs(self):
outputs = [
PartitionRef(str="daily_votes/2024-01-15"),
PartitionRef(str="votes_1w/2024-01-21")
]
response = configure(outputs)
self.assertEqual(len(response.configs), 2) # One config per output
if __name__ == "__main__":
unittest.main()

View file

@ -0,0 +1 @@
jobs/color_vote_report_calc/README.md

View file

@ -0,0 +1,48 @@
from databuild.proto import PartitionRef, JobConfigureResponse, JobConfig, DataDep, DepType
from datetime import date
from collections import defaultdict
def configure(outputs: list[PartitionRef]) -> JobConfigureResponse:
# This job produces a single job config that handles all requested outputs
all_dates = set()
all_colors = set()
for output in outputs:
parts = output.str.split("/")
if len(parts) == 3 and parts[0] == "color_vote_report":
prefix, data_date, color = parts
date.fromisoformat(data_date) # Validate date format
all_dates.add(data_date)
all_colors.add(color)
else:
raise ValueError(f"Invalid output partition format: {output.str}")
# Build inputs for all dates and colors that are actually requested
inputs = []
# Add total vote aggregates for all dates
for data_date in all_dates:
inputs.extend([
PartitionRef(str=f"daily_votes/{data_date}"),
PartitionRef(str=f"votes_1w/{data_date}"),
PartitionRef(str=f"votes_1m/{data_date}")
])
# Add color-specific inputs for all date/color combinations that are requested
for output in outputs:
data_date, color = output.str.split("/")[1], output.str.split("/")[2]
inputs.extend([
PartitionRef(str=f"daily_color_votes/{data_date}/{color}"),
PartitionRef(str=f"color_votes_1w/{data_date}/{color}"),
PartitionRef(str=f"color_votes_1m/{data_date}/{color}")
])
# Single job config for all outputs - pass output partition refs as args
config = JobConfig(
outputs=outputs,
inputs=[DataDep(dep_type_code=DepType.MATERIALIZE, dep_type_name="materialize", partition_ref=ref) for ref in inputs],
args=[output.str for output in outputs],
env={}
)
return JobConfigureResponse(configs=[config])

View file

@ -0,0 +1,20 @@
"""Main entrypoint for the color_vote_report_calc job for use with bazel-defined graph."""
import sys
import os
import json
from databuild.proto import PartitionRef, to_dict
from databuild.test.app.bazel.jobs.color_vote_report_calc.config import configure
from databuild.test.app.jobs.color_vote_report_calc.execute import execute
if __name__ == "__main__":
if sys.argv[1] == "config":
response = configure([
PartitionRef(str=raw_ref)
for raw_ref in sys.argv[2:]
])
print(json.dumps(to_dict(response)))
elif sys.argv[1] == "exec":
execute(sys.argv[2:])
else:
raise Exception(f"Invalid command `{sys.argv[1]}`")

View file

@ -0,0 +1,60 @@
import unittest
from databuild.proto import PartitionRef
from databuild.test.app.bazel.jobs.color_vote_report_calc.config import configure
class TestColorVoteReportCalcConfig(unittest.TestCase):
def test_configure_single_output(self):
outputs = [PartitionRef(str="color_vote_report/2024-01-15/red")]
response = configure(outputs)
self.assertEqual(len(response.configs), 1) # Always single config
config = response.configs[0]
self.assertEqual(len(config.outputs), 1)
self.assertEqual(config.args, ["color_vote_report/2024-01-15/red"])
# Should have inputs for total votes and color-specific votes
expected_inputs = [
"daily_votes/2024-01-15",
"votes_1w/2024-01-15",
"votes_1m/2024-01-15",
"daily_color_votes/2024-01-15/red",
"color_votes_1w/2024-01-15/red",
"color_votes_1m/2024-01-15/red"
]
actual_inputs = [inp.partition_ref.str for inp in config.inputs]
for expected in expected_inputs:
self.assertIn(expected, actual_inputs)
def test_configure_multiple_outputs_same_date(self):
outputs = [
PartitionRef(str="color_vote_report/2024-01-15/red"),
PartitionRef(str="color_vote_report/2024-01-15/blue")
]
response = configure(outputs)
self.assertEqual(len(response.configs), 1) # Single config for all outputs
config = response.configs[0]
self.assertEqual(len(config.outputs), 2)
self.assertEqual(set(config.args), {
"color_vote_report/2024-01-15/red",
"color_vote_report/2024-01-15/blue"
})
def test_configure_multiple_dates(self):
outputs = [
PartitionRef(str="color_vote_report/2024-01-15/red"),
PartitionRef(str="color_vote_report/2024-01-16/red")
]
response = configure(outputs)
self.assertEqual(len(response.configs), 1) # Single config for all outputs
config = response.configs[0]
self.assertEqual(len(config.outputs), 2)
# Should have total vote inputs for both dates
actual_inputs = [inp.partition_ref.str for inp in config.inputs]
self.assertIn("daily_votes/2024-01-15", actual_inputs)
self.assertIn("daily_votes/2024-01-16", actual_inputs)
if __name__ == "__main__":
unittest.main()

View file

@ -0,0 +1 @@
jobs/ingest_color_votes/README.md

View file

@ -2,17 +2,19 @@
import sys
import os
from databuild.proto import PartitionRef
from databuild.test.app.jobs.ingest_color_votes.config import configure
import json
from databuild.proto import PartitionRef, to_dict
from databuild.test.app.bazel.jobs.ingest_color_votes.config import configure
from databuild.test.app.jobs.ingest_color_votes.execute import execute
if __name__ == "__main__":
if sys.argv[1] == "config":
configure([
response = configure([
PartitionRef(str=raw_ref)
for raw_ref in sys.argv[2:]
])
elif sys.argv[1] == "execute":
print(json.dumps(to_dict(response)))
elif sys.argv[1] == "exec":
execute(os.environ["DATA_DATE"], os.environ["COLOR"])
else:
raise Exception(f"Invalid command `{sys.argv[1]}`")

View file

@ -0,0 +1,32 @@
from databuild.test.app.bazel.jobs.ingest_color_votes.config import configure
from databuild.proto import PartitionRef
def test_ingest_color_votes_configure():
refs_single = [PartitionRef(str="daily_color_votes/2025-01-01/red")]
config_single = configure(refs_single)
assert len(config_single.configs) == 1
assert config_single.configs[0].outputs[0].str == "daily_color_votes/2025-01-01/red"
assert config_single.configs[0].env["COLOR"] == "red"
assert config_single.configs[0].env["DATA_DATE"] == "2025-01-01"
refs_multiple = [
PartitionRef(str="daily_color_votes/2025-01-02/red"),
PartitionRef(str="daily_color_votes/2025-01-02/blue"),
]
config_multiple = configure(refs_multiple)
assert len(config_multiple.configs) == 2
assert len(config_multiple.configs[0].outputs) == 1
assert config_multiple.configs[0].outputs[0].str == "daily_color_votes/2025-01-02/red"
assert config_multiple.configs[0].env["COLOR"] == "red"
assert config_multiple.configs[0].env["DATA_DATE"] == "2025-01-02"
assert len(config_multiple.configs[1].outputs) == 1
assert config_multiple.configs[1].outputs[0].str == "daily_color_votes/2025-01-02/blue"
assert config_multiple.configs[1].env["COLOR"] == "blue"
assert config_multiple.configs[1].env["DATA_DATE"] == "2025-01-02"
if __name__ == '__main__':
import pytest
raise SystemExit(pytest.main([__file__]))

View file

@ -0,0 +1 @@
jobs/trailing_color_votes/README.md

View file

@ -0,0 +1,46 @@
from databuild.proto import PartitionRef, JobConfigureResponse, JobConfig, DepType, DataDep
from datetime import date, timedelta
from collections import defaultdict
def configure(outputs: list[PartitionRef]) -> JobConfigureResponse:
# Group outputs by date and color
grouped_outputs = defaultdict(list)
for output in outputs:
parts = output.str.split("/")
if len(parts) == 3 and parts[0] in ["color_votes_1w", "color_votes_1m"]:
grouped_outputs[tuple(parts[1:])].append(output)
else:
raise ValueError(f"Invalid output partition format: {output.str}")
configs = []
for (data_date, color), output_partitions in grouped_outputs.items():
# Parse the output date
output_date = date.fromisoformat(data_date)
# Determine which windows are needed and the maximum window
has_weekly = any(output.str.startswith("color_votes_1w/") for output in output_partitions)
has_monthly = any(output.str.startswith("color_votes_1m/") for output in output_partitions)
max_window = max(7 if has_weekly else 0, 28 if has_monthly else 0)
# Generate input partition refs for the required trailing window
inputs = []
for i in range(max_window):
input_date = output_date - timedelta(days=i)
inputs.append(PartitionRef(str=f"daily_color_votes/{input_date.isoformat()}/{color}"))
env = {
"DATA_DATE": data_date,
"COLOR": color,
"WEEKLY": "true" if has_weekly else "false",
"MONTHLY": "true" if has_monthly else "false"
}
configs.append(JobConfig(
outputs=output_partitions,
inputs=[DataDep(dep_type_code=DepType.MATERIALIZE, dep_type_name="materialize", partition_ref=ref) for ref in inputs],
args=[],
env=env
))
return JobConfigureResponse(configs=configs)

View file

@ -0,0 +1,20 @@
"""Main entrypoint for the trailing_color_votes job for use with bazel-defined graph."""
import sys
import os
import json
from databuild.proto import PartitionRef, to_dict
from databuild.test.app.bazel.jobs.trailing_color_votes.config import configure
from databuild.test.app.jobs.trailing_color_votes.execute import execute
if __name__ == "__main__":
if sys.argv[1] == "config":
response = configure([
PartitionRef(str=raw_ref)
for raw_ref in sys.argv[2:]
])
print(json.dumps(to_dict(response)))
elif sys.argv[1] == "exec":
execute(os.environ["DATA_DATE"], os.environ["COLOR"])
else:
raise Exception(f"Invalid command `{sys.argv[1]}`")

View file

@ -0,0 +1,53 @@
import unittest
from databuild.proto import PartitionRef
from databuild.test.app.bazel.jobs.trailing_color_votes.config import configure
class TestTrailingColorVotesConfig(unittest.TestCase):
def test_configure_weekly_only(self):
outputs = [PartitionRef(str="color_votes_1w/2024-01-07/red")]
response = configure(outputs)
self.assertEqual(len(response.configs), 1)
config = response.configs[0]
self.assertEqual(len(config.outputs), 1)
self.assertEqual(len(config.inputs), 7) # 7 days for weekly
self.assertEqual(config.env["WEEKLY"], "true")
self.assertEqual(config.env["MONTHLY"], "false")
def test_configure_monthly_only(self):
outputs = [PartitionRef(str="color_votes_1m/2024-01-28/blue")]
response = configure(outputs)
self.assertEqual(len(response.configs), 1)
config = response.configs[0]
self.assertEqual(len(config.outputs), 1)
self.assertEqual(len(config.inputs), 28) # 28 days for monthly
self.assertEqual(config.env["WEEKLY"], "false")
self.assertEqual(config.env["MONTHLY"], "true")
def test_configure_both_weekly_and_monthly(self):
outputs = [
PartitionRef(str="color_votes_1w/2024-01-28/green"),
PartitionRef(str="color_votes_1m/2024-01-28/green")
]
response = configure(outputs)
self.assertEqual(len(response.configs), 1) # Single config for same date/color
config = response.configs[0]
self.assertEqual(len(config.outputs), 2) # Both outputs
self.assertEqual(len(config.inputs), 28) # 28 days (max of 7 and 28)
self.assertEqual(config.env["WEEKLY"], "true")
self.assertEqual(config.env["MONTHLY"], "true")
def test_configure_multiple_colors_dates(self):
outputs = [
PartitionRef(str="color_votes_1w/2024-01-07/red"),
PartitionRef(str="color_votes_1w/2024-01-07/blue"),
PartitionRef(str="color_votes_1m/2024-01-14/red")
]
response = configure(outputs)
self.assertEqual(len(response.configs), 3) # One config per unique date/color combination
if __name__ == "__main__":
unittest.main()

View file

@ -0,0 +1,13 @@
py_library(
name = "dsl_src",
srcs = glob(
["*.py"],
exclude = ["test_*.py"],
),
visibility = ["//visibility:public"],
deps = [
"//databuild:py_proto",
"//databuild/dsl/python:dsl",
"//databuild/test/app:job_src",
],
)

View file

@ -0,0 +1,130 @@
"""Python DSL implementation of test app"""
from collections import defaultdict
from databuild.dsl.python.dsl import DataBuildGraph, DataBuildJob, JobConfigBuilder
from databuild.proto import JobConfig
from databuild.test.app.colors import COLORS
from databuild.test.app.jobs.ingest_color_votes.execute import execute as ingest_color_votes_exec
from databuild.test.app.jobs.trailing_color_votes.execute import execute as trailing_color_votes_exec
from databuild.test.app.jobs.aggregate_color_votes.execute import execute as aggregate_color_votes_exec
from databuild.test.app.jobs.color_vote_report_calc.execute import execute as color_vote_report_calc_exec
from databuild.test.app.dsl.partitions import (
IngestedColorPartition,
TrailingColorVotes1MPartition,
TrailingColorVotes1WPartition,
DailyVotesPartition,
Votes1WPartition,
Votes1MPartition,
ColorVoteReportPartition
)
from datetime import date, timedelta
graph = DataBuildGraph("//databuild/test/app:dsl_graph")
@graph.job
class IngestColorVotes(DataBuildJob):
output_types = [IngestedColorPartition]
def config(self, outputs: list[IngestedColorPartition]) -> list[JobConfig]:
configs = []
for output in outputs:
env = {"DATA_DATE": output.data_date, "COLOR": output.color}
configs.append(JobConfigBuilder().add_outputs(output).set_env(env).build())
return configs
def exec(self, config: JobConfig) -> None:
ingest_color_votes_exec(data_date=config.env["DATA_DATE"], color=config.env["COLOR"])
@graph.job
class TrailingColorVotes(DataBuildJob):
output_types = [TrailingColorVotes1MPartition, TrailingColorVotes1WPartition]
def config(self, outputs: list[TrailingColorVotes1MPartition | TrailingColorVotes1WPartition]) -> list[JobConfig]:
groups = defaultdict(list)
for output in outputs:
groups[(output.data_date, output.color)].append(output)
configs = []
for (data_date, color), outputs in groups.items():
weekly = "false"
monthly = "false"
max_window = 0
for output in outputs:
if isinstance(output, TrailingColorVotes1WPartition):
weekly = "true"
max_window = max(max_window, 7)
elif isinstance(output, TrailingColorVotes1MPartition):
monthly = "true"
max_window = max(max_window, 28)
env = {"DATA_DATE": data_date, "COLOR": color, "WEEKLY": weekly, "MONTHLY": monthly}
config = JobConfigBuilder(env=env).add_outputs(*outputs)
for i in range(max_window):
in_date = (date.fromisoformat(data_date) - timedelta(days=i)).isoformat()
config.add_inputs(IngestedColorPartition(data_date=in_date, color=color))
configs.append(config.build())
return configs
def exec(self, config: JobConfig) -> None:
trailing_color_votes_exec(data_date=config.env["DATA_DATE"], color=config.env["COLOR"])
@graph.job
class AggregateColorVotes(DataBuildJob):
output_types = [DailyVotesPartition, Votes1WPartition, Votes1MPartition]
def config(self, outputs: list[DailyVotesPartition | Votes1WPartition | Votes1MPartition]) -> list[JobConfig]:
configs = []
for output in outputs:
if isinstance(output, DailyVotesPartition):
InPartition = IngestedColorPartition
agg_type = "daily_votes"
elif isinstance(output, Votes1WPartition):
InPartition = TrailingColorVotes1WPartition
agg_type = "votes_1w"
elif isinstance(output, Votes1MPartition):
InPartition = TrailingColorVotes1MPartition
agg_type = "votes_1m"
else:
raise ValueError(f"Unknown output type: {output.type}")
inputs = [InPartition(data_date=output.data_date, color=color) for color in COLORS]
env = {"DATA_DATE": output.data_date, "AGGREGATE_TYPE": agg_type}
configs.append(JobConfigBuilder().add_outputs(output).add_inputs(*inputs).set_env(env).build())
return configs
def exec(self, config: JobConfig) -> None:
aggregate_color_votes_exec(data_date=config.env["DATA_DATE"], aggregate_type=config.env["AGGREGATE_TYPE"])
@graph.job
class ColorVoteReportCalc(DataBuildJob):
output_types = [ColorVoteReportPartition]
def config(self, outputs: list[ColorVoteReportPartition]) -> list[JobConfig]:
config = JobConfigBuilder().add_outputs(*outputs).add_args(*[p.serialize() for p in outputs])
for data_date in set(p.data_date for p in outputs):
config.add_inputs(
DailyVotesPartition(data_date=data_date),
Votes1WPartition(data_date=data_date),
Votes1MPartition(data_date=data_date),
)
for output in outputs:
config.add_inputs(
IngestedColorPartition(data_date=output.data_date, color=output.color),
TrailingColorVotes1WPartition(data_date=output.data_date, color=output.color),
TrailingColorVotes1MPartition(data_date=output.data_date, color=output.color),
)
return [config.build()]
def exec(self, config: JobConfig) -> None:
color_vote_report_calc_exec(config.args)

View file

@ -0,0 +1,40 @@
from dataclasses import dataclass
from databuild.dsl.python.dsl import PartitionPattern
@dataclass
class DatePartitioned:
data_date: str
@dataclass
class DateColorPartitioned:
data_date: str
color: str
class IngestedColorPartition(DateColorPartitioned, PartitionPattern):
_raw_pattern = r"daily_color_votes/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)"
class TrailingColorVotes1WPartition(DateColorPartitioned, PartitionPattern):
_raw_pattern = r"color_votes_1w/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)"
class TrailingColorVotes1MPartition(DateColorPartitioned, PartitionPattern):
_raw_pattern = r"color_votes_1m/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)"
class DailyVotesPartition(DatePartitioned, PartitionPattern):
_raw_pattern = r"daily_votes/(?P<data_date>\d{4}-\d{2}-\d{2})"
class Votes1WPartition(DatePartitioned, PartitionPattern):
_raw_pattern = r"votes_1w/(?P<data_date>\d{4}-\d{2}-\d{2})"
class Votes1MPartition(DatePartitioned, PartitionPattern):
_raw_pattern = r"votes_1m/(?P<data_date>\d{4}-\d{2}-\d{2})"
class ColorVoteReportPartition(DateColorPartitioned, PartitionPattern):
_raw_pattern = r"color_vote_report/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)"

View file

@ -0,0 +1,75 @@
# Individual job configuration tests
py_test(
name = "test_ingest_color_votes",
srcs = ["test_ingest_color_votes.py"],
main = "test_ingest_color_votes.py",
deps = [
"//databuild:py_proto",
"//databuild/dsl/python:dsl",
"//databuild/test/app:job_src",
"//databuild/test/app/dsl:dsl_src",
],
)
py_test(
name = "test_trailing_color_votes",
srcs = ["test_trailing_color_votes.py"],
main = "test_trailing_color_votes.py",
deps = [
"//databuild:py_proto",
"//databuild/dsl/python:dsl",
"//databuild/test/app:job_src",
"//databuild/test/app/dsl:dsl_src",
],
)
py_test(
name = "test_aggregate_color_votes",
srcs = ["test_aggregate_color_votes.py"],
main = "test_aggregate_color_votes.py",
deps = [
"//databuild:py_proto",
"//databuild/dsl/python:dsl",
"//databuild/test/app:job_src",
"//databuild/test/app/dsl:dsl_src",
],
)
py_test(
name = "test_color_vote_report_calc",
srcs = ["test_color_vote_report_calc.py"],
main = "test_color_vote_report_calc.py",
deps = [
"//databuild:py_proto",
"//databuild/dsl/python:dsl",
"//databuild/test/app:job_src",
"//databuild/test/app/dsl:dsl_src",
],
)
# Graph analysis test
py_test(
name = "test_graph_analysis",
srcs = ["test_graph_analysis.py"],
main = "test_graph_analysis.py",
deps = [
"//databuild:py_proto",
"//databuild/dsl/python:dsl",
"//databuild/test/app:job_src",
"//databuild/test/app/dsl:dsl_src",
],
)
# Bazel vs DSL comparison test
py_test(
name = "test_bazel_dsl_comparison",
srcs = ["test_bazel_dsl_comparison.py"],
main = "test_bazel_dsl_comparison.py",
deps = [
"//databuild:py_proto",
"//databuild/dsl/python:dsl",
"//databuild/test/app:job_src",
"//databuild/test/app/bazel:job_src",
"//databuild/test/app/dsl:dsl_src",
],
)

View file

@ -0,0 +1,159 @@
from databuild.test.app.dsl.graph import AggregateColorVotes
from databuild.test.app.dsl.partitions import (
DailyVotesPartition,
Votes1WPartition,
Votes1MPartition,
IngestedColorPartition,
TrailingColorVotes1WPartition,
TrailingColorVotes1MPartition
)
from databuild.test.app.colors import COLORS
from databuild.proto import DepType
def test_aggregate_color_votes_configure_daily_votes():
"""Test AggregateColorVotes config method with daily votes output."""
job = AggregateColorVotes()
outputs = [DailyVotesPartition(data_date="2025-01-15")]
configs = job.config(outputs)
assert len(configs) == 1
config = configs[0]
assert len(config.outputs) == 1
assert config.outputs[0].str == "daily_votes/2025-01-15"
assert config.env["DATA_DATE"] == "2025-01-15"
assert config.env["AGGREGATE_TYPE"] == "daily_votes"
# Should have inputs for all colors
assert len(config.inputs) == len(COLORS)
expected_inputs = {f"daily_color_votes/2025-01-15/{color}" for color in COLORS}
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
assert actual_inputs == expected_inputs
# All inputs should be MATERIALIZE type
for input_dep in config.inputs:
assert input_dep.dep_type_code == DepType.MATERIALIZE
assert input_dep.dep_type_name == "materialize"
def test_aggregate_color_votes_configure_votes_1w():
"""Test AggregateColorVotes config method with weekly votes output."""
job = AggregateColorVotes()
outputs = [Votes1WPartition(data_date="2025-01-15")]
configs = job.config(outputs)
assert len(configs) == 1
config = configs[0]
assert len(config.outputs) == 1
assert config.outputs[0].str == "votes_1w/2025-01-15"
assert config.env["DATA_DATE"] == "2025-01-15"
assert config.env["AGGREGATE_TYPE"] == "votes_1w"
# Should have inputs for all colors from trailing 1w partitions
assert len(config.inputs) == len(COLORS)
expected_inputs = {f"color_votes_1w/2025-01-15/{color}" for color in COLORS}
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
assert actual_inputs == expected_inputs
def test_aggregate_color_votes_configure_votes_1m():
"""Test AggregateColorVotes config method with monthly votes output."""
job = AggregateColorVotes()
outputs = [Votes1MPartition(data_date="2025-01-15")]
configs = job.config(outputs)
assert len(configs) == 1
config = configs[0]
assert len(config.outputs) == 1
assert config.outputs[0].str == "votes_1m/2025-01-15"
assert config.env["DATA_DATE"] == "2025-01-15"
assert config.env["AGGREGATE_TYPE"] == "votes_1m"
# Should have inputs for all colors from trailing 1m partitions
assert len(config.inputs) == len(COLORS)
expected_inputs = {f"color_votes_1m/2025-01-15/{color}" for color in COLORS}
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
assert actual_inputs == expected_inputs
def test_aggregate_color_votes_configure_multiple_outputs():
"""Test AggregateColorVotes config method with multiple different output types."""
job = AggregateColorVotes()
outputs = [
DailyVotesPartition(data_date="2025-01-15"),
Votes1WPartition(data_date="2025-01-16"),
Votes1MPartition(data_date="2025-01-17")
]
configs = job.config(outputs)
assert len(configs) == 3 # One config per output
# Find configs by date
daily_config = None
weekly_config = None
monthly_config = None
for config in configs:
if config.env["DATA_DATE"] == "2025-01-15":
daily_config = config
elif config.env["DATA_DATE"] == "2025-01-16":
weekly_config = config
elif config.env["DATA_DATE"] == "2025-01-17":
monthly_config = config
assert daily_config is not None
assert weekly_config is not None
assert monthly_config is not None
# Check daily config
assert daily_config.env["AGGREGATE_TYPE"] == "daily_votes"
assert daily_config.outputs[0].str == "daily_votes/2025-01-15"
assert len(daily_config.inputs) == len(COLORS)
assert all("daily_color_votes/2025-01-15/" in inp.partition_ref.str for inp in daily_config.inputs)
# Check weekly config
assert weekly_config.env["AGGREGATE_TYPE"] == "votes_1w"
assert weekly_config.outputs[0].str == "votes_1w/2025-01-16"
assert len(weekly_config.inputs) == len(COLORS)
assert all("color_votes_1w/2025-01-16/" in inp.partition_ref.str for inp in weekly_config.inputs)
# Check monthly config
assert monthly_config.env["AGGREGATE_TYPE"] == "votes_1m"
assert monthly_config.outputs[0].str == "votes_1m/2025-01-17"
assert len(monthly_config.inputs) == len(COLORS)
assert all("color_votes_1m/2025-01-17/" in inp.partition_ref.str for inp in monthly_config.inputs)
def test_aggregate_color_votes_configure_multiple_same_type():
"""Test AggregateColorVotes config method with multiple outputs of same type."""
job = AggregateColorVotes()
outputs = [
DailyVotesPartition(data_date="2025-01-15"),
DailyVotesPartition(data_date="2025-01-16")
]
configs = job.config(outputs)
assert len(configs) == 2 # One config per output
for config in configs:
assert config.env["AGGREGATE_TYPE"] == "daily_votes"
assert len(config.inputs) == len(COLORS)
if config.env["DATA_DATE"] == "2025-01-15":
assert config.outputs[0].str == "daily_votes/2025-01-15"
assert all("daily_color_votes/2025-01-15/" in inp.partition_ref.str for inp in config.inputs)
elif config.env["DATA_DATE"] == "2025-01-16":
assert config.outputs[0].str == "daily_votes/2025-01-16"
assert all("daily_color_votes/2025-01-16/" in inp.partition_ref.str for inp in config.inputs)
else:
assert False, f"Unexpected date: {config.env['DATA_DATE']}"
if __name__ == '__main__':
import pytest
raise SystemExit(pytest.main([__file__]))

View file

@ -0,0 +1,244 @@
#!/usr/bin/env python3
"""
Comparison test between Bazel and DSL implementations.
This test verifies that the DSL job configurations produce identical results
to the equivalent bazel job configurations for the same partition references.
"""
import unittest
from databuild.proto import PartitionRef, JobConfigureResponse
from databuild.test.app.dsl.graph import (
IngestColorVotes,
TrailingColorVotes,
AggregateColorVotes,
ColorVoteReportCalc
)
from databuild.test.app.dsl.partitions import (
IngestedColorPartition,
TrailingColorVotes1WPartition,
TrailingColorVotes1MPartition,
DailyVotesPartition,
Votes1WPartition,
Votes1MPartition,
ColorVoteReportPartition
)
# Import bazel job config functions
from databuild.test.app.bazel.jobs.ingest_color_votes.config import configure as bazel_ingest_config
from databuild.test.app.bazel.jobs.trailing_color_votes.config import configure as bazel_trailing_config
from databuild.test.app.bazel.jobs.aggregate_color_votes.config import configure as bazel_aggregate_config
from databuild.test.app.bazel.jobs.color_vote_report_calc.config import configure as bazel_report_config
class BazelDSLComparisonTest(unittest.TestCase):
"""Compare bazel and DSL job configurations to ensure they produce identical results."""
def _compare_job_configs(self, bazel_response, dsl_configs):
"""Helper to compare JobConfigureResponse from bazel with list[JobConfig] from DSL."""
self.assertIsInstance(bazel_response, JobConfigureResponse)
self.assertIsInstance(dsl_configs, list)
bazel_configs = bazel_response.configs
self.assertEqual(len(bazel_configs), len(dsl_configs),
"Bazel and DSL should produce same number of configs")
# Sort both by a stable key for comparison
def config_sort_key(config):
outputs_str = ",".join(sorted(out.str for out in config.outputs))
env_str = ",".join(f"{k}={v}" for k, v in sorted(config.env.items()))
return f"{outputs_str}:{env_str}"
bazel_sorted = sorted(bazel_configs, key=config_sort_key)
dsl_sorted = sorted(dsl_configs, key=config_sort_key)
for bazel_config, dsl_config in zip(bazel_sorted, dsl_sorted):
# Compare outputs
bazel_outputs = {out.str for out in bazel_config.outputs}
dsl_outputs = {out.str for out in dsl_config.outputs}
self.assertEqual(bazel_outputs, dsl_outputs, "Outputs should match")
# Compare inputs
bazel_inputs = {(inp.partition_ref.str, inp.dep_type_code, inp.dep_type_name)
for inp in bazel_config.inputs}
dsl_inputs = {(inp.partition_ref.str, inp.dep_type_code, inp.dep_type_name)
for inp in dsl_config.inputs}
self.assertEqual(bazel_inputs, dsl_inputs, "Inputs should match")
# Compare args
self.assertEqual(set(bazel_config.args), set(dsl_config.args), "Args should match")
# Compare env
self.assertEqual(bazel_config.env, dsl_config.env, "Environment should match")
def test_ingest_color_votes_comparison(self):
"""Compare IngestColorVotes bazel vs DSL configurations."""
# Test single output
partition_refs = [PartitionRef(str="daily_color_votes/2025-01-01/red")]
bazel_response = bazel_ingest_config(partition_refs)
partitions = [IngestedColorPartition.deserialize(ref.str) for ref in partition_refs]
dsl_job = IngestColorVotes()
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
# Test multiple outputs
partition_refs = [
PartitionRef(str="daily_color_votes/2025-01-02/red"),
PartitionRef(str="daily_color_votes/2025-01-02/blue")
]
bazel_response = bazel_ingest_config(partition_refs)
partitions = [IngestedColorPartition.deserialize(ref.str) for ref in partition_refs]
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
def test_trailing_color_votes_comparison(self):
"""Compare TrailingColorVotes bazel vs DSL configurations."""
# Test weekly output
partition_refs = [PartitionRef(str="color_votes_1w/2025-01-07/red")]
bazel_response = bazel_trailing_config(partition_refs)
partitions = [TrailingColorVotes1WPartition.deserialize(ref.str) for ref in partition_refs]
dsl_job = TrailingColorVotes()
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
# Test monthly output
partition_refs = [PartitionRef(str="color_votes_1m/2025-01-28/blue")]
bazel_response = bazel_trailing_config(partition_refs)
partitions = [TrailingColorVotes1MPartition.deserialize(ref.str) for ref in partition_refs]
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
# Test mixed weekly and monthly for same date/color
partition_refs = [
PartitionRef(str="color_votes_1w/2025-01-28/green"),
PartitionRef(str="color_votes_1m/2025-01-28/green")
]
bazel_response = bazel_trailing_config(partition_refs)
partitions = [
TrailingColorVotes1WPartition.deserialize(partition_refs[0].str),
TrailingColorVotes1MPartition.deserialize(partition_refs[1].str)
]
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
def test_aggregate_color_votes_comparison(self):
"""Compare AggregateColorVotes bazel vs DSL configurations."""
# Test daily votes
partition_refs = [PartitionRef(str="daily_votes/2025-01-15")]
bazel_response = bazel_aggregate_config(partition_refs)
partitions = [DailyVotesPartition.deserialize(ref.str) for ref in partition_refs]
dsl_job = AggregateColorVotes()
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
# Test weekly votes
partition_refs = [PartitionRef(str="votes_1w/2025-01-15")]
bazel_response = bazel_aggregate_config(partition_refs)
partitions = [Votes1WPartition.deserialize(ref.str) for ref in partition_refs]
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
# Test monthly votes
partition_refs = [PartitionRef(str="votes_1m/2025-01-15")]
bazel_response = bazel_aggregate_config(partition_refs)
partitions = [Votes1MPartition.deserialize(ref.str) for ref in partition_refs]
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
# Test multiple different types
partition_refs = [
PartitionRef(str="daily_votes/2025-01-15"),
PartitionRef(str="votes_1w/2025-01-16"),
PartitionRef(str="votes_1m/2025-01-17")
]
bazel_response = bazel_aggregate_config(partition_refs)
partitions = [
DailyVotesPartition.deserialize(partition_refs[0].str),
Votes1WPartition.deserialize(partition_refs[1].str),
Votes1MPartition.deserialize(partition_refs[2].str)
]
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
def test_color_vote_report_calc_comparison(self):
"""Compare ColorVoteReportCalc bazel vs DSL configurations."""
# Test single report
partition_refs = [PartitionRef(str="color_vote_report/2025-01-15/red")]
bazel_response = bazel_report_config(partition_refs)
partitions = [ColorVoteReportPartition.deserialize(ref.str) for ref in partition_refs]
dsl_job = ColorVoteReportCalc()
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
# Test multiple reports same date
partition_refs = [
PartitionRef(str="color_vote_report/2025-01-15/red"),
PartitionRef(str="color_vote_report/2025-01-15/blue")
]
bazel_response = bazel_report_config(partition_refs)
partitions = [ColorVoteReportPartition.deserialize(ref.str) for ref in partition_refs]
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
# Test multiple reports different dates
partition_refs = [
PartitionRef(str="color_vote_report/2025-01-15/red"),
PartitionRef(str="color_vote_report/2025-01-16/red")
]
bazel_response = bazel_report_config(partition_refs)
partitions = [ColorVoteReportPartition.deserialize(ref.str) for ref in partition_refs]
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
def test_partition_serialization_roundtrip(self):
"""Test that DSL partition serialization/deserialization works correctly."""
test_cases = [
IngestedColorPartition(data_date="2025-01-15", color="red"),
TrailingColorVotes1WPartition(data_date="2025-01-15", color="blue"),
TrailingColorVotes1MPartition(data_date="2025-01-28", color="green"),
DailyVotesPartition(data_date="2025-01-15"),
Votes1WPartition(data_date="2025-01-15"),
Votes1MPartition(data_date="2025-01-15"),
ColorVoteReportPartition(data_date="2025-01-15", color="yellow")
]
for partition in test_cases:
with self.subTest(partition=partition):
# Serialize then deserialize
serialized = partition.serialize()
deserialized = type(partition).deserialize(serialized)
# Should be equal
self.assertEqual(partition, deserialized)
# Serializing again should give same result
reserialized = deserialized.serialize()
self.assertEqual(serialized, reserialized)
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,204 @@
from databuild.test.app.dsl.graph import ColorVoteReportCalc
from databuild.test.app.dsl.partitions import (
ColorVoteReportPartition,
DailyVotesPartition,
Votes1WPartition,
Votes1MPartition,
IngestedColorPartition,
TrailingColorVotes1WPartition,
TrailingColorVotes1MPartition
)
from databuild.proto import DepType
def test_color_vote_report_calc_configure_single_output():
"""Test ColorVoteReportCalc config method with single color report output."""
job = ColorVoteReportCalc()
outputs = [ColorVoteReportPartition(data_date="2025-01-15", color="red")]
configs = job.config(outputs)
assert len(configs) == 1
config = configs[0]
# Check outputs
assert len(config.outputs) == 1
assert config.outputs[0].str == "color_vote_report/2025-01-15/red"
# Check args - should contain partition strings
assert len(config.args) == 1
assert config.args[0] == "color_vote_report/2025-01-15/red"
# Check inputs - should have aggregate inputs for the date and specific color inputs
expected_inputs = {
# Aggregate inputs for the date
"daily_votes/2025-01-15",
"votes_1w/2025-01-15",
"votes_1m/2025-01-15",
# Color-specific inputs
"daily_color_votes/2025-01-15/red",
"color_votes_1w/2025-01-15/red",
"color_votes_1m/2025-01-15/red"
}
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
assert actual_inputs == expected_inputs
# All inputs should be MATERIALIZE type
for input_dep in config.inputs:
assert input_dep.dep_type_code == DepType.MATERIALIZE
assert input_dep.dep_type_name == "materialize"
def test_color_vote_report_calc_configure_multiple_colors_same_date():
"""Test ColorVoteReportCalc config method with multiple colors for same date."""
job = ColorVoteReportCalc()
outputs = [
ColorVoteReportPartition(data_date="2025-01-15", color="red"),
ColorVoteReportPartition(data_date="2025-01-15", color="blue")
]
configs = job.config(outputs)
assert len(configs) == 1 # Single config since all outputs go to same job
config = configs[0]
# Check outputs
assert len(config.outputs) == 2
output_strs = {output.str for output in config.outputs}
assert "color_vote_report/2025-01-15/red" in output_strs
assert "color_vote_report/2025-01-15/blue" in output_strs
# Check args - should contain both partition strings
assert len(config.args) == 2
assert set(config.args) == {"color_vote_report/2025-01-15/red", "color_vote_report/2025-01-15/blue"}
# Check inputs - should have aggregate inputs for the date and color-specific inputs for both colors
expected_inputs = {
# Aggregate inputs for the date (only one set since same date)
"daily_votes/2025-01-15",
"votes_1w/2025-01-15",
"votes_1m/2025-01-15",
# Color-specific inputs for red
"daily_color_votes/2025-01-15/red",
"color_votes_1w/2025-01-15/red",
"color_votes_1m/2025-01-15/red",
# Color-specific inputs for blue
"daily_color_votes/2025-01-15/blue",
"color_votes_1w/2025-01-15/blue",
"color_votes_1m/2025-01-15/blue"
}
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
assert actual_inputs == expected_inputs
def test_color_vote_report_calc_configure_multiple_dates():
"""Test ColorVoteReportCalc config method with reports for different dates."""
job = ColorVoteReportCalc()
outputs = [
ColorVoteReportPartition(data_date="2025-01-15", color="red"),
ColorVoteReportPartition(data_date="2025-01-16", color="red")
]
configs = job.config(outputs)
assert len(configs) == 1 # Single config since all outputs go to same job
config = configs[0]
# Check outputs
assert len(config.outputs) == 2
output_strs = {output.str for output in config.outputs}
assert "color_vote_report/2025-01-15/red" in output_strs
assert "color_vote_report/2025-01-16/red" in output_strs
# Check args
assert len(config.args) == 2
assert set(config.args) == {"color_vote_report/2025-01-15/red", "color_vote_report/2025-01-16/red"}
# Check inputs - should have aggregate inputs for both dates and color-specific inputs
expected_inputs = {
# Aggregate inputs for both dates
"daily_votes/2025-01-15",
"votes_1w/2025-01-15",
"votes_1m/2025-01-15",
"daily_votes/2025-01-16",
"votes_1w/2025-01-16",
"votes_1m/2025-01-16",
# Color-specific inputs for red on both dates
"daily_color_votes/2025-01-15/red",
"color_votes_1w/2025-01-15/red",
"color_votes_1m/2025-01-15/red",
"daily_color_votes/2025-01-16/red",
"color_votes_1w/2025-01-16/red",
"color_votes_1m/2025-01-16/red"
}
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
assert actual_inputs == expected_inputs
def test_color_vote_report_calc_configure_complex_scenario():
"""Test ColorVoteReportCalc config method with complex multi-date, multi-color scenario."""
job = ColorVoteReportCalc()
outputs = [
ColorVoteReportPartition(data_date="2025-01-15", color="red"),
ColorVoteReportPartition(data_date="2025-01-15", color="blue"),
ColorVoteReportPartition(data_date="2025-01-16", color="green"),
ColorVoteReportPartition(data_date="2025-01-17", color="red")
]
configs = job.config(outputs)
assert len(configs) == 1 # Single config since all outputs go to same job
config = configs[0]
# Check outputs
assert len(config.outputs) == 4
expected_output_strs = {
"color_vote_report/2025-01-15/red",
"color_vote_report/2025-01-15/blue",
"color_vote_report/2025-01-16/green",
"color_vote_report/2025-01-17/red"
}
actual_output_strs = {output.str for output in config.outputs}
assert actual_output_strs == expected_output_strs
# Check args
assert len(config.args) == 4
assert set(config.args) == expected_output_strs
# Check inputs - should have aggregate inputs for all unique dates and color-specific inputs
expected_inputs = {
# Aggregate inputs for all dates
"daily_votes/2025-01-15",
"votes_1w/2025-01-15",
"votes_1m/2025-01-15",
"daily_votes/2025-01-16",
"votes_1w/2025-01-16",
"votes_1m/2025-01-16",
"daily_votes/2025-01-17",
"votes_1w/2025-01-17",
"votes_1m/2025-01-17",
# Color-specific inputs
"daily_color_votes/2025-01-15/red",
"color_votes_1w/2025-01-15/red",
"color_votes_1m/2025-01-15/red",
"daily_color_votes/2025-01-15/blue",
"color_votes_1w/2025-01-15/blue",
"color_votes_1m/2025-01-15/blue",
"daily_color_votes/2025-01-16/green",
"color_votes_1w/2025-01-16/green",
"color_votes_1m/2025-01-16/green",
"daily_color_votes/2025-01-17/red",
"color_votes_1w/2025-01-17/red",
"color_votes_1m/2025-01-17/red"
}
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
assert actual_inputs == expected_inputs
if __name__ == '__main__':
import pytest
raise SystemExit(pytest.main([__file__]))

View file

@@ -0,0 +1,157 @@
#!/usr/bin/env python3
"""
Integration test for the DSL graph analysis.
This test verifies that when we request color vote reports via the DSL graph,
the analyzer correctly identifies all upstream dependencies and jobs required.
NOTE: This test assumes the DSL graph will have an analyze() method similar to
the bazel graph analyzer. This functionality is not yet implemented but these
tests will validate it once available.
"""
import unittest
from databuild.test.app.dsl.graph import graph
from databuild.test.app.dsl.partitions import ColorVoteReportPartition
class DSLGraphAnalysisTest(unittest.TestCase):
def setUp(self):
# Ensure we have the graph instance
self.graph = graph
def test_single_color_report_dependencies(self):
"""Test dependencies for a single color vote report via DSL."""
partition_refs = ["color_vote_report/2024-01-15/red"]
# TODO: Once DSL graph analysis is implemented, this should call:
# result = self.graph.analyze(partition_refs)
# self.assertIn('nodes', result)
# For now, we can at least verify the graph structure
self.assertIsNotNone(self.graph)
self.assertGreater(len(self.graph.lookup), 0)
# Verify we can create the partition and find its producer
partition = ColorVoteReportPartition(data_date="2024-01-15", color="red")
producer_job_class = self.graph.lookup.get(ColorVoteReportPartition)
self.assertIsNotNone(producer_job_class, "ColorVoteReportPartition should have a registered producer")
# Test that we can call the job's config method
job_instance = producer_job_class()
configs = job_instance.config([partition])
self.assertIsInstance(configs, list)
self.assertGreater(len(configs), 0)
def test_multiple_color_reports_same_date(self):
"""Test dependencies when requesting multiple colors for the same date via DSL."""
partition_refs = [
"color_vote_report/2024-01-15/red",
"color_vote_report/2024-01-15/blue"
]
# TODO: Once DSL graph analysis is implemented, this should call:
# result = self.graph.analyze(partition_refs)
# self.assertIn('nodes', result)
# For now, verify we can handle multiple partitions
partitions = [
ColorVoteReportPartition(data_date="2024-01-15", color="red"),
ColorVoteReportPartition(data_date="2024-01-15", color="blue")
]
producer_job_class = self.graph.lookup.get(ColorVoteReportPartition)
self.assertIsNotNone(producer_job_class)
job_instance = producer_job_class()
configs = job_instance.config(partitions)
self.assertIsInstance(configs, list)
self.assertGreater(len(configs), 0)
def test_multiple_dates_dependencies(self):
"""Test dependencies when requesting reports for different dates via DSL."""
partition_refs = [
"color_vote_report/2024-01-15/red",
"color_vote_report/2024-01-16/red"
]
# TODO: Once DSL graph analysis is implemented, this should call:
# result = self.graph.analyze(partition_refs)
# self.assertIn('nodes', result)
# For now, verify we can handle different dates
partitions = [
ColorVoteReportPartition(data_date="2024-01-15", color="red"),
ColorVoteReportPartition(data_date="2024-01-16", color="red")
]
producer_job_class = self.graph.lookup.get(ColorVoteReportPartition)
self.assertIsNotNone(producer_job_class)
job_instance = producer_job_class()
configs = job_instance.config(partitions)
self.assertIsInstance(configs, list)
self.assertGreater(len(configs), 0)
def test_graph_completeness(self):
"""Test that the DSL graph has all expected partition types registered."""
from databuild.test.app.dsl.partitions import (
IngestedColorPartition,
TrailingColorVotes1WPartition,
TrailingColorVotes1MPartition,
DailyVotesPartition,
Votes1WPartition,
Votes1MPartition,
ColorVoteReportPartition
)
expected_partitions = {
IngestedColorPartition,
TrailingColorVotes1WPartition,
TrailingColorVotes1MPartition,
DailyVotesPartition,
Votes1WPartition,
Votes1MPartition,
ColorVoteReportPartition
}
registered_partitions = set(self.graph.lookup.keys())
self.assertEqual(registered_partitions, expected_partitions,
"All partition types should be registered in the graph")
def test_partition_lookup_functionality(self):
"""Test that partition lookup works correctly for all partition types."""
from databuild.test.app.dsl.partitions import (
IngestedColorPartition,
TrailingColorVotes1WPartition,
TrailingColorVotes1MPartition,
DailyVotesPartition,
Votes1WPartition,
Votes1MPartition,
ColorVoteReportPartition
)
# Test each partition type can be looked up and has a valid job
test_cases = [
(IngestedColorPartition, IngestedColorPartition(data_date="2024-01-15", color="red")),
(TrailingColorVotes1WPartition, TrailingColorVotes1WPartition(data_date="2024-01-15", color="red")),
(TrailingColorVotes1MPartition, TrailingColorVotes1MPartition(data_date="2024-01-15", color="red")),
(DailyVotesPartition, DailyVotesPartition(data_date="2024-01-15")),
(Votes1WPartition, Votes1WPartition(data_date="2024-01-15")),
(Votes1MPartition, Votes1MPartition(data_date="2024-01-15")),
(ColorVoteReportPartition, ColorVoteReportPartition(data_date="2024-01-15", color="red"))
]
for partition_type, partition_instance in test_cases:
with self.subTest(partition_type=partition_type.__name__):
job_class = self.graph.lookup.get(partition_type)
self.assertIsNotNone(job_class, f"Job class for {partition_type.__name__} should be registered")
# Verify we can instantiate the job and call config
job_instance = job_class()
configs = job_instance.config([partition_instance])
self.assertIsInstance(configs, list, f"Config method for {partition_type.__name__} should return a list")
if __name__ == '__main__':
unittest.main()

View file

@@ -0,0 +1,56 @@
from databuild.test.app.dsl.graph import IngestColorVotes
from databuild.test.app.dsl.partitions import IngestedColorPartition
from databuild.proto import PartitionRef
def test_ingest_color_votes_configure_single():
"""Test IngestColorVotes config method with single output."""
job = IngestColorVotes()
outputs = [IngestedColorPartition(data_date="2025-01-01", color="red")]
configs = job.config(outputs)
assert len(configs) == 1
config = configs[0]
assert len(config.outputs) == 1
assert config.outputs[0].str == "daily_color_votes/2025-01-01/red"
assert config.env["COLOR"] == "red"
assert config.env["DATA_DATE"] == "2025-01-01"
assert len(config.inputs) == 0
assert len(config.args) == 0
def test_ingest_color_votes_configure_multiple():
"""Test IngestColorVotes config method with multiple outputs."""
job = IngestColorVotes()
outputs = [
IngestedColorPartition(data_date="2025-01-02", color="red"),
IngestedColorPartition(data_date="2025-01-02", color="blue"),
]
configs = job.config(outputs)
assert len(configs) == 2
# First config
config1 = configs[0]
assert len(config1.outputs) == 1
assert config1.outputs[0].str == "daily_color_votes/2025-01-02/red"
assert config1.env["COLOR"] == "red"
assert config1.env["DATA_DATE"] == "2025-01-02"
assert len(config1.inputs) == 0
assert len(config1.args) == 0
# Second config
config2 = configs[1]
assert len(config2.outputs) == 1
assert config2.outputs[0].str == "daily_color_votes/2025-01-02/blue"
assert config2.env["COLOR"] == "blue"
assert config2.env["DATA_DATE"] == "2025-01-02"
assert len(config2.inputs) == 0
assert len(config2.args) == 0
if __name__ == '__main__':
import pytest
raise SystemExit(pytest.main([__file__]))

View file

@@ -0,0 +1,135 @@
from databuild.test.app.dsl.graph import TrailingColorVotes
from databuild.test.app.dsl.partitions import (
TrailingColorVotes1WPartition,
TrailingColorVotes1MPartition,
IngestedColorPartition
)
from databuild.proto import DepType
def test_trailing_color_votes_configure_weekly_only():
"""Test TrailingColorVotes config method with weekly output only."""
job = TrailingColorVotes()
outputs = [TrailingColorVotes1WPartition(data_date="2025-01-07", color="red")]
configs = job.config(outputs)
assert len(configs) == 1
config = configs[0]
assert len(config.outputs) == 1
assert config.outputs[0].str == "color_votes_1w/2025-01-07/red"
assert config.env["COLOR"] == "red"
assert config.env["DATA_DATE"] == "2025-01-07"
assert config.env["WEEKLY"] == "true"
assert config.env["MONTHLY"] == "false"
# Should have 7 days of inputs
assert len(config.inputs) == 7
expected_dates = ["2025-01-07", "2025-01-06", "2025-01-05", "2025-01-04",
"2025-01-03", "2025-01-02", "2025-01-01"]
for i, input_dep in enumerate(config.inputs):
assert input_dep.dep_type_code == DepType.MATERIALIZE
assert input_dep.dep_type_name == "materialize"
assert input_dep.partition_ref.str == f"daily_color_votes/{expected_dates[i]}/red"
def test_trailing_color_votes_configure_monthly_only():
"""Test TrailingColorVotes config method with monthly output only."""
job = TrailingColorVotes()
outputs = [TrailingColorVotes1MPartition(data_date="2025-01-28", color="blue")]
configs = job.config(outputs)
assert len(configs) == 1
config = configs[0]
assert len(config.outputs) == 1
assert config.outputs[0].str == "color_votes_1m/2025-01-28/blue"
assert config.env["COLOR"] == "blue"
assert config.env["DATA_DATE"] == "2025-01-28"
assert config.env["WEEKLY"] == "false"
assert config.env["MONTHLY"] == "true"
# Should have 28 days of inputs
assert len(config.inputs) == 28
# Check first and last input dates
assert config.inputs[0].partition_ref.str == "daily_color_votes/2025-01-28/blue"
assert config.inputs[27].partition_ref.str == "daily_color_votes/2025-01-01/blue"
def test_trailing_color_votes_configure_both_weekly_and_monthly():
"""Test TrailingColorVotes config method with both weekly and monthly outputs for same date/color."""
job = TrailingColorVotes()
outputs = [
TrailingColorVotes1WPartition(data_date="2025-01-28", color="green"),
TrailingColorVotes1MPartition(data_date="2025-01-28", color="green")
]
configs = job.config(outputs)
assert len(configs) == 1 # Should group by (data_date, color)
config = configs[0]
assert len(config.outputs) == 2
# Check outputs
output_strs = {output.str for output in config.outputs}
assert "color_votes_1w/2025-01-28/green" in output_strs
assert "color_votes_1m/2025-01-28/green" in output_strs
assert config.env["COLOR"] == "green"
assert config.env["DATA_DATE"] == "2025-01-28"
assert config.env["WEEKLY"] == "true"
assert config.env["MONTHLY"] == "true"
# Should have 28 days of inputs (max window)
assert len(config.inputs) == 28
def test_trailing_color_votes_configure_multiple_groups():
"""Test TrailingColorVotes config method with outputs that require separate configs."""
job = TrailingColorVotes()
outputs = [
TrailingColorVotes1WPartition(data_date="2025-01-07", color="red"),
TrailingColorVotes1WPartition(data_date="2025-01-07", color="blue"),
TrailingColorVotes1MPartition(data_date="2025-01-08", color="red")
]
configs = job.config(outputs)
assert len(configs) == 3 # Three different (data_date, color) combinations
# Find configs by their characteristics
red_7th_config = None
blue_7th_config = None
red_8th_config = None
for config in configs:
if config.env["DATA_DATE"] == "2025-01-07" and config.env["COLOR"] == "red":
red_7th_config = config
elif config.env["DATA_DATE"] == "2025-01-07" and config.env["COLOR"] == "blue":
blue_7th_config = config
elif config.env["DATA_DATE"] == "2025-01-08" and config.env["COLOR"] == "red":
red_8th_config = config
assert red_7th_config is not None
assert blue_7th_config is not None
assert red_8th_config is not None
# Check red 7th (weekly only)
assert red_7th_config.env["WEEKLY"] == "true"
assert red_7th_config.env["MONTHLY"] == "false"
assert len(red_7th_config.inputs) == 7
# Check blue 7th (weekly only)
assert blue_7th_config.env["WEEKLY"] == "true"
assert blue_7th_config.env["MONTHLY"] == "false"
assert len(blue_7th_config.inputs) == 7
# Check red 8th (monthly only)
assert red_8th_config.env["WEEKLY"] == "false"
assert red_8th_config.env["MONTHLY"] == "true"
assert len(red_8th_config.inputs) == 28
if __name__ == '__main__':
import pytest
raise SystemExit(pytest.main([__file__]))

View file

@@ -0,0 +1,26 @@
from databuild.test.app import dal
from databuild.proto import PartitionRef
from databuild.test.app.colors import COLORS
def execute(data_date: str, aggregate_type: str):
# Determine input prefix based on aggregate type
if aggregate_type == "daily_votes":
input_prefix = "daily_color_votes"
elif aggregate_type == "votes_1w":
input_prefix = "color_votes_1w"
elif aggregate_type == "votes_1m":
input_prefix = "color_votes_1m"
else:
raise ValueError(f"Unknown aggregate type: {aggregate_type}")
# Read data from all colors for this date
input_refs = []
for color in COLORS:
input_refs.append(PartitionRef(str=f"{input_prefix}/{data_date}/{color}"))
data = dal.read(*input_refs)
total_votes = sum(record["votes"] for record in data)
# Write aggregated result
output_ref = PartitionRef(str=f"{aggregate_type}/{data_date}")
dal.write(output_ref, [{"data_date": data_date, "votes": total_votes}])

View file

@@ -0,0 +1,51 @@
from databuild.test.app import dal
from databuild.proto import PartitionRef
def execute(output_partition_strs: list[str]):
# Parse requested outputs
outputs = [PartitionRef(str=ref_str) for ref_str in output_partition_strs]
for output in outputs:
parts = output.str.split("/")
data_date, color = parts[1], parts[2]
# Read total votes for this date - fail if missing
daily_total = dal.read(PartitionRef(str=f"daily_votes/{data_date}"), empty_ok=False)
weekly_total = dal.read(PartitionRef(str=f"votes_1w/{data_date}"), empty_ok=False)
monthly_total = dal.read(PartitionRef(str=f"votes_1m/{data_date}"), empty_ok=False)
# Read color-specific votes for this date/color - fail if missing
daily_color = dal.read(PartitionRef(str=f"daily_color_votes/{data_date}/{color}"), empty_ok=False)
weekly_color = dal.read(PartitionRef(str=f"color_votes_1w/{data_date}/{color}"), empty_ok=False)
monthly_color = dal.read(PartitionRef(str=f"color_votes_1m/{data_date}/{color}"), empty_ok=False)
# Extract vote counts
daily_total_votes = daily_total[0]["votes"]
weekly_total_votes = weekly_total[0]["votes"]
monthly_total_votes = monthly_total[0]["votes"]
daily_color_votes = daily_color[0]["votes"]
weekly_color_votes = weekly_color[0]["votes"]
monthly_color_votes = monthly_color[0]["votes"]
# Calculate percentages
daily_percent = (daily_color_votes / daily_total_votes * 100) if daily_total_votes > 0 else 0
weekly_percent = (weekly_color_votes / weekly_total_votes * 100) if weekly_total_votes > 0 else 0
monthly_percent = (monthly_color_votes / monthly_total_votes * 100) if monthly_total_votes > 0 else 0
# Write report
report_data = [{
"color": color,
"data_date": data_date,
"daily_total_votes": daily_total_votes,
"weekly_total_votes": weekly_total_votes,
"monthly_total_votes": monthly_total_votes,
"daily_color_votes": daily_color_votes,
"weekly_color_votes": weekly_color_votes,
"monthly_color_votes": monthly_color_votes,
"daily_percent": daily_percent,
"weekly_percent": weekly_percent,
"monthly_percent": monthly_percent
}]
dal.write(output, report_data)

View file

@@ -1,34 +1,8 @@
from databuild.test.app.jobs.ingest_color_votes.config import configure
from databuild.test.app.jobs.ingest_color_votes.execute import execute
from databuild.test.app import dal
from databuild.proto import PartitionRef
def test_ingest_color_votes_configure():
refs_single = [PartitionRef(str="daily_color_votes/2025-01-01/red")]
config_single = configure(refs_single)
assert len(config_single.configs) == 1
assert config_single.configs[0].outputs[0].str == "daily_color_votes/2025-01-01/red"
assert config_single.configs[0].env["COLOR"] == "red"
assert config_single.configs[0].env["DATA_DATE"] == "2025-01-01"
refs_multiple = [
PartitionRef(str="daily_color_votes/2025-01-02/red"),
PartitionRef(str="daily_color_votes/2025-01-02/blue"),
]
config_multiple = configure(refs_multiple)
assert len(config_multiple.configs) == 2
assert len(config_multiple.configs[0].outputs) == 1
assert config_multiple.configs[0].outputs[0].str == "daily_color_votes/2025-01-02/red"
assert config_multiple.configs[0].env["COLOR"] == "red"
assert config_multiple.configs[0].env["DATA_DATE"] == "2025-01-02"
assert len(config_multiple.configs[1].outputs) == 1
assert config_multiple.configs[1].outputs[0].str == "daily_color_votes/2025-01-02/blue"
assert config_multiple.configs[1].env["COLOR"] == "blue"
assert config_multiple.configs[1].env["DATA_DATE"] == "2025-01-02"
def test_ingest_color_votes():
execute("2025-01-01", "red")
results = dal.read(PartitionRef(str="daily_color_votes/2025-01-01/red"))

View file

@@ -0,0 +1,28 @@
from databuild.test.app import dal
from databuild.proto import PartitionRef
from datetime import date, timedelta
import os
def execute(data_date: str, color: str):
output_date = date.fromisoformat(data_date)
weekly = os.environ.get("WEEKLY", "false").lower() == "true"
monthly = os.environ.get("MONTHLY", "false").lower() == "true"
def calculate_and_write(window_days: int, output_prefix: str):
# Read trailing data and sum votes
input_refs = []
for i in range(window_days):
input_date = output_date - timedelta(days=i)
input_refs.append(PartitionRef(str=f"daily_color_votes/{input_date.isoformat()}/{color}"))
data = dal.read(*input_refs)
total_votes = sum(record["votes"] for record in data)
output_ref = PartitionRef(str=f"{output_prefix}/{data_date}/{color}")
dal.write(output_ref, [{"color": color, "data_date": data_date, "votes": total_votes}])
if weekly:
calculate_and_write(7, "color_votes_1w")
if monthly:
calculate_and_write(28, "color_votes_1m")

View file

@@ -25,29 +25,6 @@ pip.parse(
)
use_repo(pip, "pypi")
# Rules OCI - necessary for producing a docker container
bazel_dep(name = "rules_oci", version = "2.2.6")
# For testing, we also recommend https://registry.bazel.build/modules/container_structure_test
oci = use_extension("@rules_oci//oci:extensions.bzl", "oci")
# Declare external images you need to pull, for example:
oci.pull(
name = "debian",
image = "docker.io/library/python",
platforms = [
"linux/arm64/v8",
"linux/amd64",
],
# 'latest' is not reproducible, but it's convenient.
# During the build we print a WARNING message that includes recommended 'digest' and 'platforms'
# values which you can use here in place of 'tag' to pin for reproducibility.
tag = "3.12-bookworm",
)
# For each oci.pull call, repeat the "name" here to expose them as dependencies.
use_repo(oci, "debian", "debian_linux_amd64", "debian_linux_arm64_v8")
# Platforms for specifying linux/arm
bazel_dep(name = "platforms", version = "0.0.11")

File diff suppressed because one or more lines are too long

View file

@@ -0,0 +1,292 @@
# DSL Graph Generation: Bazel Module Generation from Python DSL
## Motivation & High-Level Goals
### Problem Statement
DataBuild's Python DSL provides an ergonomic interface for defining data processing graphs, but currently lacks a deployment path. Users can define jobs and graphs using the DSL, but cannot easily package and deploy them as complete, hermetic applications. This limits the DSL's utility as a production-ready interface.
### Strategic Goals
1. **Seamless Deployment**: Enable DSL-defined graphs to be built and deployed as complete bazel modules
2. **Hermetic Packaging**: Generate self-contained modules with all dependencies resolved
3. **Interface Consistency**: Maintain CLI/Service interchangeability principle across generated modules
4. **Production Readiness**: Support container deployment and external dependency management
### Success Criteria
- DSL graphs can be compiled to standalone bazel modules (`@my_generated_graph//...`)
- Generated modules support the full databuild interface (analyze, build, service, container images)
- External repositories can depend on databuild core and generate working applications
- End-to-end deployment pipeline from DSL definition to running containers
## Required Reading
### Core Design Documents
- [`DESIGN.md`](../DESIGN.md) - Overall databuild architecture and principles
- [`design/core-build.md`](../design/core-build.md) - Job and graph execution semantics
- [`design/graph-specification.md`](../design/graph-specification.md) - DSL interfaces and patterns
- [`design/service.md`](../design/service.md) - Service interface requirements
- [`design/deploy-strategies.md`](../design/deploy-strategies.md) - Deployment patterns
### Key Source Files
- [`databuild/dsl/python/dsl.py`](../databuild/dsl/python/dsl.py) - Current DSL implementation
- [`databuild/test/app/dsl/graph.py`](../databuild/test/app/dsl/graph.py) - Reference DSL usage
- [`databuild/rules.bzl`](../databuild/rules.bzl) - Bazel rules for jobs and graphs
- [`databuild/databuild.proto`](../databuild/databuild.proto) - Core interfaces
### Understanding Prerequisites
1. **Job Architecture**: Jobs have `.cfg`, `.exec`, and main targets with subcommand pattern
2. **Graph Structure**: Graphs require job lookup, analyze, build, and service variants (see the lookup sketch after this list)
3. **Bazel Modules**: External repos use `@workspace//...` references for generated content
4. **CLI/Service Consistency**: Both interfaces must produce identical artifacts and behaviors
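To make prerequisites 1 and 2 concrete, here is a minimal sketch of the DSL conventions as the test app exercises them: `graph.lookup` maps each partition class to its producing job class, and `config` returns the job configurations for a batch of requested partitions.

```python
# Sketch only: mirrors how the test app drives the DSL graph today.
from databuild.test.app.dsl.graph import graph
from databuild.test.app.dsl.partitions import ColorVoteReportPartition

partition = ColorVoteReportPartition(data_date="2024-01-15", color="red")
job_class = graph.lookup[ColorVoteReportPartition]  # partition type -> producing job class
configs = job_class().config([partition])           # one or more job configurations
print([ref.str for ref in configs[0].outputs])      # ['color_vote_report/2024-01-15/red']
```

Generation has to preserve exactly this behavior behind the `.cfg`/`.exec` targets described above.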
## Implementation Plan
### Phase 1: Basic Generation Infrastructure
**Goal**: Establish foundation for generating bazel modules from DSL definitions
#### Deliverables
- Extend `DataBuildGraph.generate_bazel_module()` method
- Generate minimal `MODULE.bazel` with databuild core dependency
- Generate `BUILD.bazel` with job and graph target stubs
- Basic workspace creation and file writing utilities
#### Implementation Tasks
1. Add `generate_bazel_module(workspace_name: str, output_dir: str)` to `DataBuildGraph` (sketched after this list)
2. Create template system for `MODULE.bazel` and `BUILD.bazel` generation
3. Implement file system utilities for creating workspace structure
4. Add basic validation for DSL graph completeness
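To ground task 1, here is a minimal sketch of what `generate_bazel_module` could look like; the method name and signature come from this plan, while the template contents and file layout below are illustrative assumptions:

```python
# Hypothetical Phase 1 skeleton; real templates would be fuller.
from pathlib import Path

MODULE_TEMPLATE = """\
module(name = "{name}", version = "0.1.0")

bazel_dep(name = "databuild", version = "0.1.0")
"""

class DataBuildGraph:
    def generate_bazel_module(self, workspace_name: str, output_dir: str) -> None:
        out = Path(output_dir)
        out.mkdir(parents=True, exist_ok=True)
        # MODULE.bazel pins the databuild core dependency
        (out / "MODULE.bazel").write_text(MODULE_TEMPLATE.format(name=workspace_name))
        # BUILD.bazel starts as a stub; job and graph targets land here in Phases 2-3
        (out / "BUILD.bazel").write_text("# Generated by the databuild DSL\n")
```

Phases 2 and 3 would then append job and graph targets to the same generated `BUILD.bazel`.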
#### Tests & Verification
```bash
# Test: Basic generation succeeds
python -c "
from databuild.test.app.dsl.graph import graph
graph.generate_bazel_module('test_graph', '/tmp/generated')
"
# Test: Generated files are valid
cd /tmp/generated
bazel build //... # Should succeed without errors
# Test: Module can be referenced externally
# In separate workspace:
# bazel build @test_graph//...
```
#### Success Criteria
- Generated `MODULE.bazel` has correct databuild dependency
- Generated `BUILD.bazel` is syntactically valid
- External workspace can reference `@generated_graph//...` targets
- No compilation errors in generated bazel files
---
### Phase 2: Job Binary Generation
**Goal**: Convert DSL job classes into executable databuild job targets
#### Deliverables
- Auto-generate job binary Python files with config/exec subcommand handling
- Create `databuild_job` targets for each DSL job class
- Implement job lookup binary generation
- Wire partition pattern matching to job target resolution
#### Implementation Tasks
1. Create job binary template with subcommand dispatching:
```python
# Generated job_binary.py template
import json
import sys

job_instance = MyDSLJob()  # the concrete DSL job class is baked in at generation time
if sys.argv[1] == "config":
    # parse_outputs is a generated helper that deserializes partition refs
    config = job_instance.config(parse_outputs(sys.argv[2:]))
    print(json.dumps(config))
elif sys.argv[1] == "exec":
    config = json.loads(sys.stdin.read())
    job_instance.exec(config)
```
2. Generate job lookup binary from DSL job registrations:
```python
# Generated lookup.py
def lookup_job_for_partition(partition_ref: str) -> str:
for pattern, job_target in JOB_MAPPINGS.items():
if pattern.match(partition_ref):
return job_target
raise ValueError(f"No job found for: {partition_ref}")
```
3. Create `databuild_job` targets in generated `BUILD.bazel`
4. Handle DSL job dependencies and imports in generated files
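The lookup template in task 2 leaves open how `JOB_MAPPINGS` itself gets emitted. One plausible shape is a generated regex-to-target table derived from the DSL's partition registrations; `ref_pattern` and the segment counts below are illustrative assumptions, while the job target name matches the Phase 2 test expectations:

```python
# Hypothetical generated JOB_MAPPINGS table consumed by lookup.py.
import re

def ref_pattern(prefix: str, segments: int) -> re.Pattern:
    """Match refs like 'daily_color_votes/<date>/<color>'."""
    return re.compile("/".join([re.escape(prefix)] + ["[^/]+"] * segments) + "$")

JOB_MAPPINGS = {
    ref_pattern("daily_color_votes", 2): "//:ingest_color_votes",
    ref_pattern("color_votes_1w", 2): "//:trailing_color_votes",
    ref_pattern("color_votes_1m", 2): "//:trailing_color_votes",
}
```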
#### Tests & Verification
```bash
# Test: Job config execution
bazel run @test_graph//:ingest_color_votes.cfg -- \
"daily_color_votes/2024-01-01/red"
# Should output valid JobConfig JSON
# Test: Job exec execution
echo '{"outputs":[...], "env":{"DATA_DATE":"2024-01-01"}}' | \
bazel run @test_graph//:ingest_color_votes.exec
# Should execute successfully
# Test: Job lookup
bazel run @test_graph//:job_lookup -- \
"daily_color_votes/2024-01-01/red"
# Should output: //:ingest_color_votes
```
#### Success Criteria
- All DSL jobs become executable `databuild_job` targets
- Job binaries correctly handle config/exec subcommands
- Job lookup correctly maps partition patterns to job targets
- Generated jobs maintain DSL semantic behavior
---
### Phase 3: Graph Integration
**Goal**: Generate complete databuild graph targets with all operational variants
#### Deliverables
- Generate `databuild_graph` target with analyze/build/service capabilities
- Create all graph variant targets (`.analyze`, `.build`, `.service`, etc.)
- Wire job dependencies into graph configuration
- Generate container deployment targets
#### Implementation Tasks
1. Generate `databuild_graph` target with complete job list
2. Create all required graph variants:
- `my_graph.analyze` - Planning capability
- `my_graph.build` - CLI execution
- `my_graph.service` - HTTP service
- `my_graph.service.image` - Container image
3. Configure job lookup and dependency wiring
4. Add graph label and identification metadata
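Keeping the variants consistent is the crux here; one option is a single generated entry point that dispatches subcommands, so `.analyze`, `.build`, and `.service` all share one planner. Everything below is an illustrative assumption with placeholder bodies, not the databuild core API:

```python
# Hypothetical generated graph entry point; placeholder implementations.
import sys

def plan_for(refs):       # placeholder: the real planner comes from databuild core
    return {"nodes": list(refs)}

def execute_plan(plan):   # placeholder: the real executor runs job configs
    print(f"built {len(plan['nodes'])} partitions")

def serve(port):          # placeholder: the real service wraps the same two calls
    print(f"serving on :{port}")

def main(argv):
    if not argv:
        raise SystemExit("usage: graph <analyze|build|service> [refs...]")
    command, refs = argv[0], argv[1:]
    if command == "analyze":
        print(plan_for(refs))
    elif command == "build":
        execute_plan(plan_for(refs))  # CLI path reuses the analyze result
    elif command == "service":
        serve(port=8080)
    else:
        raise SystemExit(f"unknown subcommand: {command}")

if __name__ == "__main__":
    main(sys.argv[1:])
```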
#### Tests & Verification
```bash
# Test: Graph analysis
bazel run @test_graph//:my_graph.analyze -- \
"color_vote_report/2024-01-01/red"
# Should output complete job execution plan
# Test: Graph building
bazel run @test_graph//:my_graph.build -- \
"daily_color_votes/2024-01-01/red"
# Should execute end-to-end build
# Test: Service deployment
bazel run @test_graph//:my_graph.service -- --port 8081
# Should start HTTP service on port 8081
# Test: Container generation
bazel build @test_graph//:my_graph.service.image
# Should create deployable container image
```
#### Success Criteria
- Graph targets provide full databuild functionality
- CLI and service interfaces produce identical results
- All graph operations work with generated job targets
- Container images are deployable and functional
---
### Phase 4: Dependency Resolution
**Goal**: Handle external pip packages and bazel dependencies in generated modules
#### Deliverables
- User-declared dependency system in DSL
- Generated `MODULE.bazel` with proper pip and bazel dependencies
- Dependency validation and conflict resolution
- Support for requirements files and version pinning
#### Implementation Tasks
1. Extend `DataBuildGraph` constructor to accept dependencies:
```python
graph = DataBuildGraph(
"//my_graph",
pip_deps=["pandas>=2.0.0", "numpy"],
bazel_deps=["@my_repo//internal:lib"]
)
```
2. Generate `MODULE.bazel` with pip extension configuration:
```python
pip = use_extension("@rules_python//python/extensions:pip.bzl", "pip")
pip.parse(
hub_name = "pip_deps",
python_version = "3.11",
requirements_lock = "//:requirements_lock.txt"
)
```
3. Create requirements file generation from declared dependencies (sketched below)
4. Add dependency validation during generation
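A possible shape for task 3, assuming `pip_deps` carries requirement strings as declared in task 1 (lock-file generation via `requirements_lock.txt` would layer on top of this):

```python
# Hypothetical requirements emission from declared pip_deps.
from pathlib import Path

def write_requirements(pip_deps: list[str], output_dir: str) -> Path:
    path = Path(output_dir) / "requirements.txt"
    # One requirement per line, sorted for deterministic generated output
    path.write_text("\n".join(sorted(pip_deps)) + "\n")
    return path

write_requirements(["pandas>=2.0.0", "numpy"], "/tmp/generated")
```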
#### Tests & Verification
```bash
# Test: Pip dependencies resolved
bazel build @test_graph//:my_job
# Should succeed with pandas/numpy available
# Test: Cross-module references work
# Generate graph that depends on @other_repo//lib
bazel build @test_graph//:dependent_job
# Should resolve external bazel dependencies
# Test: Container includes all deps
bazel run @test_graph//:my_graph.service.image_load
docker run databuild_test_graph_service:latest python -c "import pandas"
# Should succeed - pandas available in container
```
#### Success Criteria
- Generated modules resolve all external dependencies
- Pip packages are available to job execution
- Cross-repository bazel dependencies work correctly
- Container images include complete dependency closure
---
### Phase 5: End-to-End Deployment
**Goal**: Complete production deployment pipeline with observability
#### Deliverables
- Production-ready container images with proper configuration
- Integration with existing databuild observability systems
- Build event log compatibility
- Performance optimization and resource management
#### Implementation Tasks
1. Optimize generated container images for production use
2. Ensure build event logging works correctly in generated modules
3. Add resource configuration and limits to generated targets
4. Create deployment documentation and examples
5. Performance testing and optimization
#### Tests & Verification
```bash
./run_e2e_tests.sh
```
#### Success Criteria
- Generated modules are production-ready
- Full observability and logging integration
- Performance meets production requirements
- CLI/Service consistency maintained
- Complete deployment documentation
## Validation Strategy
### Integration with Existing Tests
- Extend `run_e2e_tests.sh` to test generated modules
- Add generated module tests to CI/CD pipeline
- Use existing test app DSL as primary test case
### Performance Benchmarks
- Graph analysis speed comparison (DSL vs hand-written bazel)
- Container image size optimization
- Job execution overhead measurement
### Correctness Verification
- Build event log structure validation
- Partition resolution accuracy testing
- Dependency resolution completeness checks

View file

@@ -1,10 +1,8 @@
- Implement python dsl
- Achieve fast configuration (betterproto2 imports are sus)
- Remove manual references to enum values, e.g. [here](../databuild/repositories/builds/mod.rs:85)
- Type-safe mithril [claude link](https://claude.ai/share/f33f8605-472a-4db4-9211-5a1e52087316)
- Status indicator for page selection
- On build request detail page, show aggregated job results
- Use path based navigation instead of hashbang?
- Add build request notes
- How do we encode job labels in the path? (Build event job links are not encoding job labels properly)
- Resolve double type system with protobuf and openapi
- Plan for external worker dispatch (e.g. k8s pod per build, or launch in container service)
@@ -12,3 +10,6 @@
- Should we have meaningful exit codes? E.g. "retry-able error", etc?
- Fully joinable build/job IDs - ensure all execution logs / metrics are joinable to build request ID?
- Triggers?
- Add build request notes
- Status indicator for page selection
- Use path based navigation instead of hashbang?