Compare commits
6 commits
30f1d9addb
...
40d42e03dd
| Author | SHA1 | Date | |
|---|---|---|---|
| 40d42e03dd | |||
| 2ad4ae6d3c | |||
| ae5147cb36 | |||
| 82e1d0eb26 | |||
| 6d55d54267 | |||
| 63f9518486 |
51 changed files with 2490 additions and 2922 deletions
22
MODULE.bazel
22
MODULE.bazel
|
|
@ -209,11 +209,29 @@ python.toolchain(
|
|||
|
||||
pip = use_extension("@rules_python//python/extensions:pip.bzl", "pip")
|
||||
pip.parse(
|
||||
hub_name = "pypi",
|
||||
hub_name = "databuild_pypi",
|
||||
python_version = "3.13",
|
||||
requirements_lock = "//:requirements_lock.txt",
|
||||
)
|
||||
use_repo(pip, "pypi")
|
||||
use_repo(pip, "databuild_pypi")
|
||||
|
||||
# OCI (Docker images)
|
||||
oci = use_extension("@rules_oci//oci:extensions.bzl", "oci")
|
||||
|
||||
# Declare external images you need to pull
|
||||
oci.pull(
|
||||
name = "debian",
|
||||
image = "docker.io/library/python",
|
||||
platforms = [
|
||||
"linux/arm64/v8",
|
||||
"linux/amd64",
|
||||
],
|
||||
# Using a pinned version for reproducibility
|
||||
tag = "3.12-bookworm",
|
||||
)
|
||||
|
||||
# For each oci.pull call, repeat the "name" here to expose them as dependencies
|
||||
use_repo(oci, "debian", "debian_linux_amd64", "debian_linux_arm64_v8")
|
||||
|
||||
# Ruff
|
||||
# macOS ARM64 (Apple Silicon)
|
||||
|
|
|
|||
|
|
@ -567,11 +567,54 @@
|
|||
"@@rules_oci+//oci:extensions.bzl%oci": {
|
||||
"general": {
|
||||
"bzlTransitiveDigest": "KHcdN2ovRQGX1MKsH0nGoGPFd/84U43tssN2jImCeJU=",
|
||||
"usagesDigest": "/O1PwnnkqSBmI9Oe08ZYYqjM4IS8JR+/9rjgzVTNDaQ=",
|
||||
"usagesDigest": "Y6oSW43ZgWvZTMtL3eDjcxyo58BCPzyiFhH+D+xVgwM=",
|
||||
"recordedFileInputs": {},
|
||||
"recordedDirentsInputs": {},
|
||||
"envVariables": {},
|
||||
"generatedRepoSpecs": {
|
||||
"debian_linux_arm64_v8": {
|
||||
"repoRuleId": "@@rules_oci+//oci/private:pull.bzl%oci_pull",
|
||||
"attributes": {
|
||||
"www_authenticate_challenges": {},
|
||||
"scheme": "https",
|
||||
"registry": "index.docker.io",
|
||||
"repository": "library/python",
|
||||
"identifier": "3.12-bookworm",
|
||||
"platform": "linux/arm64/v8",
|
||||
"target_name": "debian_linux_arm64_v8",
|
||||
"bazel_tags": []
|
||||
}
|
||||
},
|
||||
"debian_linux_amd64": {
|
||||
"repoRuleId": "@@rules_oci+//oci/private:pull.bzl%oci_pull",
|
||||
"attributes": {
|
||||
"www_authenticate_challenges": {},
|
||||
"scheme": "https",
|
||||
"registry": "index.docker.io",
|
||||
"repository": "library/python",
|
||||
"identifier": "3.12-bookworm",
|
||||
"platform": "linux/amd64",
|
||||
"target_name": "debian_linux_amd64",
|
||||
"bazel_tags": []
|
||||
}
|
||||
},
|
||||
"debian": {
|
||||
"repoRuleId": "@@rules_oci+//oci/private:pull.bzl%oci_alias",
|
||||
"attributes": {
|
||||
"target_name": "debian",
|
||||
"www_authenticate_challenges": {},
|
||||
"scheme": "https",
|
||||
"registry": "index.docker.io",
|
||||
"repository": "library/python",
|
||||
"identifier": "3.12-bookworm",
|
||||
"platforms": {
|
||||
"@@platforms//cpu:arm64": "@debian_linux_arm64_v8",
|
||||
"@@platforms//cpu:x86_64": "@debian_linux_amd64"
|
||||
},
|
||||
"bzlmod_repository": "debian",
|
||||
"reproducible": true
|
||||
}
|
||||
},
|
||||
"oci_crane_darwin_amd64": {
|
||||
"repoRuleId": "@@rules_oci+//oci:repositories.bzl%crane_repositories",
|
||||
"attributes": {
|
||||
|
|
@ -687,7 +730,11 @@
|
|||
}
|
||||
},
|
||||
"moduleExtensionMetadata": {
|
||||
"explicitRootModuleDirectDeps": [],
|
||||
"explicitRootModuleDirectDeps": [
|
||||
"debian",
|
||||
"debian_linux_arm64_v8",
|
||||
"debian_linux_amd64"
|
||||
],
|
||||
"explicitRootModuleDirectDevDeps": [],
|
||||
"useAllRepos": "NO",
|
||||
"reproducible": false
|
||||
|
|
|
|||
|
|
@ -150,7 +150,7 @@ py_binary(
|
|||
srcs = ["proto_wrapper.py"],
|
||||
main = "proto_wrapper.py",
|
||||
deps = [
|
||||
"@pypi//betterproto2_compiler",
|
||||
"@databuild_pypi//betterproto2_compiler",
|
||||
],
|
||||
)
|
||||
|
||||
|
|
@ -175,7 +175,7 @@ $(location @com_google_protobuf//:protoc) --python_betterproto2_out=$(GENDIR)/da
|
|||
":protoc-gen-python_betterproto2",
|
||||
"//:ruff_binary",
|
||||
"@com_google_protobuf//:protoc",
|
||||
"@pypi//betterproto2_compiler",
|
||||
"@databuild_pypi//betterproto2_compiler",
|
||||
],
|
||||
)
|
||||
|
||||
|
|
@ -187,8 +187,8 @@ py_library(
|
|||
],
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"@pypi//betterproto2_compiler",
|
||||
"@pypi//grpcio",
|
||||
"@pypi//pytest",
|
||||
"@databuild_pypi//betterproto2_compiler",
|
||||
"@databuild_pypi//grpcio",
|
||||
"@databuild_pypi//pytest",
|
||||
],
|
||||
)
|
||||
|
|
|
|||
|
|
@ -3,5 +3,6 @@ py_library(
|
|||
srcs = ["dsl.py"],
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//databuild:py_proto",
|
||||
],
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
|
||||
from databuild.proto import JobConfig, PartitionRef, DataDep, DepType
|
||||
from typing import Self, Protocol, get_type_hints, get_origin, get_args
|
||||
from dataclasses import fields, is_dataclass
|
||||
from dataclasses import fields, is_dataclass, dataclass, field
|
||||
import re
|
||||
|
||||
|
||||
|
|
@ -58,21 +59,13 @@ class PartitionPattern:
|
|||
return result
|
||||
|
||||
|
||||
class JobConfig:
|
||||
"""TODO need to generate this from databuild.proto"""
|
||||
|
||||
|
||||
class PartitionManifest:
|
||||
"""TODO need to generate this from databuild.proto"""
|
||||
|
||||
|
||||
class DataBuildJob(Protocol):
|
||||
# The types of partitions that this job produces
|
||||
output_types: list[type[PartitionPattern]]
|
||||
|
||||
def config(self, outputs: list[PartitionPattern]) -> list[JobConfig]: ...
|
||||
|
||||
def exec(self, config: JobConfig) -> PartitionManifest: ...
|
||||
def exec(self, config: JobConfig) -> None: ...
|
||||
|
||||
|
||||
class DataBuildGraph:
|
||||
|
|
@ -85,7 +78,54 @@ class DataBuildGraph:
|
|||
for partition in cls.output_types:
|
||||
assert partition not in self.lookup, f"Partition `{partition}` already registered"
|
||||
self.lookup[partition] = cls
|
||||
return cls
|
||||
|
||||
def generate_bazel_module(self):
|
||||
"""Generates a complete databuild application, packaging up referenced jobs and this graph via bazel targets"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@dataclass
|
||||
class JobConfigBuilder:
|
||||
outputs: list[PartitionRef] = field(default_factory=list)
|
||||
inputs: list[DataDep] = field(default_factory=list)
|
||||
args: list[str] = field(default_factory=list)
|
||||
env: dict[str, str] = field(default_factory=dict)
|
||||
|
||||
def build(self) -> JobConfig:
|
||||
return JobConfig(
|
||||
outputs=self.outputs,
|
||||
inputs=self.inputs,
|
||||
args=self.args,
|
||||
env=self.env,
|
||||
)
|
||||
|
||||
def add_inputs(self, *partitions: PartitionPattern, dep_type: DepType=DepType.MATERIALIZE) -> Self:
|
||||
for p in partitions:
|
||||
dep_type_name = "materialize" if dep_type == DepType.MATERIALIZE else "query"
|
||||
self.inputs.append(DataDep(dep_type_code=dep_type, dep_type_name=dep_type_name, partition_ref=PartitionRef(str=p.serialize())))
|
||||
return self
|
||||
|
||||
def add_outputs(self, *partitions: PartitionPattern) -> Self:
|
||||
for p in partitions:
|
||||
self.outputs.append(PartitionRef(str=p.serialize()))
|
||||
return self
|
||||
|
||||
def add_args(self, *args: str) -> Self:
|
||||
self.args.extend(args)
|
||||
return self
|
||||
|
||||
def set_args(self, args: list[str]) -> Self:
|
||||
self.args = args
|
||||
return self
|
||||
|
||||
def set_env(self, env: dict[str, str]) -> Self:
|
||||
self.env = env
|
||||
return self
|
||||
|
||||
def add_env(self, **kwargs) -> Self:
|
||||
for k, v in kwargs.items():
|
||||
assert isinstance(k, str), f"Expected a string key, got `{k}`"
|
||||
assert isinstance(v, str), f"Expected a string key, got `{v}`"
|
||||
self.env[k] = v
|
||||
return self
|
||||
|
|
|
|||
|
|
@ -3,6 +3,6 @@ py_test(
|
|||
srcs = glob(["*.py"]),
|
||||
deps = [
|
||||
"//databuild/dsl/python:dsl",
|
||||
"@pypi//pytest",
|
||||
"@databuild_pypi//pytest",
|
||||
],
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
|
||||
from databuild.dsl.python.dsl import PartitionPattern, DataBuildGraph, DataBuildJob, JobConfig, PartitionManifest
|
||||
from databuild.dsl.python.dsl import PartitionPattern, DataBuildGraph, DataBuildJob
|
||||
from databuild.proto import JobConfig, PartitionManifest
|
||||
from dataclasses import dataclass
|
||||
import pytest
|
||||
|
||||
|
|
@ -45,7 +46,7 @@ def test_basic_graph_definition():
|
|||
@graph.job
|
||||
class TestJob(DataBuildJob):
|
||||
output_types = [CategoryAnalysisPartition]
|
||||
def exec(self, config: JobConfig) -> PartitionManifest: ...
|
||||
def exec(self, config: JobConfig) -> None: ...
|
||||
def config(self, outputs: list[PartitionPattern]) -> list[JobConfig]: ...
|
||||
|
||||
assert len(graph.lookup) == 1
|
||||
|
|
@ -58,14 +59,15 @@ def test_graph_collision():
|
|||
@graph.job
|
||||
class TestJob1(DataBuildJob):
|
||||
output_types = [CategoryAnalysisPartition]
|
||||
def exec(self, config: JobConfig) -> PartitionManifest: ...
|
||||
def exec(self, config: JobConfig) -> None: ...
|
||||
def config(self, outputs: list[PartitionPattern]) -> list[JobConfig]: ...
|
||||
|
||||
with pytest.raises(AssertionError):
|
||||
# Outputs the same partition, so should raise
|
||||
@graph.job
|
||||
class TestJob2(DataBuildJob):
|
||||
output_types = [CategoryAnalysisPartition]
|
||||
def exec(self, config: JobConfig) -> PartitionManifest: ...
|
||||
def exec(self, config: JobConfig) -> None: ...
|
||||
def config(self, outputs: list[PartitionPattern]) -> list[JobConfig]: ...
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -79,8 +79,11 @@ fn resolve(output_refs: &[String]) -> Result<HashMap<String, Vec<String>>, Strin
|
|||
.map_err(|e| format!("Failed to execute job lookup: {}", e))?;
|
||||
|
||||
if !output.status.success() {
|
||||
error!("Job lookup failed: {}", output.status);
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
error!("Job lookup failed: {}", stderr);
|
||||
error!("stderr: {}", stderr);
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
error!("stdout: {}", stdout);
|
||||
return Err(format!("Failed to run job lookup: {}", stderr));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1 +1,11 @@
|
|||
from databuild.py_proto_out.databuild.v1 import *
|
||||
from betterproto2 import Casing, OutputFormat
|
||||
|
||||
|
||||
def to_dict(d) -> dict:
|
||||
"""Helper for creating proper dicts from protobuf derived dataclasses."""
|
||||
return d.to_dict(
|
||||
casing=Casing.SNAKE,
|
||||
output_format=OutputFormat.PYTHON,
|
||||
include_default_values=True
|
||||
)
|
||||
|
|
|
|||
|
|
@ -4,6 +4,8 @@ load("@rules_oci//oci:defs.bzl", "oci_image", "oci_load")
|
|||
RUNFILES_PREFIX = """
|
||||
# ================= BEGIN RUNFILES INIT =================
|
||||
|
||||
SCRIPT_PATH="$(realpath "$0")"
|
||||
|
||||
# TODO should this be extracted to shared init script
|
||||
# Get the directory where the script is located
|
||||
if [[ -z "${RUNFILES_DIR:-}" ]]; then
|
||||
|
|
@ -71,6 +73,7 @@ def _databuild_job_cfg_impl(ctx):
|
|||
output = script,
|
||||
substitutions = {
|
||||
"%{EXECUTABLE_PATH}": configure_path,
|
||||
"%{EXECUTABLE_SHORT_PATH}": ctx.attr.configure.files_to_run.executable.short_path,
|
||||
"%{RUNFILES_PREFIX}": RUNFILES_PREFIX,
|
||||
"%{PREFIX}": "EXECUTABLE_SUBCOMMAND=\"config\"\n",
|
||||
},
|
||||
|
|
@ -331,6 +334,7 @@ def _databuild_graph_lookup_impl(ctx):
|
|||
"%{RUNFILES_PREFIX}": RUNFILES_PREFIX,
|
||||
"%{PREFIX}": "",
|
||||
"%{EXECUTABLE_PATH}": ctx.attr.lookup.files_to_run.executable.path,
|
||||
"%{EXECUTABLE_SHORT_PATH}": ctx.attr.lookup.files_to_run.executable.short_path,
|
||||
},
|
||||
is_executable = True,
|
||||
)
|
||||
|
|
@ -399,6 +403,7 @@ export DATABUILD_JOB_LOOKUP_PATH=$(rlocation _main/{lookup_path})
|
|||
output = script,
|
||||
substitutions = {
|
||||
"%{EXECUTABLE_PATH}": ctx.attr._analyze.files_to_run.executable.path,
|
||||
"%{EXECUTABLE_SHORT_PATH}": ctx.attr._analyze.files_to_run.executable.short_path,
|
||||
"%{RUNFILES_PREFIX}": RUNFILES_PREFIX,
|
||||
"%{PREFIX}": script_prefix,
|
||||
},
|
||||
|
|
|
|||
|
|
@ -5,7 +5,32 @@ set -e
|
|||
|
||||
%{PREFIX}
|
||||
|
||||
EXECUTABLE_BINARY="$(rlocation "_main/$(basename "%{EXECUTABLE_PATH}")")"
|
||||
# Check if rlocation function is available
|
||||
if ! type rlocation >/dev/null 2>&1; then
|
||||
echo "Error: rlocation function not available. Runfiles may not be properly initialized." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Resolve the executable using rlocation
|
||||
EXECUTABLE_BINARY="$(rlocation "_main/%{EXECUTABLE_SHORT_PATH}")"
|
||||
|
||||
# Check if rlocation returned something
|
||||
if [[ -z "${EXECUTABLE_BINARY}" ]]; then
|
||||
echo "Error: rlocation returned empty result for '_main/%{EXECUTABLE_SHORT_PATH}'" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Check if the resolved binary exists
|
||||
if [[ ! -f "${EXECUTABLE_BINARY}" ]]; then
|
||||
echo "Error: Resolved executable '${EXECUTABLE_BINARY}' does not exist" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Check if the resolved binary is executable
|
||||
if [[ ! -x "${EXECUTABLE_BINARY}" ]]; then
|
||||
echo "Error: Resolved executable '${EXECUTABLE_BINARY}' is not executable" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Run the configuration
|
||||
if [[ -n "${EXECUTABLE_SUBCOMMAND:-}" ]]; then
|
||||
|
|
|
|||
|
|
@ -1,25 +1,9 @@
|
|||
load("//databuild:rules.bzl", "databuild_graph", "databuild_job")
|
||||
|
||||
py_library(
|
||||
name = "job_src",
|
||||
srcs = glob(["**/*.py"]),
|
||||
visibility = ["//visibility:public"],
|
||||
deps = ["//databuild:py_proto"],
|
||||
deps = [
|
||||
"//databuild:py_proto",
|
||||
"//databuild/dsl/python:dsl",
|
||||
],
|
||||
)
|
||||
|
||||
# Tests
|
||||
py_test(
|
||||
name = "test",
|
||||
srcs = glob(["**/test.py"]),
|
||||
deps = [":job_src"],
|
||||
)
|
||||
|
||||
# Bazel-defined
|
||||
|
||||
#databuild_job(
|
||||
# name = "ingest_color_votes",
|
||||
#)
|
||||
|
||||
# Python-DSL-defined
|
||||
|
||||
# TODO
|
||||
|
|
|
|||
149
databuild/test/app/bazel/BUILD.bazel
Normal file
149
databuild/test/app/bazel/BUILD.bazel
Normal file
|
|
@ -0,0 +1,149 @@
|
|||
load("//databuild:rules.bzl", "databuild_graph", "databuild_job")
|
||||
|
||||
py_library(
|
||||
name = "job_src",
|
||||
srcs = glob(["**/*.py"]),
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//databuild:py_proto",
|
||||
"//databuild/dsl/python:dsl",
|
||||
],
|
||||
)
|
||||
|
||||
# Tests
|
||||
py_test(
|
||||
name = "test_trailing_color_votes",
|
||||
srcs = ["jobs/trailing_color_votes/test.py"],
|
||||
main = "jobs/trailing_color_votes/test.py",
|
||||
deps = [
|
||||
":job_src",
|
||||
"//databuild/test/app:job_src",
|
||||
],
|
||||
)
|
||||
|
||||
py_test(
|
||||
name = "test_ingest_color_votes",
|
||||
srcs = ["jobs/ingest_color_votes/test.py"],
|
||||
main = "jobs/ingest_color_votes/test.py",
|
||||
deps = [
|
||||
":job_src",
|
||||
"//databuild/test/app:job_src",
|
||||
],
|
||||
)
|
||||
|
||||
py_test(
|
||||
name = "test_aggregate_color_votes",
|
||||
srcs = ["jobs/aggregate_color_votes/test.py"],
|
||||
main = "jobs/aggregate_color_votes/test.py",
|
||||
deps = [
|
||||
":job_src",
|
||||
"//databuild/test/app:job_src",
|
||||
],
|
||||
)
|
||||
|
||||
py_test(
|
||||
name = "test_color_vote_report_calc",
|
||||
srcs = ["jobs/color_vote_report_calc/test.py"],
|
||||
main = "jobs/color_vote_report_calc/test.py",
|
||||
deps = [
|
||||
":job_src",
|
||||
"//databuild/test/app:job_src",
|
||||
],
|
||||
)
|
||||
|
||||
py_test(
|
||||
name = "test_graph_analysis",
|
||||
srcs = ["graph/graph_test.py"],
|
||||
data = [
|
||||
":bazel_graph.analyze",
|
||||
":bazel_graph_lookup",
|
||||
],
|
||||
main = "graph/graph_test.py",
|
||||
deps = [
|
||||
":job_src",
|
||||
"//databuild/test/app:job_src",
|
||||
],
|
||||
)
|
||||
|
||||
# Bazel-defined
|
||||
## Graph
|
||||
databuild_graph(
|
||||
name = "bazel_graph",
|
||||
jobs = [
|
||||
":ingest_color_votes",
|
||||
":trailing_color_votes",
|
||||
":aggregate_color_votes",
|
||||
":color_vote_report_calc",
|
||||
],
|
||||
lookup = ":bazel_graph_lookup",
|
||||
)
|
||||
|
||||
py_binary(
|
||||
name = "bazel_graph_lookup",
|
||||
srcs = ["graph/lookup.py"],
|
||||
main = "graph/lookup.py",
|
||||
)
|
||||
|
||||
## Ingest Color Votes
|
||||
databuild_job(
|
||||
name = "ingest_color_votes",
|
||||
binary = ":ingest_color_votes_binary",
|
||||
)
|
||||
|
||||
py_binary(
|
||||
name = "ingest_color_votes_binary",
|
||||
srcs = ["jobs/ingest_color_votes/main.py"],
|
||||
main = "jobs/ingest_color_votes/main.py",
|
||||
deps = [
|
||||
":job_src",
|
||||
"//databuild/test/app:job_src",
|
||||
],
|
||||
)
|
||||
|
||||
## Trailing Color Votes
|
||||
databuild_job(
|
||||
name = "trailing_color_votes",
|
||||
binary = ":trailing_color_votes_binary",
|
||||
)
|
||||
|
||||
py_binary(
|
||||
name = "trailing_color_votes_binary",
|
||||
srcs = ["jobs/trailing_color_votes/main.py"],
|
||||
main = "jobs/trailing_color_votes/main.py",
|
||||
deps = [
|
||||
":job_src",
|
||||
"//databuild/test/app:job_src",
|
||||
],
|
||||
)
|
||||
|
||||
## Aggregate Color Votes
|
||||
databuild_job(
|
||||
name = "aggregate_color_votes",
|
||||
binary = ":aggregate_color_votes_binary",
|
||||
)
|
||||
|
||||
py_binary(
|
||||
name = "aggregate_color_votes_binary",
|
||||
srcs = ["jobs/aggregate_color_votes/main.py"],
|
||||
main = "jobs/aggregate_color_votes/main.py",
|
||||
deps = [
|
||||
":job_src",
|
||||
"//databuild/test/app:job_src",
|
||||
],
|
||||
)
|
||||
|
||||
## Color Vote Report Calc
|
||||
databuild_job(
|
||||
name = "color_vote_report_calc",
|
||||
binary = ":color_vote_report_calc_binary",
|
||||
)
|
||||
|
||||
py_binary(
|
||||
name = "color_vote_report_calc_binary",
|
||||
srcs = ["jobs/color_vote_report_calc/main.py"],
|
||||
main = "jobs/color_vote_report_calc/main.py",
|
||||
deps = [
|
||||
":job_src",
|
||||
"//databuild/test/app:job_src",
|
||||
],
|
||||
)
|
||||
4
databuild/test/app/bazel/README.md
Normal file
4
databuild/test/app/bazel/README.md
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
|
||||
# Bazel-Based Graph Definition
|
||||
|
||||
The bazel-based graph definition relies on declaring `databuild_job` and `databuild_graph` targets which reference binaries.
|
||||
91
databuild/test/app/bazel/graph/graph_test.py
Normal file
91
databuild/test/app/bazel/graph/graph_test.py
Normal file
|
|
@ -0,0 +1,91 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Integration test for the databuild graph analysis.
|
||||
|
||||
This test verifies that when we request color vote reports, the graph analyzer
|
||||
correctly identifies all upstream dependencies and jobs required.
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
import json
|
||||
import unittest
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class GraphAnalysisTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# Determine the path to bazel_graph.analyze
|
||||
# In bazel test, we need to find the executable in the runfiles
|
||||
runfiles_dir = os.environ.get('RUNFILES_DIR')
|
||||
test_srcdir = os.environ.get('TEST_SRCDIR')
|
||||
|
||||
possible_paths = []
|
||||
if runfiles_dir:
|
||||
possible_paths.append(os.path.join(runfiles_dir, '_main', 'databuild', 'test', 'app', 'bazel_graph.analyze'))
|
||||
possible_paths.append(os.path.join(runfiles_dir, 'databuild', 'test', 'app', 'bazel_graph.analyze'))
|
||||
|
||||
if test_srcdir:
|
||||
possible_paths.append(os.path.join(test_srcdir, '_main', 'databuild', 'test', 'app', 'bazel_graph.analyze'))
|
||||
possible_paths.append(os.path.join(test_srcdir, 'databuild', 'test', 'app', 'bazel_graph.analyze'))
|
||||
|
||||
# Fallback for local testing
|
||||
possible_paths.extend([
|
||||
'bazel-bin/databuild/test/app/bazel_graph.analyze',
|
||||
'./bazel_graph.analyze'
|
||||
])
|
||||
|
||||
self.graph_analyze = None
|
||||
for path in possible_paths:
|
||||
if os.path.exists(path):
|
||||
self.graph_analyze = path
|
||||
break
|
||||
|
||||
# Ensure the executable exists
|
||||
if not self.graph_analyze:
|
||||
self.skipTest(f"Graph analyze executable not found in any of these paths: {possible_paths}")
|
||||
|
||||
def run_graph_analyze(self, partition_refs):
|
||||
"""Run graph.analyze with the given partition references."""
|
||||
cmd = [self.graph_analyze] + partition_refs
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, cwd=os.getcwd())
|
||||
|
||||
if result.returncode != 0:
|
||||
self.fail(f"Graph analyze failed with return code {result.returncode}.\nStdout: {result.stdout}\nStderr: {result.stderr}")
|
||||
|
||||
# Parse the JSON output
|
||||
try:
|
||||
return json.loads(result.stdout)
|
||||
except json.JSONDecodeError as e:
|
||||
self.fail(f"Failed to parse JSON output: {e}\nOutput: {result.stdout}")
|
||||
|
||||
def test_single_color_report_dependencies(self):
|
||||
"""Test dependencies for a single color vote report."""
|
||||
partition_refs = ["color_vote_report/2024-01-15/red"]
|
||||
result = self.run_graph_analyze(partition_refs)
|
||||
self.assertIn('nodes', result)
|
||||
# TODO expand
|
||||
|
||||
def test_multiple_color_reports_same_date(self):
|
||||
"""Test dependencies when requesting multiple colors for the same date."""
|
||||
partition_refs = [
|
||||
"color_vote_report/2024-01-15/red",
|
||||
"color_vote_report/2024-01-15/blue"
|
||||
]
|
||||
result = self.run_graph_analyze(partition_refs)
|
||||
self.assertIn('nodes', result)
|
||||
# TODO expand
|
||||
|
||||
def test_multiple_dates_dependencies(self):
|
||||
"""Test dependencies when requesting reports for different dates."""
|
||||
partition_refs = [
|
||||
"color_vote_report/2024-01-15/red",
|
||||
"color_vote_report/2024-01-16/red"
|
||||
]
|
||||
result = self.run_graph_analyze(partition_refs)
|
||||
self.assertIn('nodes', result)
|
||||
# TODO expand
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
29
databuild/test/app/bazel/graph/lookup.py
Normal file
29
databuild/test/app/bazel/graph/lookup.py
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from collections import defaultdict
|
||||
import sys
|
||||
import json
|
||||
|
||||
LABEL_BASE = "//databuild/test/app"
|
||||
|
||||
|
||||
def lookup(raw_ref: str):
|
||||
if raw_ref.startswith("daily_color_votes"):
|
||||
return LABEL_BASE + ":ingest_color_votes"
|
||||
elif raw_ref.startswith("color_votes_1"):
|
||||
return LABEL_BASE + ":trailing_color_votes"
|
||||
elif raw_ref.startswith("daily_votes") or raw_ref.startswith("votes_1w") or raw_ref.startswith("votes_1m"):
|
||||
return LABEL_BASE + ":aggregate_color_votes"
|
||||
elif raw_ref.startswith("color_vote_report"):
|
||||
return LABEL_BASE + ":color_vote_report_calc"
|
||||
else:
|
||||
raise ValueError(f"Unable to resolve job for partition: `{raw_ref}`")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
results = defaultdict(list)
|
||||
for raw_ref in sys.argv[1:]:
|
||||
results[lookup(raw_ref)].append(raw_ref)
|
||||
|
||||
# Output the results as JSON
|
||||
print(json.dumps(dict(results)))
|
||||
0
databuild/test/app/bazel/graph/test.py
Normal file
0
databuild/test/app/bazel/graph/test.py
Normal file
1
databuild/test/app/bazel/jobs/aggregate_color_votes/README.md
Symbolic link
1
databuild/test/app/bazel/jobs/aggregate_color_votes/README.md
Symbolic link
|
|
@ -0,0 +1 @@
|
|||
jobs/aggregate_color_votes/README.md
|
||||
|
|
@ -0,0 +1,42 @@
|
|||
from databuild.proto import PartitionRef, JobConfigureResponse, JobConfig, DepType, DataDep
|
||||
from databuild.test.app.colors import COLORS
|
||||
from datetime import date
|
||||
|
||||
def configure(outputs: list[PartitionRef]) -> JobConfigureResponse:
|
||||
configs = []
|
||||
|
||||
for output in outputs:
|
||||
parts = output.str.split("/")
|
||||
if len(parts) == 2:
|
||||
output_type, data_date = parts
|
||||
date.fromisoformat(data_date) # Validate date format
|
||||
|
||||
# Determine input type based on output type
|
||||
if output_type == "daily_votes":
|
||||
input_prefix = "daily_color_votes"
|
||||
elif output_type == "votes_1w":
|
||||
input_prefix = "color_votes_1w"
|
||||
elif output_type == "votes_1m":
|
||||
input_prefix = "color_votes_1m"
|
||||
else:
|
||||
raise ValueError(f"Unknown output type: {output_type}")
|
||||
|
||||
# Create inputs for all colors
|
||||
inputs = []
|
||||
for color in COLORS:
|
||||
input_ref = PartitionRef(str=f"{input_prefix}/{data_date}/{color}")
|
||||
inputs.append(input_ref)
|
||||
|
||||
configs.append(JobConfig(
|
||||
outputs=[output],
|
||||
inputs=[DataDep(dep_type_code=DepType.MATERIALIZE, dep_type_name="materialize", partition_ref=ref) for ref in inputs],
|
||||
args=[],
|
||||
env={
|
||||
"DATA_DATE": data_date,
|
||||
"AGGREGATE_TYPE": output_type
|
||||
}
|
||||
))
|
||||
else:
|
||||
raise ValueError(f"Invalid output partition format: {output.str}")
|
||||
|
||||
return JobConfigureResponse(configs=configs)
|
||||
20
databuild/test/app/bazel/jobs/aggregate_color_votes/main.py
Normal file
20
databuild/test/app/bazel/jobs/aggregate_color_votes/main.py
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
"""Main entrypoint for the aggregate_color_votes job for use with bazel-defined graph."""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
from databuild.proto import PartitionRef, to_dict
|
||||
from databuild.test.app.bazel.jobs.aggregate_color_votes.config import configure
|
||||
from databuild.test.app.jobs.aggregate_color_votes.execute import execute
|
||||
|
||||
if __name__ == "__main__":
|
||||
if sys.argv[1] == "config":
|
||||
response = configure([
|
||||
PartitionRef(str=raw_ref)
|
||||
for raw_ref in sys.argv[2:]
|
||||
])
|
||||
print(json.dumps(to_dict(response)))
|
||||
elif sys.argv[1] == "exec":
|
||||
execute(os.environ["DATA_DATE"], os.environ["AGGREGATE_TYPE"])
|
||||
else:
|
||||
raise Exception(f"Invalid command `{sys.argv[1]}`")
|
||||
59
databuild/test/app/bazel/jobs/aggregate_color_votes/test.py
Normal file
59
databuild/test/app/bazel/jobs/aggregate_color_votes/test.py
Normal file
|
|
@ -0,0 +1,59 @@
|
|||
import unittest
|
||||
from databuild.proto import PartitionRef
|
||||
from databuild.test.app.bazel.jobs.aggregate_color_votes.config import configure
|
||||
from databuild.test.app.colors import COLORS
|
||||
|
||||
class TestAggregateColorVotesConfig(unittest.TestCase):
|
||||
def test_configure_daily_votes(self):
|
||||
outputs = [PartitionRef(str="daily_votes/2024-01-15")]
|
||||
response = configure(outputs)
|
||||
|
||||
self.assertEqual(len(response.configs), 1)
|
||||
config = response.configs[0]
|
||||
self.assertEqual(len(config.outputs), 1)
|
||||
self.assertEqual(len(config.inputs), len(COLORS)) # One input per color
|
||||
self.assertEqual(config.env["AGGREGATE_TYPE"], "daily_votes")
|
||||
self.assertEqual(config.env["DATA_DATE"], "2024-01-15")
|
||||
|
||||
# Check that inputs are from daily_color_votes
|
||||
for i, color in enumerate(COLORS):
|
||||
expected_input = f"daily_color_votes/2024-01-15/{color}"
|
||||
self.assertEqual(config.inputs[i].partition_ref.str, expected_input)
|
||||
|
||||
def test_configure_weekly_votes(self):
|
||||
outputs = [PartitionRef(str="votes_1w/2024-01-21")]
|
||||
response = configure(outputs)
|
||||
|
||||
self.assertEqual(len(response.configs), 1)
|
||||
config = response.configs[0]
|
||||
self.assertEqual(config.env["AGGREGATE_TYPE"], "votes_1w")
|
||||
|
||||
# Check that inputs are from color_votes_1w
|
||||
for i, color in enumerate(COLORS):
|
||||
expected_input = f"color_votes_1w/2024-01-21/{color}"
|
||||
self.assertEqual(config.inputs[i].partition_ref.str, expected_input)
|
||||
|
||||
def test_configure_monthly_votes(self):
|
||||
outputs = [PartitionRef(str="votes_1m/2024-01-31")]
|
||||
response = configure(outputs)
|
||||
|
||||
self.assertEqual(len(response.configs), 1)
|
||||
config = response.configs[0]
|
||||
self.assertEqual(config.env["AGGREGATE_TYPE"], "votes_1m")
|
||||
|
||||
# Check that inputs are from color_votes_1m
|
||||
for i, color in enumerate(COLORS):
|
||||
expected_input = f"color_votes_1m/2024-01-31/{color}"
|
||||
self.assertEqual(config.inputs[i].partition_ref.str, expected_input)
|
||||
|
||||
def test_configure_multiple_outputs(self):
|
||||
outputs = [
|
||||
PartitionRef(str="daily_votes/2024-01-15"),
|
||||
PartitionRef(str="votes_1w/2024-01-21")
|
||||
]
|
||||
response = configure(outputs)
|
||||
|
||||
self.assertEqual(len(response.configs), 2) # One config per output
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
1
databuild/test/app/bazel/jobs/color_vote_report_calc/README.md
Symbolic link
1
databuild/test/app/bazel/jobs/color_vote_report_calc/README.md
Symbolic link
|
|
@ -0,0 +1 @@
|
|||
jobs/color_vote_report_calc/README.md
|
||||
|
|
@ -0,0 +1,48 @@
|
|||
from databuild.proto import PartitionRef, JobConfigureResponse, JobConfig, DataDep, DepType
|
||||
from datetime import date
|
||||
from collections import defaultdict
|
||||
|
||||
def configure(outputs: list[PartitionRef]) -> JobConfigureResponse:
|
||||
# This job produces a single job config that handles all requested outputs
|
||||
all_dates = set()
|
||||
all_colors = set()
|
||||
|
||||
for output in outputs:
|
||||
parts = output.str.split("/")
|
||||
if len(parts) == 3 and parts[0] == "color_vote_report":
|
||||
prefix, data_date, color = parts
|
||||
date.fromisoformat(data_date) # Validate date format
|
||||
all_dates.add(data_date)
|
||||
all_colors.add(color)
|
||||
else:
|
||||
raise ValueError(f"Invalid output partition format: {output.str}")
|
||||
|
||||
# Build inputs for all dates and colors that are actually requested
|
||||
inputs = []
|
||||
|
||||
# Add total vote aggregates for all dates
|
||||
for data_date in all_dates:
|
||||
inputs.extend([
|
||||
PartitionRef(str=f"daily_votes/{data_date}"),
|
||||
PartitionRef(str=f"votes_1w/{data_date}"),
|
||||
PartitionRef(str=f"votes_1m/{data_date}")
|
||||
])
|
||||
|
||||
# Add color-specific inputs for all date/color combinations that are requested
|
||||
for output in outputs:
|
||||
data_date, color = output.str.split("/")[1], output.str.split("/")[2]
|
||||
inputs.extend([
|
||||
PartitionRef(str=f"daily_color_votes/{data_date}/{color}"),
|
||||
PartitionRef(str=f"color_votes_1w/{data_date}/{color}"),
|
||||
PartitionRef(str=f"color_votes_1m/{data_date}/{color}")
|
||||
])
|
||||
|
||||
# Single job config for all outputs - pass output partition refs as args
|
||||
config = JobConfig(
|
||||
outputs=outputs,
|
||||
inputs=[DataDep(dep_type_code=DepType.MATERIALIZE, dep_type_name="materialize", partition_ref=ref) for ref in inputs],
|
||||
args=[output.str for output in outputs],
|
||||
env={}
|
||||
)
|
||||
|
||||
return JobConfigureResponse(configs=[config])
|
||||
20
databuild/test/app/bazel/jobs/color_vote_report_calc/main.py
Normal file
20
databuild/test/app/bazel/jobs/color_vote_report_calc/main.py
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
"""Main entrypoint for the color_vote_report_calc job for use with bazel-defined graph."""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
from databuild.proto import PartitionRef, to_dict
|
||||
from databuild.test.app.bazel.jobs.color_vote_report_calc.config import configure
|
||||
from databuild.test.app.jobs.color_vote_report_calc.execute import execute
|
||||
|
||||
if __name__ == "__main__":
|
||||
if sys.argv[1] == "config":
|
||||
response = configure([
|
||||
PartitionRef(str=raw_ref)
|
||||
for raw_ref in sys.argv[2:]
|
||||
])
|
||||
print(json.dumps(to_dict(response)))
|
||||
elif sys.argv[1] == "exec":
|
||||
execute(sys.argv[2:])
|
||||
else:
|
||||
raise Exception(f"Invalid command `{sys.argv[1]}`")
|
||||
60
databuild/test/app/bazel/jobs/color_vote_report_calc/test.py
Normal file
60
databuild/test/app/bazel/jobs/color_vote_report_calc/test.py
Normal file
|
|
@ -0,0 +1,60 @@
|
|||
import unittest
|
||||
from databuild.proto import PartitionRef
|
||||
from databuild.test.app.bazel.jobs.color_vote_report_calc.config import configure
|
||||
|
||||
class TestColorVoteReportCalcConfig(unittest.TestCase):
|
||||
def test_configure_single_output(self):
|
||||
outputs = [PartitionRef(str="color_vote_report/2024-01-15/red")]
|
||||
response = configure(outputs)
|
||||
|
||||
self.assertEqual(len(response.configs), 1) # Always single config
|
||||
config = response.configs[0]
|
||||
self.assertEqual(len(config.outputs), 1)
|
||||
self.assertEqual(config.args, ["color_vote_report/2024-01-15/red"])
|
||||
|
||||
# Should have inputs for total votes and color-specific votes
|
||||
expected_inputs = [
|
||||
"daily_votes/2024-01-15",
|
||||
"votes_1w/2024-01-15",
|
||||
"votes_1m/2024-01-15",
|
||||
"daily_color_votes/2024-01-15/red",
|
||||
"color_votes_1w/2024-01-15/red",
|
||||
"color_votes_1m/2024-01-15/red"
|
||||
]
|
||||
actual_inputs = [inp.partition_ref.str for inp in config.inputs]
|
||||
for expected in expected_inputs:
|
||||
self.assertIn(expected, actual_inputs)
|
||||
|
||||
def test_configure_multiple_outputs_same_date(self):
|
||||
outputs = [
|
||||
PartitionRef(str="color_vote_report/2024-01-15/red"),
|
||||
PartitionRef(str="color_vote_report/2024-01-15/blue")
|
||||
]
|
||||
response = configure(outputs)
|
||||
|
||||
self.assertEqual(len(response.configs), 1) # Single config for all outputs
|
||||
config = response.configs[0]
|
||||
self.assertEqual(len(config.outputs), 2)
|
||||
self.assertEqual(set(config.args), {
|
||||
"color_vote_report/2024-01-15/red",
|
||||
"color_vote_report/2024-01-15/blue"
|
||||
})
|
||||
|
||||
def test_configure_multiple_dates(self):
|
||||
outputs = [
|
||||
PartitionRef(str="color_vote_report/2024-01-15/red"),
|
||||
PartitionRef(str="color_vote_report/2024-01-16/red")
|
||||
]
|
||||
response = configure(outputs)
|
||||
|
||||
self.assertEqual(len(response.configs), 1) # Single config for all outputs
|
||||
config = response.configs[0]
|
||||
self.assertEqual(len(config.outputs), 2)
|
||||
|
||||
# Should have total vote inputs for both dates
|
||||
actual_inputs = [inp.partition_ref.str for inp in config.inputs]
|
||||
self.assertIn("daily_votes/2024-01-15", actual_inputs)
|
||||
self.assertIn("daily_votes/2024-01-16", actual_inputs)
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
1
databuild/test/app/bazel/jobs/ingest_color_votes/README.md
Symbolic link
1
databuild/test/app/bazel/jobs/ingest_color_votes/README.md
Symbolic link
|
|
@ -0,0 +1 @@
|
|||
jobs/ingest_color_votes/README.md
|
||||
|
|
@ -2,17 +2,19 @@
|
|||
|
||||
import sys
|
||||
import os
|
||||
from databuild.proto import PartitionRef
|
||||
from databuild.test.app.jobs.ingest_color_votes.config import configure
|
||||
import json
|
||||
from databuild.proto import PartitionRef, to_dict
|
||||
from databuild.test.app.bazel.jobs.ingest_color_votes.config import configure
|
||||
from databuild.test.app.jobs.ingest_color_votes.execute import execute
|
||||
|
||||
if __name__ == "__main__":
|
||||
if sys.argv[1] == "config":
|
||||
configure([
|
||||
response = configure([
|
||||
PartitionRef(str=raw_ref)
|
||||
for raw_ref in sys.argv[2:]
|
||||
])
|
||||
elif sys.argv[1] == "execute":
|
||||
print(json.dumps(to_dict(response)))
|
||||
elif sys.argv[1] == "exec":
|
||||
execute(os.environ["DATA_DATE"], os.environ["COLOR"])
|
||||
else:
|
||||
raise Exception(f"Invalid command `{sys.argv[1]}`")
|
||||
32
databuild/test/app/bazel/jobs/ingest_color_votes/test.py
Normal file
32
databuild/test/app/bazel/jobs/ingest_color_votes/test.py
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
from databuild.test.app.bazel.jobs.ingest_color_votes.config import configure
|
||||
from databuild.proto import PartitionRef
|
||||
|
||||
|
||||
def test_ingest_color_votes_configure():
|
||||
refs_single = [PartitionRef(str="daily_color_votes/2025-01-01/red")]
|
||||
config_single = configure(refs_single)
|
||||
assert len(config_single.configs) == 1
|
||||
assert config_single.configs[0].outputs[0].str == "daily_color_votes/2025-01-01/red"
|
||||
assert config_single.configs[0].env["COLOR"] == "red"
|
||||
assert config_single.configs[0].env["DATA_DATE"] == "2025-01-01"
|
||||
|
||||
refs_multiple = [
|
||||
PartitionRef(str="daily_color_votes/2025-01-02/red"),
|
||||
PartitionRef(str="daily_color_votes/2025-01-02/blue"),
|
||||
]
|
||||
|
||||
config_multiple = configure(refs_multiple)
|
||||
assert len(config_multiple.configs) == 2
|
||||
assert len(config_multiple.configs[0].outputs) == 1
|
||||
assert config_multiple.configs[0].outputs[0].str == "daily_color_votes/2025-01-02/red"
|
||||
assert config_multiple.configs[0].env["COLOR"] == "red"
|
||||
assert config_multiple.configs[0].env["DATA_DATE"] == "2025-01-02"
|
||||
assert len(config_multiple.configs[1].outputs) == 1
|
||||
assert config_multiple.configs[1].outputs[0].str == "daily_color_votes/2025-01-02/blue"
|
||||
assert config_multiple.configs[1].env["COLOR"] == "blue"
|
||||
assert config_multiple.configs[1].env["DATA_DATE"] == "2025-01-02"
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import pytest
|
||||
raise SystemExit(pytest.main([__file__]))
|
||||
1
databuild/test/app/bazel/jobs/trailing_color_votes/README.md
Symbolic link
1
databuild/test/app/bazel/jobs/trailing_color_votes/README.md
Symbolic link
|
|
@ -0,0 +1 @@
|
|||
jobs/trailing_color_votes/README.md
|
||||
46
databuild/test/app/bazel/jobs/trailing_color_votes/config.py
Normal file
46
databuild/test/app/bazel/jobs/trailing_color_votes/config.py
Normal file
|
|
@ -0,0 +1,46 @@
|
|||
from databuild.proto import PartitionRef, JobConfigureResponse, JobConfig, DepType, DataDep
|
||||
from datetime import date, timedelta
|
||||
from collections import defaultdict
|
||||
|
||||
def configure(outputs: list[PartitionRef]) -> JobConfigureResponse:
|
||||
# Group outputs by date and color
|
||||
grouped_outputs = defaultdict(list)
|
||||
|
||||
for output in outputs:
|
||||
parts = output.str.split("/")
|
||||
if len(parts) == 3 and parts[0] in ["color_votes_1w", "color_votes_1m"]:
|
||||
grouped_outputs[tuple(parts[1:])].append(output)
|
||||
else:
|
||||
raise ValueError(f"Invalid output partition format: {output.str}")
|
||||
|
||||
configs = []
|
||||
for (data_date, color), output_partitions in grouped_outputs.items():
|
||||
# Parse the output date
|
||||
output_date = date.fromisoformat(data_date)
|
||||
|
||||
# Determine which windows are needed and the maximum window
|
||||
has_weekly = any(output.str.startswith("color_votes_1w/") for output in output_partitions)
|
||||
has_monthly = any(output.str.startswith("color_votes_1m/") for output in output_partitions)
|
||||
max_window = max(7 if has_weekly else 0, 28 if has_monthly else 0)
|
||||
|
||||
# Generate input partition refs for the required trailing window
|
||||
inputs = []
|
||||
for i in range(max_window):
|
||||
input_date = output_date - timedelta(days=i)
|
||||
inputs.append(PartitionRef(str=f"daily_color_votes/{input_date.isoformat()}/{color}"))
|
||||
|
||||
env = {
|
||||
"DATA_DATE": data_date,
|
||||
"COLOR": color,
|
||||
"WEEKLY": "true" if has_weekly else "false",
|
||||
"MONTHLY": "true" if has_monthly else "false"
|
||||
}
|
||||
|
||||
configs.append(JobConfig(
|
||||
outputs=output_partitions,
|
||||
inputs=[DataDep(dep_type_code=DepType.MATERIALIZE, dep_type_name="materialize", partition_ref=ref) for ref in inputs],
|
||||
args=[],
|
||||
env=env
|
||||
))
|
||||
|
||||
return JobConfigureResponse(configs=configs)
|
||||
20
databuild/test/app/bazel/jobs/trailing_color_votes/main.py
Normal file
20
databuild/test/app/bazel/jobs/trailing_color_votes/main.py
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
"""Main entrypoint for the trailing_color_votes job for use with bazel-defined graph."""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
from databuild.proto import PartitionRef, to_dict
|
||||
from databuild.test.app.bazel.jobs.trailing_color_votes.config import configure
|
||||
from databuild.test.app.jobs.trailing_color_votes.execute import execute
|
||||
|
||||
if __name__ == "__main__":
|
||||
if sys.argv[1] == "config":
|
||||
response = configure([
|
||||
PartitionRef(str=raw_ref)
|
||||
for raw_ref in sys.argv[2:]
|
||||
])
|
||||
print(json.dumps(to_dict(response)))
|
||||
elif sys.argv[1] == "exec":
|
||||
execute(os.environ["DATA_DATE"], os.environ["COLOR"])
|
||||
else:
|
||||
raise Exception(f"Invalid command `{sys.argv[1]}`")
|
||||
53
databuild/test/app/bazel/jobs/trailing_color_votes/test.py
Normal file
53
databuild/test/app/bazel/jobs/trailing_color_votes/test.py
Normal file
|
|
@ -0,0 +1,53 @@
|
|||
import unittest
|
||||
from databuild.proto import PartitionRef
|
||||
from databuild.test.app.bazel.jobs.trailing_color_votes.config import configure
|
||||
|
||||
class TestTrailingColorVotesConfig(unittest.TestCase):
|
||||
def test_configure_weekly_only(self):
|
||||
outputs = [PartitionRef(str="color_votes_1w/2024-01-07/red")]
|
||||
response = configure(outputs)
|
||||
|
||||
self.assertEqual(len(response.configs), 1)
|
||||
config = response.configs[0]
|
||||
self.assertEqual(len(config.outputs), 1)
|
||||
self.assertEqual(len(config.inputs), 7) # 7 days for weekly
|
||||
self.assertEqual(config.env["WEEKLY"], "true")
|
||||
self.assertEqual(config.env["MONTHLY"], "false")
|
||||
|
||||
def test_configure_monthly_only(self):
|
||||
outputs = [PartitionRef(str="color_votes_1m/2024-01-28/blue")]
|
||||
response = configure(outputs)
|
||||
|
||||
self.assertEqual(len(response.configs), 1)
|
||||
config = response.configs[0]
|
||||
self.assertEqual(len(config.outputs), 1)
|
||||
self.assertEqual(len(config.inputs), 28) # 28 days for monthly
|
||||
self.assertEqual(config.env["WEEKLY"], "false")
|
||||
self.assertEqual(config.env["MONTHLY"], "true")
|
||||
|
||||
def test_configure_both_weekly_and_monthly(self):
|
||||
outputs = [
|
||||
PartitionRef(str="color_votes_1w/2024-01-28/green"),
|
||||
PartitionRef(str="color_votes_1m/2024-01-28/green")
|
||||
]
|
||||
response = configure(outputs)
|
||||
|
||||
self.assertEqual(len(response.configs), 1) # Single config for same date/color
|
||||
config = response.configs[0]
|
||||
self.assertEqual(len(config.outputs), 2) # Both outputs
|
||||
self.assertEqual(len(config.inputs), 28) # 28 days (max of 7 and 28)
|
||||
self.assertEqual(config.env["WEEKLY"], "true")
|
||||
self.assertEqual(config.env["MONTHLY"], "true")
|
||||
|
||||
def test_configure_multiple_colors_dates(self):
|
||||
outputs = [
|
||||
PartitionRef(str="color_votes_1w/2024-01-07/red"),
|
||||
PartitionRef(str="color_votes_1w/2024-01-07/blue"),
|
||||
PartitionRef(str="color_votes_1m/2024-01-14/red")
|
||||
]
|
||||
response = configure(outputs)
|
||||
|
||||
self.assertEqual(len(response.configs), 3) # One config per unique date/color combination
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
13
databuild/test/app/dsl/BUILD.bazel
Normal file
13
databuild/test/app/dsl/BUILD.bazel
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
py_library(
|
||||
name = "dsl_src",
|
||||
srcs = glob(
|
||||
["*.py"],
|
||||
exclude = ["test_*.py"],
|
||||
),
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//databuild:py_proto",
|
||||
"//databuild/dsl/python:dsl",
|
||||
"//databuild/test/app:job_src",
|
||||
],
|
||||
)
|
||||
130
databuild/test/app/dsl/graph.py
Normal file
130
databuild/test/app/dsl/graph.py
Normal file
|
|
@ -0,0 +1,130 @@
|
|||
"""Python DSL implementation of test app"""
|
||||
|
||||
from collections import defaultdict
|
||||
from databuild.dsl.python.dsl import DataBuildGraph, DataBuildJob, JobConfigBuilder
|
||||
from databuild.proto import JobConfig
|
||||
from databuild.test.app.colors import COLORS
|
||||
from databuild.test.app.jobs.ingest_color_votes.execute import execute as ingest_color_votes_exec
|
||||
from databuild.test.app.jobs.trailing_color_votes.execute import execute as trailing_color_votes_exec
|
||||
from databuild.test.app.jobs.aggregate_color_votes.execute import execute as aggregate_color_votes_exec
|
||||
from databuild.test.app.jobs.color_vote_report_calc.execute import execute as color_vote_report_calc_exec
|
||||
from databuild.test.app.dsl.partitions import (
|
||||
IngestedColorPartition,
|
||||
TrailingColorVotes1MPartition,
|
||||
TrailingColorVotes1WPartition,
|
||||
DailyVotesPartition,
|
||||
Votes1WPartition,
|
||||
Votes1MPartition,
|
||||
ColorVoteReportPartition
|
||||
)
|
||||
from datetime import date, timedelta
|
||||
|
||||
graph = DataBuildGraph("//databuild/test/app:dsl_graph")
|
||||
|
||||
|
||||
@graph.job
|
||||
class IngestColorVotes(DataBuildJob):
|
||||
output_types = [IngestedColorPartition]
|
||||
|
||||
def config(self, outputs: list[IngestedColorPartition]) -> list[JobConfig]:
|
||||
configs = []
|
||||
for output in outputs:
|
||||
env = {"DATA_DATE": output.data_date, "COLOR": output.color}
|
||||
configs.append(JobConfigBuilder().add_outputs(output).set_env(env).build())
|
||||
return configs
|
||||
|
||||
def exec(self, config: JobConfig) -> None:
|
||||
ingest_color_votes_exec(data_date=config.env["DATA_DATE"], color=config.env["COLOR"])
|
||||
|
||||
|
||||
@graph.job
|
||||
class TrailingColorVotes(DataBuildJob):
|
||||
output_types = [TrailingColorVotes1MPartition, TrailingColorVotes1WPartition]
|
||||
|
||||
def config(self, outputs: list[TrailingColorVotes1MPartition | TrailingColorVotes1WPartition]) -> list[JobConfig]:
|
||||
groups = defaultdict(list)
|
||||
for output in outputs:
|
||||
groups[(output.data_date, output.color)].append(output)
|
||||
|
||||
configs = []
|
||||
for (data_date, color), outputs in groups.items():
|
||||
weekly = "false"
|
||||
monthly = "false"
|
||||
max_window = 0
|
||||
for output in outputs:
|
||||
if isinstance(output, TrailingColorVotes1WPartition):
|
||||
weekly = "true"
|
||||
max_window = max(max_window, 7)
|
||||
elif isinstance(output, TrailingColorVotes1MPartition):
|
||||
monthly = "true"
|
||||
max_window = max(max_window, 28)
|
||||
|
||||
env = {"DATA_DATE": data_date, "COLOR": color, "WEEKLY": weekly, "MONTHLY": monthly}
|
||||
config = JobConfigBuilder(env=env).add_outputs(*outputs)
|
||||
for i in range(max_window):
|
||||
in_date = (date.fromisoformat(data_date) - timedelta(days=i)).isoformat()
|
||||
config.add_inputs(IngestedColorPartition(data_date=in_date, color=color))
|
||||
|
||||
configs.append(config.build())
|
||||
return configs
|
||||
|
||||
def exec(self, config: JobConfig) -> None:
|
||||
trailing_color_votes_exec(data_date=config.env["DATA_DATE"], color=config.env["COLOR"])
|
||||
|
||||
|
||||
@graph.job
|
||||
class AggregateColorVotes(DataBuildJob):
|
||||
output_types = [DailyVotesPartition, Votes1WPartition, Votes1MPartition]
|
||||
|
||||
def config(self, outputs: list[DailyVotesPartition | Votes1WPartition | Votes1MPartition]) -> list[JobConfig]:
|
||||
configs = []
|
||||
|
||||
for output in outputs:
|
||||
if isinstance(output, DailyVotesPartition):
|
||||
InPartition = IngestedColorPartition
|
||||
agg_type = "daily_votes"
|
||||
elif isinstance(output, Votes1WPartition):
|
||||
InPartition = TrailingColorVotes1WPartition
|
||||
agg_type = "votes_1w"
|
||||
elif isinstance(output, Votes1MPartition):
|
||||
InPartition = TrailingColorVotes1MPartition
|
||||
agg_type = "votes_1m"
|
||||
else:
|
||||
raise ValueError(f"Unknown output type: {output.type}")
|
||||
|
||||
inputs = [InPartition(data_date=output.data_date, color=color) for color in COLORS]
|
||||
env = {"DATA_DATE": output.data_date, "AGGREGATE_TYPE": agg_type}
|
||||
configs.append(JobConfigBuilder().add_outputs(output).add_inputs(*inputs).set_env(env).build())
|
||||
|
||||
return configs
|
||||
|
||||
def exec(self, config: JobConfig) -> None:
|
||||
aggregate_color_votes_exec(data_date=config.env["DATA_DATE"], aggregate_type=config.env["AGGREGATE_TYPE"])
|
||||
|
||||
|
||||
@graph.job
|
||||
class ColorVoteReportCalc(DataBuildJob):
|
||||
output_types = [ColorVoteReportPartition]
|
||||
|
||||
def config(self, outputs: list[ColorVoteReportPartition]) -> list[JobConfig]:
|
||||
config = JobConfigBuilder().add_outputs(*outputs).add_args(*[p.serialize() for p in outputs])
|
||||
|
||||
for data_date in set(p.data_date for p in outputs):
|
||||
config.add_inputs(
|
||||
DailyVotesPartition(data_date=data_date),
|
||||
Votes1WPartition(data_date=data_date),
|
||||
Votes1MPartition(data_date=data_date),
|
||||
)
|
||||
|
||||
for output in outputs:
|
||||
config.add_inputs(
|
||||
IngestedColorPartition(data_date=output.data_date, color=output.color),
|
||||
TrailingColorVotes1WPartition(data_date=output.data_date, color=output.color),
|
||||
TrailingColorVotes1MPartition(data_date=output.data_date, color=output.color),
|
||||
)
|
||||
|
||||
return [config.build()]
|
||||
|
||||
def exec(self, config: JobConfig) -> None:
|
||||
color_vote_report_calc_exec(config.args)
|
||||
|
||||
40
databuild/test/app/dsl/partitions.py
Normal file
40
databuild/test/app/dsl/partitions.py
Normal file
|
|
@ -0,0 +1,40 @@
|
|||
from dataclasses import dataclass
|
||||
from databuild.dsl.python.dsl import PartitionPattern
|
||||
|
||||
@dataclass
|
||||
class DatePartitioned:
|
||||
data_date: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class DateColorPartitioned:
|
||||
data_date: str
|
||||
color: str
|
||||
|
||||
|
||||
class IngestedColorPartition(DateColorPartitioned, PartitionPattern):
|
||||
_raw_pattern = r"daily_color_votes/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)"
|
||||
|
||||
|
||||
class TrailingColorVotes1WPartition(DateColorPartitioned, PartitionPattern):
|
||||
_raw_pattern = r"color_votes_1w/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)"
|
||||
|
||||
|
||||
class TrailingColorVotes1MPartition(DateColorPartitioned, PartitionPattern):
|
||||
_raw_pattern = r"color_votes_1m/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)"
|
||||
|
||||
|
||||
class DailyVotesPartition(DatePartitioned, PartitionPattern):
|
||||
_raw_pattern = r"daily_votes/(?P<data_date>\d{4}-\d{2}-\d{2})"
|
||||
|
||||
|
||||
class Votes1WPartition(DatePartitioned, PartitionPattern):
|
||||
_raw_pattern = r"votes_1w/(?P<data_date>\d{4}-\d{2}-\d{2})"
|
||||
|
||||
|
||||
class Votes1MPartition(DatePartitioned, PartitionPattern):
|
||||
_raw_pattern = r"votes_1m/(?P<data_date>\d{4}-\d{2}-\d{2})"
|
||||
|
||||
|
||||
class ColorVoteReportPartition(DateColorPartitioned, PartitionPattern):
|
||||
_raw_pattern = r"color_vote_report/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)"
|
||||
75
databuild/test/app/dsl/test/BUILD.bazel
Normal file
75
databuild/test/app/dsl/test/BUILD.bazel
Normal file
|
|
@ -0,0 +1,75 @@
|
|||
# Individual job configuration tests
|
||||
py_test(
|
||||
name = "test_ingest_color_votes",
|
||||
srcs = ["test_ingest_color_votes.py"],
|
||||
main = "test_ingest_color_votes.py",
|
||||
deps = [
|
||||
"//databuild:py_proto",
|
||||
"//databuild/dsl/python:dsl",
|
||||
"//databuild/test/app:job_src",
|
||||
"//databuild/test/app/dsl:dsl_src",
|
||||
],
|
||||
)
|
||||
|
||||
py_test(
|
||||
name = "test_trailing_color_votes",
|
||||
srcs = ["test_trailing_color_votes.py"],
|
||||
main = "test_trailing_color_votes.py",
|
||||
deps = [
|
||||
"//databuild:py_proto",
|
||||
"//databuild/dsl/python:dsl",
|
||||
"//databuild/test/app:job_src",
|
||||
"//databuild/test/app/dsl:dsl_src",
|
||||
],
|
||||
)
|
||||
|
||||
py_test(
|
||||
name = "test_aggregate_color_votes",
|
||||
srcs = ["test_aggregate_color_votes.py"],
|
||||
main = "test_aggregate_color_votes.py",
|
||||
deps = [
|
||||
"//databuild:py_proto",
|
||||
"//databuild/dsl/python:dsl",
|
||||
"//databuild/test/app:job_src",
|
||||
"//databuild/test/app/dsl:dsl_src",
|
||||
],
|
||||
)
|
||||
|
||||
py_test(
|
||||
name = "test_color_vote_report_calc",
|
||||
srcs = ["test_color_vote_report_calc.py"],
|
||||
main = "test_color_vote_report_calc.py",
|
||||
deps = [
|
||||
"//databuild:py_proto",
|
||||
"//databuild/dsl/python:dsl",
|
||||
"//databuild/test/app:job_src",
|
||||
"//databuild/test/app/dsl:dsl_src",
|
||||
],
|
||||
)
|
||||
|
||||
# Graph analysis test
|
||||
py_test(
|
||||
name = "test_graph_analysis",
|
||||
srcs = ["test_graph_analysis.py"],
|
||||
main = "test_graph_analysis.py",
|
||||
deps = [
|
||||
"//databuild:py_proto",
|
||||
"//databuild/dsl/python:dsl",
|
||||
"//databuild/test/app:job_src",
|
||||
"//databuild/test/app/dsl:dsl_src",
|
||||
],
|
||||
)
|
||||
|
||||
# Bazel vs DSL comparison test
|
||||
py_test(
|
||||
name = "test_bazel_dsl_comparison",
|
||||
srcs = ["test_bazel_dsl_comparison.py"],
|
||||
main = "test_bazel_dsl_comparison.py",
|
||||
deps = [
|
||||
"//databuild:py_proto",
|
||||
"//databuild/dsl/python:dsl",
|
||||
"//databuild/test/app:job_src",
|
||||
"//databuild/test/app/bazel:job_src",
|
||||
"//databuild/test/app/dsl:dsl_src",
|
||||
],
|
||||
)
|
||||
159
databuild/test/app/dsl/test/test_aggregate_color_votes.py
Normal file
159
databuild/test/app/dsl/test/test_aggregate_color_votes.py
Normal file
|
|
@ -0,0 +1,159 @@
|
|||
from databuild.test.app.dsl.graph import AggregateColorVotes
|
||||
from databuild.test.app.dsl.partitions import (
|
||||
DailyVotesPartition,
|
||||
Votes1WPartition,
|
||||
Votes1MPartition,
|
||||
IngestedColorPartition,
|
||||
TrailingColorVotes1WPartition,
|
||||
TrailingColorVotes1MPartition
|
||||
)
|
||||
from databuild.test.app.colors import COLORS
|
||||
from databuild.proto import DepType
|
||||
|
||||
|
||||
def test_aggregate_color_votes_configure_daily_votes():
|
||||
"""Test AggregateColorVotes config method with daily votes output."""
|
||||
job = AggregateColorVotes()
|
||||
outputs = [DailyVotesPartition(data_date="2025-01-15")]
|
||||
|
||||
configs = job.config(outputs)
|
||||
|
||||
assert len(configs) == 1
|
||||
config = configs[0]
|
||||
assert len(config.outputs) == 1
|
||||
assert config.outputs[0].str == "daily_votes/2025-01-15"
|
||||
assert config.env["DATA_DATE"] == "2025-01-15"
|
||||
assert config.env["AGGREGATE_TYPE"] == "daily_votes"
|
||||
|
||||
# Should have inputs for all colors
|
||||
assert len(config.inputs) == len(COLORS)
|
||||
expected_inputs = {f"daily_color_votes/2025-01-15/{color}" for color in COLORS}
|
||||
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
|
||||
assert actual_inputs == expected_inputs
|
||||
|
||||
# All inputs should be MATERIALIZE type
|
||||
for input_dep in config.inputs:
|
||||
assert input_dep.dep_type_code == DepType.MATERIALIZE
|
||||
assert input_dep.dep_type_name == "materialize"
|
||||
|
||||
|
||||
def test_aggregate_color_votes_configure_votes_1w():
|
||||
"""Test AggregateColorVotes config method with weekly votes output."""
|
||||
job = AggregateColorVotes()
|
||||
outputs = [Votes1WPartition(data_date="2025-01-15")]
|
||||
|
||||
configs = job.config(outputs)
|
||||
|
||||
assert len(configs) == 1
|
||||
config = configs[0]
|
||||
assert len(config.outputs) == 1
|
||||
assert config.outputs[0].str == "votes_1w/2025-01-15"
|
||||
assert config.env["DATA_DATE"] == "2025-01-15"
|
||||
assert config.env["AGGREGATE_TYPE"] == "votes_1w"
|
||||
|
||||
# Should have inputs for all colors from trailing 1w partitions
|
||||
assert len(config.inputs) == len(COLORS)
|
||||
expected_inputs = {f"color_votes_1w/2025-01-15/{color}" for color in COLORS}
|
||||
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
|
||||
assert actual_inputs == expected_inputs
|
||||
|
||||
|
||||
def test_aggregate_color_votes_configure_votes_1m():
|
||||
"""Test AggregateColorVotes config method with monthly votes output."""
|
||||
job = AggregateColorVotes()
|
||||
outputs = [Votes1MPartition(data_date="2025-01-15")]
|
||||
|
||||
configs = job.config(outputs)
|
||||
|
||||
assert len(configs) == 1
|
||||
config = configs[0]
|
||||
assert len(config.outputs) == 1
|
||||
assert config.outputs[0].str == "votes_1m/2025-01-15"
|
||||
assert config.env["DATA_DATE"] == "2025-01-15"
|
||||
assert config.env["AGGREGATE_TYPE"] == "votes_1m"
|
||||
|
||||
# Should have inputs for all colors from trailing 1m partitions
|
||||
assert len(config.inputs) == len(COLORS)
|
||||
expected_inputs = {f"color_votes_1m/2025-01-15/{color}" for color in COLORS}
|
||||
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
|
||||
assert actual_inputs == expected_inputs
|
||||
|
||||
|
||||
def test_aggregate_color_votes_configure_multiple_outputs():
|
||||
"""Test AggregateColorVotes config method with multiple different output types."""
|
||||
job = AggregateColorVotes()
|
||||
outputs = [
|
||||
DailyVotesPartition(data_date="2025-01-15"),
|
||||
Votes1WPartition(data_date="2025-01-16"),
|
||||
Votes1MPartition(data_date="2025-01-17")
|
||||
]
|
||||
|
||||
configs = job.config(outputs)
|
||||
|
||||
assert len(configs) == 3 # One config per output
|
||||
|
||||
# Find configs by date
|
||||
daily_config = None
|
||||
weekly_config = None
|
||||
monthly_config = None
|
||||
|
||||
for config in configs:
|
||||
if config.env["DATA_DATE"] == "2025-01-15":
|
||||
daily_config = config
|
||||
elif config.env["DATA_DATE"] == "2025-01-16":
|
||||
weekly_config = config
|
||||
elif config.env["DATA_DATE"] == "2025-01-17":
|
||||
monthly_config = config
|
||||
|
||||
assert daily_config is not None
|
||||
assert weekly_config is not None
|
||||
assert monthly_config is not None
|
||||
|
||||
# Check daily config
|
||||
assert daily_config.env["AGGREGATE_TYPE"] == "daily_votes"
|
||||
assert daily_config.outputs[0].str == "daily_votes/2025-01-15"
|
||||
assert len(daily_config.inputs) == len(COLORS)
|
||||
assert all("daily_color_votes/2025-01-15/" in inp.partition_ref.str for inp in daily_config.inputs)
|
||||
|
||||
# Check weekly config
|
||||
assert weekly_config.env["AGGREGATE_TYPE"] == "votes_1w"
|
||||
assert weekly_config.outputs[0].str == "votes_1w/2025-01-16"
|
||||
assert len(weekly_config.inputs) == len(COLORS)
|
||||
assert all("color_votes_1w/2025-01-16/" in inp.partition_ref.str for inp in weekly_config.inputs)
|
||||
|
||||
# Check monthly config
|
||||
assert monthly_config.env["AGGREGATE_TYPE"] == "votes_1m"
|
||||
assert monthly_config.outputs[0].str == "votes_1m/2025-01-17"
|
||||
assert len(monthly_config.inputs) == len(COLORS)
|
||||
assert all("color_votes_1m/2025-01-17/" in inp.partition_ref.str for inp in monthly_config.inputs)
|
||||
|
||||
|
||||
def test_aggregate_color_votes_configure_multiple_same_type():
|
||||
"""Test AggregateColorVotes config method with multiple outputs of same type."""
|
||||
job = AggregateColorVotes()
|
||||
outputs = [
|
||||
DailyVotesPartition(data_date="2025-01-15"),
|
||||
DailyVotesPartition(data_date="2025-01-16")
|
||||
]
|
||||
|
||||
configs = job.config(outputs)
|
||||
|
||||
assert len(configs) == 2 # One config per output
|
||||
|
||||
for config in configs:
|
||||
assert config.env["AGGREGATE_TYPE"] == "daily_votes"
|
||||
assert len(config.inputs) == len(COLORS)
|
||||
|
||||
if config.env["DATA_DATE"] == "2025-01-15":
|
||||
assert config.outputs[0].str == "daily_votes/2025-01-15"
|
||||
assert all("daily_color_votes/2025-01-15/" in inp.partition_ref.str for inp in config.inputs)
|
||||
elif config.env["DATA_DATE"] == "2025-01-16":
|
||||
assert config.outputs[0].str == "daily_votes/2025-01-16"
|
||||
assert all("daily_color_votes/2025-01-16/" in inp.partition_ref.str for inp in config.inputs)
|
||||
else:
|
||||
assert False, f"Unexpected date: {config.env['DATA_DATE']}"
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import pytest
|
||||
raise SystemExit(pytest.main([__file__]))
|
||||
244
databuild/test/app/dsl/test/test_bazel_dsl_comparison.py
Normal file
244
databuild/test/app/dsl/test/test_bazel_dsl_comparison.py
Normal file
|
|
@ -0,0 +1,244 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Comparison test between Bazel and DSL implementations.
|
||||
|
||||
This test verifies that the DSL job configurations produce identical results
|
||||
to the equivalent bazel job configurations for the same partition references.
|
||||
"""
|
||||
|
||||
import unittest
|
||||
from databuild.proto import PartitionRef, JobConfigureResponse
|
||||
from databuild.test.app.dsl.graph import (
|
||||
IngestColorVotes,
|
||||
TrailingColorVotes,
|
||||
AggregateColorVotes,
|
||||
ColorVoteReportCalc
|
||||
)
|
||||
from databuild.test.app.dsl.partitions import (
|
||||
IngestedColorPartition,
|
||||
TrailingColorVotes1WPartition,
|
||||
TrailingColorVotes1MPartition,
|
||||
DailyVotesPartition,
|
||||
Votes1WPartition,
|
||||
Votes1MPartition,
|
||||
ColorVoteReportPartition
|
||||
)
|
||||
|
||||
# Import bazel job config functions
|
||||
from databuild.test.app.bazel.jobs.ingest_color_votes.config import configure as bazel_ingest_config
|
||||
from databuild.test.app.bazel.jobs.trailing_color_votes.config import configure as bazel_trailing_config
|
||||
from databuild.test.app.bazel.jobs.aggregate_color_votes.config import configure as bazel_aggregate_config
|
||||
from databuild.test.app.bazel.jobs.color_vote_report_calc.config import configure as bazel_report_config
|
||||
|
||||
|
||||
class BazelDSLComparisonTest(unittest.TestCase):
|
||||
"""Compare bazel and DSL job configurations to ensure they produce identical results."""
|
||||
|
||||
def _compare_job_configs(self, bazel_response, dsl_configs):
|
||||
"""Helper to compare JobConfigureResponse from bazel with list[JobConfig] from DSL."""
|
||||
self.assertIsInstance(bazel_response, JobConfigureResponse)
|
||||
self.assertIsInstance(dsl_configs, list)
|
||||
|
||||
bazel_configs = bazel_response.configs
|
||||
self.assertEqual(len(bazel_configs), len(dsl_configs),
|
||||
"Bazel and DSL should produce same number of configs")
|
||||
|
||||
# Sort both by a stable key for comparison
|
||||
def config_sort_key(config):
|
||||
outputs_str = ",".join(sorted(out.str for out in config.outputs))
|
||||
env_str = ",".join(f"{k}={v}" for k, v in sorted(config.env.items()))
|
||||
return f"{outputs_str}:{env_str}"
|
||||
|
||||
bazel_sorted = sorted(bazel_configs, key=config_sort_key)
|
||||
dsl_sorted = sorted(dsl_configs, key=config_sort_key)
|
||||
|
||||
for bazel_config, dsl_config in zip(bazel_sorted, dsl_sorted):
|
||||
# Compare outputs
|
||||
bazel_outputs = {out.str for out in bazel_config.outputs}
|
||||
dsl_outputs = {out.str for out in dsl_config.outputs}
|
||||
self.assertEqual(bazel_outputs, dsl_outputs, "Outputs should match")
|
||||
|
||||
# Compare inputs
|
||||
bazel_inputs = {(inp.partition_ref.str, inp.dep_type_code, inp.dep_type_name)
|
||||
for inp in bazel_config.inputs}
|
||||
dsl_inputs = {(inp.partition_ref.str, inp.dep_type_code, inp.dep_type_name)
|
||||
for inp in dsl_config.inputs}
|
||||
self.assertEqual(bazel_inputs, dsl_inputs, "Inputs should match")
|
||||
|
||||
# Compare args
|
||||
self.assertEqual(set(bazel_config.args), set(dsl_config.args), "Args should match")
|
||||
|
||||
# Compare env
|
||||
self.assertEqual(bazel_config.env, dsl_config.env, "Environment should match")
|
||||
|
||||
def test_ingest_color_votes_comparison(self):
|
||||
"""Compare IngestColorVotes bazel vs DSL configurations."""
|
||||
# Test single output
|
||||
partition_refs = [PartitionRef(str="daily_color_votes/2025-01-01/red")]
|
||||
bazel_response = bazel_ingest_config(partition_refs)
|
||||
|
||||
partitions = [IngestedColorPartition.deserialize(ref.str) for ref in partition_refs]
|
||||
dsl_job = IngestColorVotes()
|
||||
dsl_configs = dsl_job.config(partitions)
|
||||
|
||||
self._compare_job_configs(bazel_response, dsl_configs)
|
||||
|
||||
# Test multiple outputs
|
||||
partition_refs = [
|
||||
PartitionRef(str="daily_color_votes/2025-01-02/red"),
|
||||
PartitionRef(str="daily_color_votes/2025-01-02/blue")
|
||||
]
|
||||
bazel_response = bazel_ingest_config(partition_refs)
|
||||
|
||||
partitions = [IngestedColorPartition.deserialize(ref.str) for ref in partition_refs]
|
||||
dsl_configs = dsl_job.config(partitions)
|
||||
|
||||
self._compare_job_configs(bazel_response, dsl_configs)
|
||||
|
||||
def test_trailing_color_votes_comparison(self):
|
||||
"""Compare TrailingColorVotes bazel vs DSL configurations."""
|
||||
# Test weekly output
|
||||
partition_refs = [PartitionRef(str="color_votes_1w/2025-01-07/red")]
|
||||
bazel_response = bazel_trailing_config(partition_refs)
|
||||
|
||||
partitions = [TrailingColorVotes1WPartition.deserialize(ref.str) for ref in partition_refs]
|
||||
dsl_job = TrailingColorVotes()
|
||||
dsl_configs = dsl_job.config(partitions)
|
||||
|
||||
self._compare_job_configs(bazel_response, dsl_configs)
|
||||
|
||||
# Test monthly output
|
||||
partition_refs = [PartitionRef(str="color_votes_1m/2025-01-28/blue")]
|
||||
bazel_response = bazel_trailing_config(partition_refs)
|
||||
|
||||
partitions = [TrailingColorVotes1MPartition.deserialize(ref.str) for ref in partition_refs]
|
||||
dsl_configs = dsl_job.config(partitions)
|
||||
|
||||
self._compare_job_configs(bazel_response, dsl_configs)
|
||||
|
||||
# Test mixed weekly and monthly for same date/color
|
||||
partition_refs = [
|
||||
PartitionRef(str="color_votes_1w/2025-01-28/green"),
|
||||
PartitionRef(str="color_votes_1m/2025-01-28/green")
|
||||
]
|
||||
bazel_response = bazel_trailing_config(partition_refs)
|
||||
|
||||
partitions = [
|
||||
TrailingColorVotes1WPartition.deserialize(partition_refs[0].str),
|
||||
TrailingColorVotes1MPartition.deserialize(partition_refs[1].str)
|
||||
]
|
||||
dsl_configs = dsl_job.config(partitions)
|
||||
|
||||
self._compare_job_configs(bazel_response, dsl_configs)
|
||||
|
||||
def test_aggregate_color_votes_comparison(self):
|
||||
"""Compare AggregateColorVotes bazel vs DSL configurations."""
|
||||
# Test daily votes
|
||||
partition_refs = [PartitionRef(str="daily_votes/2025-01-15")]
|
||||
bazel_response = bazel_aggregate_config(partition_refs)
|
||||
|
||||
partitions = [DailyVotesPartition.deserialize(ref.str) for ref in partition_refs]
|
||||
dsl_job = AggregateColorVotes()
|
||||
dsl_configs = dsl_job.config(partitions)
|
||||
|
||||
self._compare_job_configs(bazel_response, dsl_configs)
|
||||
|
||||
# Test weekly votes
|
||||
partition_refs = [PartitionRef(str="votes_1w/2025-01-15")]
|
||||
bazel_response = bazel_aggregate_config(partition_refs)
|
||||
|
||||
partitions = [Votes1WPartition.deserialize(ref.str) for ref in partition_refs]
|
||||
dsl_configs = dsl_job.config(partitions)
|
||||
|
||||
self._compare_job_configs(bazel_response, dsl_configs)
|
||||
|
||||
# Test monthly votes
|
||||
partition_refs = [PartitionRef(str="votes_1m/2025-01-15")]
|
||||
bazel_response = bazel_aggregate_config(partition_refs)
|
||||
|
||||
partitions = [Votes1MPartition.deserialize(ref.str) for ref in partition_refs]
|
||||
dsl_configs = dsl_job.config(partitions)
|
||||
|
||||
self._compare_job_configs(bazel_response, dsl_configs)
|
||||
|
||||
# Test multiple different types
|
||||
partition_refs = [
|
||||
PartitionRef(str="daily_votes/2025-01-15"),
|
||||
PartitionRef(str="votes_1w/2025-01-16"),
|
||||
PartitionRef(str="votes_1m/2025-01-17")
|
||||
]
|
||||
bazel_response = bazel_aggregate_config(partition_refs)
|
||||
|
||||
partitions = [
|
||||
DailyVotesPartition.deserialize(partition_refs[0].str),
|
||||
Votes1WPartition.deserialize(partition_refs[1].str),
|
||||
Votes1MPartition.deserialize(partition_refs[2].str)
|
||||
]
|
||||
dsl_configs = dsl_job.config(partitions)
|
||||
|
||||
self._compare_job_configs(bazel_response, dsl_configs)
|
||||
|
||||
def test_color_vote_report_calc_comparison(self):
|
||||
"""Compare ColorVoteReportCalc bazel vs DSL configurations."""
|
||||
# Test single report
|
||||
partition_refs = [PartitionRef(str="color_vote_report/2025-01-15/red")]
|
||||
bazel_response = bazel_report_config(partition_refs)
|
||||
|
||||
partitions = [ColorVoteReportPartition.deserialize(ref.str) for ref in partition_refs]
|
||||
dsl_job = ColorVoteReportCalc()
|
||||
dsl_configs = dsl_job.config(partitions)
|
||||
|
||||
self._compare_job_configs(bazel_response, dsl_configs)
|
||||
|
||||
# Test multiple reports same date
|
||||
partition_refs = [
|
||||
PartitionRef(str="color_vote_report/2025-01-15/red"),
|
||||
PartitionRef(str="color_vote_report/2025-01-15/blue")
|
||||
]
|
||||
bazel_response = bazel_report_config(partition_refs)
|
||||
|
||||
partitions = [ColorVoteReportPartition.deserialize(ref.str) for ref in partition_refs]
|
||||
dsl_configs = dsl_job.config(partitions)
|
||||
|
||||
self._compare_job_configs(bazel_response, dsl_configs)
|
||||
|
||||
# Test multiple reports different dates
|
||||
partition_refs = [
|
||||
PartitionRef(str="color_vote_report/2025-01-15/red"),
|
||||
PartitionRef(str="color_vote_report/2025-01-16/red")
|
||||
]
|
||||
bazel_response = bazel_report_config(partition_refs)
|
||||
|
||||
partitions = [ColorVoteReportPartition.deserialize(ref.str) for ref in partition_refs]
|
||||
dsl_configs = dsl_job.config(partitions)
|
||||
|
||||
self._compare_job_configs(bazel_response, dsl_configs)
|
||||
|
||||
def test_partition_serialization_roundtrip(self):
|
||||
"""Test that DSL partition serialization/deserialization works correctly."""
|
||||
test_cases = [
|
||||
IngestedColorPartition(data_date="2025-01-15", color="red"),
|
||||
TrailingColorVotes1WPartition(data_date="2025-01-15", color="blue"),
|
||||
TrailingColorVotes1MPartition(data_date="2025-01-28", color="green"),
|
||||
DailyVotesPartition(data_date="2025-01-15"),
|
||||
Votes1WPartition(data_date="2025-01-15"),
|
||||
Votes1MPartition(data_date="2025-01-15"),
|
||||
ColorVoteReportPartition(data_date="2025-01-15", color="yellow")
|
||||
]
|
||||
|
||||
for partition in test_cases:
|
||||
with self.subTest(partition=partition):
|
||||
# Serialize then deserialize
|
||||
serialized = partition.serialize()
|
||||
deserialized = type(partition).deserialize(serialized)
|
||||
|
||||
# Should be equal
|
||||
self.assertEqual(partition, deserialized)
|
||||
|
||||
# Serializing again should give same result
|
||||
reserialized = deserialized.serialize()
|
||||
self.assertEqual(serialized, reserialized)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
204
databuild/test/app/dsl/test/test_color_vote_report_calc.py
Normal file
204
databuild/test/app/dsl/test/test_color_vote_report_calc.py
Normal file
|
|
@ -0,0 +1,204 @@
|
|||
from databuild.test.app.dsl.graph import ColorVoteReportCalc
|
||||
from databuild.test.app.dsl.partitions import (
|
||||
ColorVoteReportPartition,
|
||||
DailyVotesPartition,
|
||||
Votes1WPartition,
|
||||
Votes1MPartition,
|
||||
IngestedColorPartition,
|
||||
TrailingColorVotes1WPartition,
|
||||
TrailingColorVotes1MPartition
|
||||
)
|
||||
from databuild.proto import DepType
|
||||
|
||||
|
||||
def test_color_vote_report_calc_configure_single_output():
|
||||
"""Test ColorVoteReportCalc config method with single color report output."""
|
||||
job = ColorVoteReportCalc()
|
||||
outputs = [ColorVoteReportPartition(data_date="2025-01-15", color="red")]
|
||||
|
||||
configs = job.config(outputs)
|
||||
|
||||
assert len(configs) == 1
|
||||
config = configs[0]
|
||||
|
||||
# Check outputs
|
||||
assert len(config.outputs) == 1
|
||||
assert config.outputs[0].str == "color_vote_report/2025-01-15/red"
|
||||
|
||||
# Check args - should contain partition strings
|
||||
assert len(config.args) == 1
|
||||
assert config.args[0] == "color_vote_report/2025-01-15/red"
|
||||
|
||||
# Check inputs - should have aggregate inputs for the date and specific color inputs
|
||||
expected_inputs = {
|
||||
# Aggregate inputs for the date
|
||||
"daily_votes/2025-01-15",
|
||||
"votes_1w/2025-01-15",
|
||||
"votes_1m/2025-01-15",
|
||||
# Color-specific inputs
|
||||
"daily_color_votes/2025-01-15/red",
|
||||
"color_votes_1w/2025-01-15/red",
|
||||
"color_votes_1m/2025-01-15/red"
|
||||
}
|
||||
|
||||
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
|
||||
assert actual_inputs == expected_inputs
|
||||
|
||||
# All inputs should be MATERIALIZE type
|
||||
for input_dep in config.inputs:
|
||||
assert input_dep.dep_type_code == DepType.MATERIALIZE
|
||||
assert input_dep.dep_type_name == "materialize"
|
||||
|
||||
|
||||
def test_color_vote_report_calc_configure_multiple_colors_same_date():
|
||||
"""Test ColorVoteReportCalc config method with multiple colors for same date."""
|
||||
job = ColorVoteReportCalc()
|
||||
outputs = [
|
||||
ColorVoteReportPartition(data_date="2025-01-15", color="red"),
|
||||
ColorVoteReportPartition(data_date="2025-01-15", color="blue")
|
||||
]
|
||||
|
||||
configs = job.config(outputs)
|
||||
|
||||
assert len(configs) == 1 # Single config since all outputs go to same job
|
||||
config = configs[0]
|
||||
|
||||
# Check outputs
|
||||
assert len(config.outputs) == 2
|
||||
output_strs = {output.str for output in config.outputs}
|
||||
assert "color_vote_report/2025-01-15/red" in output_strs
|
||||
assert "color_vote_report/2025-01-15/blue" in output_strs
|
||||
|
||||
# Check args - should contain both partition strings
|
||||
assert len(config.args) == 2
|
||||
assert set(config.args) == {"color_vote_report/2025-01-15/red", "color_vote_report/2025-01-15/blue"}
|
||||
|
||||
# Check inputs - should have aggregate inputs for the date and color-specific inputs for both colors
|
||||
expected_inputs = {
|
||||
# Aggregate inputs for the date (only one set since same date)
|
||||
"daily_votes/2025-01-15",
|
||||
"votes_1w/2025-01-15",
|
||||
"votes_1m/2025-01-15",
|
||||
# Color-specific inputs for red
|
||||
"daily_color_votes/2025-01-15/red",
|
||||
"color_votes_1w/2025-01-15/red",
|
||||
"color_votes_1m/2025-01-15/red",
|
||||
# Color-specific inputs for blue
|
||||
"daily_color_votes/2025-01-15/blue",
|
||||
"color_votes_1w/2025-01-15/blue",
|
||||
"color_votes_1m/2025-01-15/blue"
|
||||
}
|
||||
|
||||
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
|
||||
assert actual_inputs == expected_inputs
|
||||
|
||||
|
||||
def test_color_vote_report_calc_configure_multiple_dates():
|
||||
"""Test ColorVoteReportCalc config method with reports for different dates."""
|
||||
job = ColorVoteReportCalc()
|
||||
outputs = [
|
||||
ColorVoteReportPartition(data_date="2025-01-15", color="red"),
|
||||
ColorVoteReportPartition(data_date="2025-01-16", color="red")
|
||||
]
|
||||
|
||||
configs = job.config(outputs)
|
||||
|
||||
assert len(configs) == 1 # Single config since all outputs go to same job
|
||||
config = configs[0]
|
||||
|
||||
# Check outputs
|
||||
assert len(config.outputs) == 2
|
||||
output_strs = {output.str for output in config.outputs}
|
||||
assert "color_vote_report/2025-01-15/red" in output_strs
|
||||
assert "color_vote_report/2025-01-16/red" in output_strs
|
||||
|
||||
# Check args
|
||||
assert len(config.args) == 2
|
||||
assert set(config.args) == {"color_vote_report/2025-01-15/red", "color_vote_report/2025-01-16/red"}
|
||||
|
||||
# Check inputs - should have aggregate inputs for both dates and color-specific inputs
|
||||
expected_inputs = {
|
||||
# Aggregate inputs for both dates
|
||||
"daily_votes/2025-01-15",
|
||||
"votes_1w/2025-01-15",
|
||||
"votes_1m/2025-01-15",
|
||||
"daily_votes/2025-01-16",
|
||||
"votes_1w/2025-01-16",
|
||||
"votes_1m/2025-01-16",
|
||||
# Color-specific inputs for red on both dates
|
||||
"daily_color_votes/2025-01-15/red",
|
||||
"color_votes_1w/2025-01-15/red",
|
||||
"color_votes_1m/2025-01-15/red",
|
||||
"daily_color_votes/2025-01-16/red",
|
||||
"color_votes_1w/2025-01-16/red",
|
||||
"color_votes_1m/2025-01-16/red"
|
||||
}
|
||||
|
||||
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
|
||||
assert actual_inputs == expected_inputs
|
||||
|
||||
|
||||
def test_color_vote_report_calc_configure_complex_scenario():
|
||||
"""Test ColorVoteReportCalc config method with complex multi-date, multi-color scenario."""
|
||||
job = ColorVoteReportCalc()
|
||||
outputs = [
|
||||
ColorVoteReportPartition(data_date="2025-01-15", color="red"),
|
||||
ColorVoteReportPartition(data_date="2025-01-15", color="blue"),
|
||||
ColorVoteReportPartition(data_date="2025-01-16", color="green"),
|
||||
ColorVoteReportPartition(data_date="2025-01-17", color="red")
|
||||
]
|
||||
|
||||
configs = job.config(outputs)
|
||||
|
||||
assert len(configs) == 1 # Single config since all outputs go to same job
|
||||
config = configs[0]
|
||||
|
||||
# Check outputs
|
||||
assert len(config.outputs) == 4
|
||||
expected_output_strs = {
|
||||
"color_vote_report/2025-01-15/red",
|
||||
"color_vote_report/2025-01-15/blue",
|
||||
"color_vote_report/2025-01-16/green",
|
||||
"color_vote_report/2025-01-17/red"
|
||||
}
|
||||
actual_output_strs = {output.str for output in config.outputs}
|
||||
assert actual_output_strs == expected_output_strs
|
||||
|
||||
# Check args
|
||||
assert len(config.args) == 4
|
||||
assert set(config.args) == expected_output_strs
|
||||
|
||||
# Check inputs - should have aggregate inputs for all unique dates and color-specific inputs
|
||||
expected_inputs = {
|
||||
# Aggregate inputs for all dates
|
||||
"daily_votes/2025-01-15",
|
||||
"votes_1w/2025-01-15",
|
||||
"votes_1m/2025-01-15",
|
||||
"daily_votes/2025-01-16",
|
||||
"votes_1w/2025-01-16",
|
||||
"votes_1m/2025-01-16",
|
||||
"daily_votes/2025-01-17",
|
||||
"votes_1w/2025-01-17",
|
||||
"votes_1m/2025-01-17",
|
||||
# Color-specific inputs
|
||||
"daily_color_votes/2025-01-15/red",
|
||||
"color_votes_1w/2025-01-15/red",
|
||||
"color_votes_1m/2025-01-15/red",
|
||||
"daily_color_votes/2025-01-15/blue",
|
||||
"color_votes_1w/2025-01-15/blue",
|
||||
"color_votes_1m/2025-01-15/blue",
|
||||
"daily_color_votes/2025-01-16/green",
|
||||
"color_votes_1w/2025-01-16/green",
|
||||
"color_votes_1m/2025-01-16/green",
|
||||
"daily_color_votes/2025-01-17/red",
|
||||
"color_votes_1w/2025-01-17/red",
|
||||
"color_votes_1m/2025-01-17/red"
|
||||
}
|
||||
|
||||
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
|
||||
assert actual_inputs == expected_inputs
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import pytest
|
||||
raise SystemExit(pytest.main([__file__]))
|
||||
157
databuild/test/app/dsl/test/test_graph_analysis.py
Normal file
157
databuild/test/app/dsl/test/test_graph_analysis.py
Normal file
|
|
@ -0,0 +1,157 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Integration test for the DSL graph analysis.
|
||||
|
||||
This test verifies that when we request color vote reports via the DSL graph,
|
||||
the analyzer correctly identifies all upstream dependencies and jobs required.
|
||||
|
||||
NOTE: This test assumes the DSL graph will have an analyze() method similar to
|
||||
the bazel graph analyzer. This functionality is not yet implemented but these
|
||||
tests will validate it once available.
|
||||
"""
|
||||
|
||||
import unittest
|
||||
from databuild.test.app.dsl.graph import graph
|
||||
from databuild.test.app.dsl.partitions import ColorVoteReportPartition
|
||||
|
||||
|
||||
class DSLGraphAnalysisTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# Ensure we have the graph instance
|
||||
self.graph = graph
|
||||
|
||||
def test_single_color_report_dependencies(self):
|
||||
"""Test dependencies for a single color vote report via DSL."""
|
||||
partition_refs = ["color_vote_report/2024-01-15/red"]
|
||||
|
||||
# TODO: Once DSL graph analysis is implemented, this should call:
|
||||
# result = self.graph.analyze(partition_refs)
|
||||
# self.assertIn('nodes', result)
|
||||
|
||||
# For now, we can at least verify the graph structure
|
||||
self.assertIsNotNone(self.graph)
|
||||
self.assertGreater(len(self.graph.lookup), 0)
|
||||
|
||||
# Verify we can create the partition and find its producer
|
||||
partition = ColorVoteReportPartition(data_date="2024-01-15", color="red")
|
||||
producer_job_class = self.graph.lookup.get(ColorVoteReportPartition)
|
||||
self.assertIsNotNone(producer_job_class, "ColorVoteReportPartition should have a registered producer")
|
||||
|
||||
# Test that we can call the job's config method
|
||||
job_instance = producer_job_class()
|
||||
configs = job_instance.config([partition])
|
||||
self.assertIsInstance(configs, list)
|
||||
self.assertGreater(len(configs), 0)
|
||||
|
||||
def test_multiple_color_reports_same_date(self):
|
||||
"""Test dependencies when requesting multiple colors for the same date via DSL."""
|
||||
partition_refs = [
|
||||
"color_vote_report/2024-01-15/red",
|
||||
"color_vote_report/2024-01-15/blue"
|
||||
]
|
||||
|
||||
# TODO: Once DSL graph analysis is implemented, this should call:
|
||||
# result = self.graph.analyze(partition_refs)
|
||||
# self.assertIn('nodes', result)
|
||||
|
||||
# For now, verify we can handle multiple partitions
|
||||
partitions = [
|
||||
ColorVoteReportPartition(data_date="2024-01-15", color="red"),
|
||||
ColorVoteReportPartition(data_date="2024-01-15", color="blue")
|
||||
]
|
||||
|
||||
producer_job_class = self.graph.lookup.get(ColorVoteReportPartition)
|
||||
self.assertIsNotNone(producer_job_class)
|
||||
|
||||
job_instance = producer_job_class()
|
||||
configs = job_instance.config(partitions)
|
||||
self.assertIsInstance(configs, list)
|
||||
self.assertGreater(len(configs), 0)
|
||||
|
||||
def test_multiple_dates_dependencies(self):
|
||||
"""Test dependencies when requesting reports for different dates via DSL."""
|
||||
partition_refs = [
|
||||
"color_vote_report/2024-01-15/red",
|
||||
"color_vote_report/2024-01-16/red"
|
||||
]
|
||||
|
||||
# TODO: Once DSL graph analysis is implemented, this should call:
|
||||
# result = self.graph.analyze(partition_refs)
|
||||
# self.assertIn('nodes', result)
|
||||
|
||||
# For now, verify we can handle different dates
|
||||
partitions = [
|
||||
ColorVoteReportPartition(data_date="2024-01-15", color="red"),
|
||||
ColorVoteReportPartition(data_date="2024-01-16", color="red")
|
||||
]
|
||||
|
||||
producer_job_class = self.graph.lookup.get(ColorVoteReportPartition)
|
||||
self.assertIsNotNone(producer_job_class)
|
||||
|
||||
job_instance = producer_job_class()
|
||||
configs = job_instance.config(partitions)
|
||||
self.assertIsInstance(configs, list)
|
||||
self.assertGreater(len(configs), 0)
|
||||
|
||||
def test_graph_completeness(self):
|
||||
"""Test that the DSL graph has all expected partition types registered."""
|
||||
from databuild.test.app.dsl.partitions import (
|
||||
IngestedColorPartition,
|
||||
TrailingColorVotes1WPartition,
|
||||
TrailingColorVotes1MPartition,
|
||||
DailyVotesPartition,
|
||||
Votes1WPartition,
|
||||
Votes1MPartition,
|
||||
ColorVoteReportPartition
|
||||
)
|
||||
|
||||
expected_partitions = {
|
||||
IngestedColorPartition,
|
||||
TrailingColorVotes1WPartition,
|
||||
TrailingColorVotes1MPartition,
|
||||
DailyVotesPartition,
|
||||
Votes1WPartition,
|
||||
Votes1MPartition,
|
||||
ColorVoteReportPartition
|
||||
}
|
||||
|
||||
registered_partitions = set(self.graph.lookup.keys())
|
||||
self.assertEqual(registered_partitions, expected_partitions,
|
||||
"All partition types should be registered in the graph")
|
||||
|
||||
def test_partition_lookup_functionality(self):
|
||||
"""Test that partition lookup works correctly for all partition types."""
|
||||
from databuild.test.app.dsl.partitions import (
|
||||
IngestedColorPartition,
|
||||
TrailingColorVotes1WPartition,
|
||||
TrailingColorVotes1MPartition,
|
||||
DailyVotesPartition,
|
||||
Votes1WPartition,
|
||||
Votes1MPartition,
|
||||
ColorVoteReportPartition
|
||||
)
|
||||
|
||||
# Test each partition type can be looked up and has a valid job
|
||||
test_cases = [
|
||||
(IngestedColorPartition, IngestedColorPartition(data_date="2024-01-15", color="red")),
|
||||
(TrailingColorVotes1WPartition, TrailingColorVotes1WPartition(data_date="2024-01-15", color="red")),
|
||||
(TrailingColorVotes1MPartition, TrailingColorVotes1MPartition(data_date="2024-01-15", color="red")),
|
||||
(DailyVotesPartition, DailyVotesPartition(data_date="2024-01-15")),
|
||||
(Votes1WPartition, Votes1WPartition(data_date="2024-01-15")),
|
||||
(Votes1MPartition, Votes1MPartition(data_date="2024-01-15")),
|
||||
(ColorVoteReportPartition, ColorVoteReportPartition(data_date="2024-01-15", color="red"))
|
||||
]
|
||||
|
||||
for partition_type, partition_instance in test_cases:
|
||||
with self.subTest(partition_type=partition_type.__name__):
|
||||
job_class = self.graph.lookup.get(partition_type)
|
||||
self.assertIsNotNone(job_class, f"Job class for {partition_type.__name__} should be registered")
|
||||
|
||||
# Verify we can instantiate the job and call config
|
||||
job_instance = job_class()
|
||||
configs = job_instance.config([partition_instance])
|
||||
self.assertIsInstance(configs, list, f"Config method for {partition_type.__name__} should return a list")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
56
databuild/test/app/dsl/test/test_ingest_color_votes.py
Normal file
56
databuild/test/app/dsl/test/test_ingest_color_votes.py
Normal file
|
|
@ -0,0 +1,56 @@
|
|||
from databuild.test.app.dsl.graph import IngestColorVotes
|
||||
from databuild.test.app.dsl.partitions import IngestedColorPartition
|
||||
from databuild.proto import PartitionRef
|
||||
|
||||
|
||||
def test_ingest_color_votes_configure_single():
|
||||
"""Test IngestColorVotes config method with single output."""
|
||||
job = IngestColorVotes()
|
||||
outputs = [IngestedColorPartition(data_date="2025-01-01", color="red")]
|
||||
|
||||
configs = job.config(outputs)
|
||||
|
||||
assert len(configs) == 1
|
||||
config = configs[0]
|
||||
assert len(config.outputs) == 1
|
||||
assert config.outputs[0].str == "daily_color_votes/2025-01-01/red"
|
||||
assert config.env["COLOR"] == "red"
|
||||
assert config.env["DATA_DATE"] == "2025-01-01"
|
||||
assert len(config.inputs) == 0
|
||||
assert len(config.args) == 0
|
||||
|
||||
|
||||
def test_ingest_color_votes_configure_multiple():
|
||||
"""Test IngestColorVotes config method with multiple outputs."""
|
||||
job = IngestColorVotes()
|
||||
outputs = [
|
||||
IngestedColorPartition(data_date="2025-01-02", color="red"),
|
||||
IngestedColorPartition(data_date="2025-01-02", color="blue"),
|
||||
]
|
||||
|
||||
configs = job.config(outputs)
|
||||
|
||||
assert len(configs) == 2
|
||||
|
||||
# First config
|
||||
config1 = configs[0]
|
||||
assert len(config1.outputs) == 1
|
||||
assert config1.outputs[0].str == "daily_color_votes/2025-01-02/red"
|
||||
assert config1.env["COLOR"] == "red"
|
||||
assert config1.env["DATA_DATE"] == "2025-01-02"
|
||||
assert len(config1.inputs) == 0
|
||||
assert len(config1.args) == 0
|
||||
|
||||
# Second config
|
||||
config2 = configs[1]
|
||||
assert len(config2.outputs) == 1
|
||||
assert config2.outputs[0].str == "daily_color_votes/2025-01-02/blue"
|
||||
assert config2.env["COLOR"] == "blue"
|
||||
assert config2.env["DATA_DATE"] == "2025-01-02"
|
||||
assert len(config2.inputs) == 0
|
||||
assert len(config2.args) == 0
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import pytest
|
||||
raise SystemExit(pytest.main([__file__]))
|
||||
135
databuild/test/app/dsl/test/test_trailing_color_votes.py
Normal file
135
databuild/test/app/dsl/test/test_trailing_color_votes.py
Normal file
|
|
@ -0,0 +1,135 @@
|
|||
from databuild.test.app.dsl.graph import TrailingColorVotes
|
||||
from databuild.test.app.dsl.partitions import (
|
||||
TrailingColorVotes1WPartition,
|
||||
TrailingColorVotes1MPartition,
|
||||
IngestedColorPartition
|
||||
)
|
||||
from databuild.proto import DepType
|
||||
|
||||
|
||||
def test_trailing_color_votes_configure_weekly_only():
|
||||
"""Test TrailingColorVotes config method with weekly output only."""
|
||||
job = TrailingColorVotes()
|
||||
outputs = [TrailingColorVotes1WPartition(data_date="2025-01-07", color="red")]
|
||||
|
||||
configs = job.config(outputs)
|
||||
|
||||
assert len(configs) == 1
|
||||
config = configs[0]
|
||||
assert len(config.outputs) == 1
|
||||
assert config.outputs[0].str == "color_votes_1w/2025-01-07/red"
|
||||
assert config.env["COLOR"] == "red"
|
||||
assert config.env["DATA_DATE"] == "2025-01-07"
|
||||
assert config.env["WEEKLY"] == "true"
|
||||
assert config.env["MONTHLY"] == "false"
|
||||
|
||||
# Should have 7 days of inputs
|
||||
assert len(config.inputs) == 7
|
||||
expected_dates = ["2025-01-07", "2025-01-06", "2025-01-05", "2025-01-04",
|
||||
"2025-01-03", "2025-01-02", "2025-01-01"]
|
||||
for i, input_dep in enumerate(config.inputs):
|
||||
assert input_dep.dep_type_code == DepType.MATERIALIZE
|
||||
assert input_dep.dep_type_name == "materialize"
|
||||
assert input_dep.partition_ref.str == f"daily_color_votes/{expected_dates[i]}/red"
|
||||
|
||||
|
||||
def test_trailing_color_votes_configure_monthly_only():
|
||||
"""Test TrailingColorVotes config method with monthly output only."""
|
||||
job = TrailingColorVotes()
|
||||
outputs = [TrailingColorVotes1MPartition(data_date="2025-01-28", color="blue")]
|
||||
|
||||
configs = job.config(outputs)
|
||||
|
||||
assert len(configs) == 1
|
||||
config = configs[0]
|
||||
assert len(config.outputs) == 1
|
||||
assert config.outputs[0].str == "color_votes_1m/2025-01-28/blue"
|
||||
assert config.env["COLOR"] == "blue"
|
||||
assert config.env["DATA_DATE"] == "2025-01-28"
|
||||
assert config.env["WEEKLY"] == "false"
|
||||
assert config.env["MONTHLY"] == "true"
|
||||
|
||||
# Should have 28 days of inputs
|
||||
assert len(config.inputs) == 28
|
||||
# Check first and last input dates
|
||||
assert config.inputs[0].partition_ref.str == "daily_color_votes/2025-01-28/blue"
|
||||
assert config.inputs[27].partition_ref.str == "daily_color_votes/2025-01-01/blue"
|
||||
|
||||
|
||||
def test_trailing_color_votes_configure_both_weekly_and_monthly():
|
||||
"""Test TrailingColorVotes config method with both weekly and monthly outputs for same date/color."""
|
||||
job = TrailingColorVotes()
|
||||
outputs = [
|
||||
TrailingColorVotes1WPartition(data_date="2025-01-28", color="green"),
|
||||
TrailingColorVotes1MPartition(data_date="2025-01-28", color="green")
|
||||
]
|
||||
|
||||
configs = job.config(outputs)
|
||||
|
||||
assert len(configs) == 1 # Should group by (data_date, color)
|
||||
config = configs[0]
|
||||
assert len(config.outputs) == 2
|
||||
|
||||
# Check outputs
|
||||
output_strs = {output.str for output in config.outputs}
|
||||
assert "color_votes_1w/2025-01-28/green" in output_strs
|
||||
assert "color_votes_1m/2025-01-28/green" in output_strs
|
||||
|
||||
assert config.env["COLOR"] == "green"
|
||||
assert config.env["DATA_DATE"] == "2025-01-28"
|
||||
assert config.env["WEEKLY"] == "true"
|
||||
assert config.env["MONTHLY"] == "true"
|
||||
|
||||
# Should have 28 days of inputs (max window)
|
||||
assert len(config.inputs) == 28
|
||||
|
||||
|
||||
def test_trailing_color_votes_configure_multiple_groups():
|
||||
"""Test TrailingColorVotes config method with outputs that require separate configs."""
|
||||
job = TrailingColorVotes()
|
||||
outputs = [
|
||||
TrailingColorVotes1WPartition(data_date="2025-01-07", color="red"),
|
||||
TrailingColorVotes1WPartition(data_date="2025-01-07", color="blue"),
|
||||
TrailingColorVotes1MPartition(data_date="2025-01-08", color="red")
|
||||
]
|
||||
|
||||
configs = job.config(outputs)
|
||||
|
||||
assert len(configs) == 3 # Three different (data_date, color) combinations
|
||||
|
||||
# Find configs by their characteristics
|
||||
red_7th_config = None
|
||||
blue_7th_config = None
|
||||
red_8th_config = None
|
||||
|
||||
for config in configs:
|
||||
if config.env["DATA_DATE"] == "2025-01-07" and config.env["COLOR"] == "red":
|
||||
red_7th_config = config
|
||||
elif config.env["DATA_DATE"] == "2025-01-07" and config.env["COLOR"] == "blue":
|
||||
blue_7th_config = config
|
||||
elif config.env["DATA_DATE"] == "2025-01-08" and config.env["COLOR"] == "red":
|
||||
red_8th_config = config
|
||||
|
||||
assert red_7th_config is not None
|
||||
assert blue_7th_config is not None
|
||||
assert red_8th_config is not None
|
||||
|
||||
# Check red 7th (weekly only)
|
||||
assert red_7th_config.env["WEEKLY"] == "true"
|
||||
assert red_7th_config.env["MONTHLY"] == "false"
|
||||
assert len(red_7th_config.inputs) == 7
|
||||
|
||||
# Check blue 7th (weekly only)
|
||||
assert blue_7th_config.env["WEEKLY"] == "true"
|
||||
assert blue_7th_config.env["MONTHLY"] == "false"
|
||||
assert len(blue_7th_config.inputs) == 7
|
||||
|
||||
# Check red 8th (monthly only)
|
||||
assert red_8th_config.env["WEEKLY"] == "false"
|
||||
assert red_8th_config.env["MONTHLY"] == "true"
|
||||
assert len(red_8th_config.inputs) == 28
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import pytest
|
||||
raise SystemExit(pytest.main([__file__]))
|
||||
26
databuild/test/app/jobs/aggregate_color_votes/execute.py
Normal file
26
databuild/test/app/jobs/aggregate_color_votes/execute.py
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
from databuild.test.app import dal
|
||||
from databuild.proto import PartitionRef
|
||||
from databuild.test.app.colors import COLORS
|
||||
|
||||
def execute(data_date: str, aggregate_type: str):
|
||||
# Determine input prefix based on aggregate type
|
||||
if aggregate_type == "daily_votes":
|
||||
input_prefix = "daily_color_votes"
|
||||
elif aggregate_type == "votes_1w":
|
||||
input_prefix = "color_votes_1w"
|
||||
elif aggregate_type == "votes_1m":
|
||||
input_prefix = "color_votes_1m"
|
||||
else:
|
||||
raise ValueError(f"Unknown aggregate type: {aggregate_type}")
|
||||
|
||||
# Read data from all colors for this date
|
||||
input_refs = []
|
||||
for color in COLORS:
|
||||
input_refs.append(PartitionRef(str=f"{input_prefix}/{data_date}/{color}"))
|
||||
|
||||
data = dal.read(*input_refs)
|
||||
total_votes = sum(record["votes"] for record in data)
|
||||
|
||||
# Write aggregated result
|
||||
output_ref = PartitionRef(str=f"{aggregate_type}/{data_date}")
|
||||
dal.write(output_ref, [{"data_date": data_date, "votes": total_votes}])
|
||||
51
databuild/test/app/jobs/color_vote_report_calc/execute.py
Normal file
51
databuild/test/app/jobs/color_vote_report_calc/execute.py
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
from databuild.test.app import dal
|
||||
from databuild.proto import PartitionRef
|
||||
|
||||
def execute(output_partition_strs: list[str]):
|
||||
# Parse requested outputs
|
||||
outputs = [PartitionRef(str=ref_str) for ref_str in output_partition_strs]
|
||||
|
||||
for output in outputs:
|
||||
parts = output.str.split("/")
|
||||
data_date, color = parts[1], parts[2]
|
||||
|
||||
# Read total votes for this date - fail if missing
|
||||
daily_total = dal.read(PartitionRef(str=f"daily_votes/{data_date}"), empty_ok=False)
|
||||
weekly_total = dal.read(PartitionRef(str=f"votes_1w/{data_date}"), empty_ok=False)
|
||||
monthly_total = dal.read(PartitionRef(str=f"votes_1m/{data_date}"), empty_ok=False)
|
||||
|
||||
# Read color-specific votes for this date/color - fail if missing
|
||||
daily_color = dal.read(PartitionRef(str=f"daily_color_votes/{data_date}/{color}"), empty_ok=False)
|
||||
weekly_color = dal.read(PartitionRef(str=f"color_votes_1w/{data_date}/{color}"), empty_ok=False)
|
||||
monthly_color = dal.read(PartitionRef(str=f"color_votes_1m/{data_date}/{color}"), empty_ok=False)
|
||||
|
||||
# Extract vote counts
|
||||
daily_total_votes = daily_total[0]["votes"]
|
||||
weekly_total_votes = weekly_total[0]["votes"]
|
||||
monthly_total_votes = monthly_total[0]["votes"]
|
||||
|
||||
daily_color_votes = daily_color[0]["votes"]
|
||||
weekly_color_votes = weekly_color[0]["votes"]
|
||||
monthly_color_votes = monthly_color[0]["votes"]
|
||||
|
||||
# Calculate percentages
|
||||
daily_percent = (daily_color_votes / daily_total_votes * 100) if daily_total_votes > 0 else 0
|
||||
weekly_percent = (weekly_color_votes / weekly_total_votes * 100) if weekly_total_votes > 0 else 0
|
||||
monthly_percent = (monthly_color_votes / monthly_total_votes * 100) if monthly_total_votes > 0 else 0
|
||||
|
||||
# Write report
|
||||
report_data = [{
|
||||
"color": color,
|
||||
"data_date": data_date,
|
||||
"daily_total_votes": daily_total_votes,
|
||||
"weekly_total_votes": weekly_total_votes,
|
||||
"monthly_total_votes": monthly_total_votes,
|
||||
"daily_color_votes": daily_color_votes,
|
||||
"weekly_color_votes": weekly_color_votes,
|
||||
"monthly_color_votes": monthly_color_votes,
|
||||
"daily_percent": daily_percent,
|
||||
"weekly_percent": weekly_percent,
|
||||
"monthly_percent": monthly_percent
|
||||
}]
|
||||
|
||||
dal.write(output, report_data)
|
||||
|
|
@ -1,34 +1,8 @@
|
|||
from databuild.test.app.jobs.ingest_color_votes.config import configure
|
||||
from databuild.test.app.jobs.ingest_color_votes.execute import execute
|
||||
from databuild.test.app import dal
|
||||
from databuild.proto import PartitionRef
|
||||
|
||||
|
||||
def test_ingest_color_votes_configure():
|
||||
refs_single = [PartitionRef(str="daily_color_votes/2025-01-01/red")]
|
||||
config_single = configure(refs_single)
|
||||
assert len(config_single.configs) == 1
|
||||
assert config_single.configs[0].outputs[0].str == "daily_color_votes/2025-01-01/red"
|
||||
assert config_single.configs[0].env["COLOR"] == "red"
|
||||
assert config_single.configs[0].env["DATA_DATE"] == "2025-01-01"
|
||||
|
||||
refs_multiple = [
|
||||
PartitionRef(str="daily_color_votes/2025-01-02/red"),
|
||||
PartitionRef(str="daily_color_votes/2025-01-02/blue"),
|
||||
]
|
||||
|
||||
config_multiple = configure(refs_multiple)
|
||||
assert len(config_multiple.configs) == 2
|
||||
assert len(config_multiple.configs[0].outputs) == 1
|
||||
assert config_multiple.configs[0].outputs[0].str == "daily_color_votes/2025-01-02/red"
|
||||
assert config_multiple.configs[0].env["COLOR"] == "red"
|
||||
assert config_multiple.configs[0].env["DATA_DATE"] == "2025-01-02"
|
||||
assert len(config_multiple.configs[1].outputs) == 1
|
||||
assert config_multiple.configs[1].outputs[0].str == "daily_color_votes/2025-01-02/blue"
|
||||
assert config_multiple.configs[1].env["COLOR"] == "blue"
|
||||
assert config_multiple.configs[1].env["DATA_DATE"] == "2025-01-02"
|
||||
|
||||
|
||||
def test_ingest_color_votes():
|
||||
execute("2025-01-01", "red")
|
||||
results = dal.read(PartitionRef(str="daily_color_votes/2025-01-01/red"))
|
||||
|
|
|
|||
28
databuild/test/app/jobs/trailing_color_votes/execute.py
Normal file
28
databuild/test/app/jobs/trailing_color_votes/execute.py
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
from databuild.test.app import dal
|
||||
from databuild.proto import PartitionRef
|
||||
from datetime import date, timedelta
|
||||
import os
|
||||
|
||||
def execute(data_date: str, color: str):
|
||||
output_date = date.fromisoformat(data_date)
|
||||
weekly = os.environ.get("WEEKLY", "false").lower() == "true"
|
||||
monthly = os.environ.get("MONTHLY", "false").lower() == "true"
|
||||
|
||||
def calculate_and_write(window_days: int, output_prefix: str):
|
||||
# Read trailing data and sum votes
|
||||
input_refs = []
|
||||
for i in range(window_days):
|
||||
input_date = output_date - timedelta(days=i)
|
||||
input_refs.append(PartitionRef(str=f"daily_color_votes/{input_date.isoformat()}/{color}"))
|
||||
|
||||
data = dal.read(*input_refs)
|
||||
total_votes = sum(record["votes"] for record in data)
|
||||
|
||||
output_ref = PartitionRef(str=f"{output_prefix}/{data_date}/{color}")
|
||||
dal.write(output_ref, [{"color": color, "data_date": data_date, "votes": total_votes}])
|
||||
|
||||
if weekly:
|
||||
calculate_and_write(7, "color_votes_1w")
|
||||
|
||||
if monthly:
|
||||
calculate_and_write(28, "color_votes_1m")
|
||||
|
|
@ -25,29 +25,6 @@ pip.parse(
|
|||
)
|
||||
use_repo(pip, "pypi")
|
||||
|
||||
# Rules OCI - necessary for producing a docker container
|
||||
bazel_dep(name = "rules_oci", version = "2.2.6")
|
||||
# For testing, we also recommend https://registry.bazel.build/modules/container_structure_test
|
||||
|
||||
oci = use_extension("@rules_oci//oci:extensions.bzl", "oci")
|
||||
|
||||
# Declare external images you need to pull, for example:
|
||||
oci.pull(
|
||||
name = "debian",
|
||||
image = "docker.io/library/python",
|
||||
platforms = [
|
||||
"linux/arm64/v8",
|
||||
"linux/amd64",
|
||||
],
|
||||
# 'latest' is not reproducible, but it's convenient.
|
||||
# During the build we print a WARNING message that includes recommended 'digest' and 'platforms'
|
||||
# values which you can use here in place of 'tag' to pin for reproducibility.
|
||||
tag = "3.12-bookworm",
|
||||
)
|
||||
|
||||
# For each oci.pull call, repeat the "name" here to expose them as dependencies.
|
||||
use_repo(oci, "debian", "debian_linux_amd64", "debian_linux_arm64_v8")
|
||||
|
||||
# Platforms for specifying linux/arm
|
||||
bazel_dep(name = "platforms", version = "0.0.11")
|
||||
|
||||
|
|
|
|||
File diff suppressed because one or more lines are too long
292
plans/dsl-graph-generation.md
Normal file
292
plans/dsl-graph-generation.md
Normal file
|
|
@ -0,0 +1,292 @@
|
|||
# DSL Graph Generation: Bazel Module Generation from Python DSL
|
||||
|
||||
## Motivation & High-Level Goals
|
||||
|
||||
### Problem Statement
|
||||
DataBuild's Python DSL provides an ergonomic interface for defining data processing graphs, but currently lacks a deployment path. Users can define jobs and graphs using the DSL, but cannot easily package and deploy them as complete, hermetic applications. This limits the DSL's utility as a production-ready interface.
|
||||
|
||||
### Strategic Goals
|
||||
1. **Seamless Deployment**: Enable DSL-defined graphs to be built and deployed as complete bazel modules
|
||||
2. **Hermetic Packaging**: Generate self-contained modules with all dependencies resolved
|
||||
3. **Interface Consistency**: Maintain CLI/Service interchangeability principle across generated modules
|
||||
4. **Production Readiness**: Support container deployment and external dependency management
|
||||
|
||||
### Success Criteria
|
||||
- DSL graphs can be compiled to standalone bazel modules (`@my_generated_graph//...`)
|
||||
- Generated modules support the full databuild interface (analyze, build, service, container images)
|
||||
- External repositories can depend on databuild core and generate working applications
|
||||
- End-to-end deployment pipeline from DSL definition to running containers
|
||||
|
||||
## Required Reading
|
||||
|
||||
### Core Design Documents
|
||||
- [`DESIGN.md`](../DESIGN.md) - Overall databuild architecture and principles
|
||||
- [`design/core-build.md`](../design/core-build.md) - Job and graph execution semantics
|
||||
- [`design/graph-specification.md`](../design/graph-specification.md) - DSL interfaces and patterns
|
||||
- [`design/service.md`](../design/service.md) - Service interface requirements
|
||||
- [`design/deploy-strategies.md`](../design/deploy-strategies.md) - Deployment patterns
|
||||
|
||||
### Key Source Files
|
||||
- [`databuild/dsl/python/dsl.py`](../databuild/dsl/python/dsl.py) - Current DSL implementation
|
||||
- [`databuild/test/app/dsl/graph.py`](../databuild/test/app/dsl/graph.py) - Reference DSL usage
|
||||
- [`databuild/rules.bzl`](../databuild/rules.bzl) - Bazel rules for jobs and graphs
|
||||
- [`databuild/databuild.proto`](../databuild/databuild.proto) - Core interfaces
|
||||
|
||||
### Understanding Prerequisites
|
||||
1. **Job Architecture**: Jobs have `.cfg`, `.exec`, and main targets with subcommand pattern
|
||||
2. **Graph Structure**: Graphs require job lookup, analyze, build, and service variants
|
||||
3. **Bazel Modules**: External repos use `@workspace//...` references for generated content
|
||||
4. **CLI/Service Consistency**: Both interfaces must produce identical artifacts and behaviors
|
||||
|
||||
## Implementation Plan
|
||||
|
||||
### Phase 1: Basic Generation Infrastructure
|
||||
**Goal**: Establish foundation for generating bazel modules from DSL definitions
|
||||
|
||||
#### Deliverables
|
||||
- Extend `DataBuildGraph.generate_bazel_module()` method
|
||||
- Generate minimal `MODULE.bazel` with databuild core dependency
|
||||
- Generate `BUILD.bazel` with job and graph target stubs
|
||||
- Basic workspace creation and file writing utilities
|
||||
|
||||
#### Implementation Tasks
|
||||
1. Add `generate_bazel_module(workspace_name: str, output_dir: str)` to `DataBuildGraph`
|
||||
2. Create template system for `MODULE.bazel` and `BUILD.bazel` generation
|
||||
3. Implement file system utilities for creating workspace structure
|
||||
4. Add basic validation for DSL graph completeness
|
||||
|
||||
#### Tests & Verification
|
||||
```bash
|
||||
# Test: Basic generation succeeds
|
||||
python -c "
|
||||
from databuild.test.app.dsl.graph import graph
|
||||
graph.generate_bazel_module('test_graph', '/tmp/generated')
|
||||
"
|
||||
|
||||
# Test: Generated files are valid
|
||||
cd /tmp/generated
|
||||
bazel build //... # Should succeed without errors
|
||||
|
||||
# Test: Module can be referenced externally
|
||||
# In separate workspace:
|
||||
# bazel build @test_graph//...
|
||||
```
|
||||
|
||||
#### Success Criteria
|
||||
- Generated `MODULE.bazel` has correct databuild dependency
|
||||
- Generated `BUILD.bazel` is syntactically valid
|
||||
- External workspace can reference `@generated_graph//...` targets
|
||||
- No compilation errors in generated bazel files
|
||||
|
||||
---
|
||||
|
||||
### Phase 2: Job Binary Generation
|
||||
**Goal**: Convert DSL job classes into executable databuild job targets
|
||||
|
||||
#### Deliverables
|
||||
- Auto-generate job binary Python files with config/exec subcommand handling
|
||||
- Create `databuild_job` targets for each DSL job class
|
||||
- Implement job lookup binary generation
|
||||
- Wire partition pattern matching to job target resolution
|
||||
|
||||
#### Implementation Tasks
|
||||
1. Create job binary template with subcommand dispatching:
|
||||
```python
|
||||
# Generated job_binary.py template
|
||||
if sys.argv[1] == "config":
|
||||
job_instance = MyDSLJob()
|
||||
config = job_instance.config(parse_outputs(sys.argv[2:]))
|
||||
print(json.dumps(config))
|
||||
elif sys.argv[1] == "exec":
|
||||
config = json.loads(sys.stdin.read())
|
||||
job_instance.exec(config)
|
||||
```
|
||||
|
||||
2. Generate job lookup binary from DSL job registrations:
|
||||
```python
|
||||
# Generated lookup.py
|
||||
def lookup_job_for_partition(partition_ref: str) -> str:
|
||||
for pattern, job_target in JOB_MAPPINGS.items():
|
||||
if pattern.match(partition_ref):
|
||||
return job_target
|
||||
raise ValueError(f"No job found for: {partition_ref}")
|
||||
```
|
||||
|
||||
3. Create `databuild_job` targets in generated `BUILD.bazel`
|
||||
4. Handle DSL job dependencies and imports in generated files
|
||||
|
||||
#### Tests & Verification
|
||||
```bash
|
||||
# Test: Job config execution
|
||||
bazel run @test_graph//:ingest_color_votes.cfg -- \
|
||||
"daily_color_votes/2024-01-01/red"
|
||||
# Should output valid JobConfig JSON
|
||||
|
||||
# Test: Job exec execution
|
||||
echo '{"outputs":[...], "env":{"DATA_DATE":"2024-01-01"}}' | \
|
||||
bazel run @test_graph//:ingest_color_votes.exec
|
||||
# Should execute successfully
|
||||
|
||||
# Test: Job lookup
|
||||
bazel run @test_graph//:job_lookup -- \
|
||||
"daily_color_votes/2024-01-01/red"
|
||||
# Should output: //:ingest_color_votes
|
||||
```
|
||||
|
||||
#### Success Criteria
|
||||
- All DSL jobs become executable `databuild_job` targets
|
||||
- Job binaries correctly handle config/exec subcommands
|
||||
- Job lookup correctly maps partition patterns to job targets
|
||||
- Generated jobs maintain DSL semantic behavior
|
||||
|
||||
---
|
||||
|
||||
### Phase 3: Graph Integration
|
||||
**Goal**: Generate complete databuild graph targets with all operational variants
|
||||
|
||||
#### Deliverables
|
||||
- Generate `databuild_graph` target with analyze/build/service capabilities
|
||||
- Create all graph variant targets (`.analyze`, `.build`, `.service`, etc.)
|
||||
- Wire job dependencies into graph configuration
|
||||
- Generate container deployment targets
|
||||
|
||||
#### Implementation Tasks
|
||||
1. Generate `databuild_graph` target with complete job list
|
||||
2. Create all required graph variants:
|
||||
- `my_graph.analyze` - Planning capability
|
||||
- `my_graph.build` - CLI execution
|
||||
- `my_graph.service` - HTTP service
|
||||
- `my_graph.service.image` - Container image
|
||||
3. Configure job lookup and dependency wiring
|
||||
4. Add graph label and identification metadata
|
||||
|
||||
#### Tests & Verification
|
||||
```bash
|
||||
# Test: Graph analysis
|
||||
bazel run @test_graph//:my_graph.analyze -- \
|
||||
"color_vote_report/2024-01-01/red"
|
||||
# Should output complete job execution plan
|
||||
|
||||
# Test: Graph building
|
||||
bazel run @test_graph//:my_graph.build -- \
|
||||
"daily_color_votes/2024-01-01/red"
|
||||
# Should execute end-to-end build
|
||||
|
||||
# Test: Service deployment
|
||||
bazel run @test_graph//:my_graph.service -- --port 8081
|
||||
# Should start HTTP service on port 8081
|
||||
|
||||
# Test: Container generation
|
||||
bazel build @test_graph//:my_graph.service.image
|
||||
# Should create deployable container image
|
||||
```
|
||||
|
||||
#### Success Criteria
|
||||
- Graph targets provide full databuild functionality
|
||||
- CLI and service interfaces produce identical results
|
||||
- All graph operations work with generated job targets
|
||||
- Container images are deployable and functional
|
||||
|
||||
---
|
||||
|
||||
### Phase 4: Dependency Resolution
|
||||
**Goal**: Handle external pip packages and bazel dependencies in generated modules
|
||||
|
||||
#### Deliverables
|
||||
- User-declared dependency system in DSL
|
||||
- Generated `MODULE.bazel` with proper pip and bazel dependencies
|
||||
- Dependency validation and conflict resolution
|
||||
- Support for requirements files and version pinning
|
||||
|
||||
#### Implementation Tasks
|
||||
1. Extend `DataBuildGraph` constructor to accept dependencies:
|
||||
```python
|
||||
graph = DataBuildGraph(
|
||||
"//my_graph",
|
||||
pip_deps=["pandas>=2.0.0", "numpy"],
|
||||
bazel_deps=["@my_repo//internal:lib"]
|
||||
)
|
||||
```
|
||||
|
||||
2. Generate `MODULE.bazel` with pip extension configuration:
|
||||
```python
|
||||
pip = use_extension("@rules_python//python/extensions:pip.bzl", "pip")
|
||||
pip.parse(
|
||||
hub_name = "pip_deps",
|
||||
python_version = "3.11",
|
||||
requirements_lock = "//:requirements_lock.txt"
|
||||
)
|
||||
```
|
||||
|
||||
3. Create requirements file generation from declared dependencies
|
||||
4. Add dependency validation during generation
|
||||
|
||||
#### Tests & Verification
|
||||
```bash
|
||||
# Test: Pip dependencies resolved
|
||||
bazel build @test_graph//:my_job
|
||||
# Should succeed with pandas/numpy available
|
||||
|
||||
# Test: Cross-module references work
|
||||
# Generate graph that depends on @other_repo//lib
|
||||
bazel build @test_graph//:dependent_job
|
||||
# Should resolve external bazel dependencies
|
||||
|
||||
# Test: Container includes all deps
|
||||
bazel run @test_graph//:my_graph.service.image_load
|
||||
docker run databuild_test_graph_service:latest python -c "import pandas"
|
||||
# Should succeed - pandas available in container
|
||||
```
|
||||
|
||||
#### Success Criteria
|
||||
- Generated modules resolve all external dependencies
|
||||
- Pip packages are available to job execution
|
||||
- Cross-repository bazel dependencies work correctly
|
||||
- Container images include complete dependency closure
|
||||
|
||||
---
|
||||
|
||||
### Phase 5: End-to-End Deployment
|
||||
**Goal**: Complete production deployment pipeline with observability
|
||||
|
||||
#### Deliverables
|
||||
- Production-ready container images with proper configuration
|
||||
- Integration with existing databuild observability systems
|
||||
- Build event log compatibility
|
||||
- Performance optimization and resource management
|
||||
|
||||
#### Implementation Tasks
|
||||
1. Optimize generated container images for production use
|
||||
2. Ensure build event logging works correctly in generated modules
|
||||
3. Add resource configuration and limits to generated targets
|
||||
4. Create deployment documentation and examples
|
||||
5. Performance testing and optimization
|
||||
|
||||
#### Tests & Verification
|
||||
```bash
|
||||
./run_e2e_tests.sh
|
||||
```
|
||||
|
||||
#### Success Criteria
|
||||
- Generated modules are production-ready
|
||||
- Full observability and logging integration
|
||||
- Performance meets production requirements
|
||||
- CLI/Service consistency maintained
|
||||
- Complete deployment documentation
|
||||
|
||||
## Validation Strategy
|
||||
|
||||
### Integration with Existing Tests
|
||||
- Extend `run_e2e_tests.sh` to test generated modules
|
||||
- Add generated module tests to CI/CD pipeline
|
||||
- Use existing test app DSL as primary test case
|
||||
|
||||
### Performance Benchmarks
|
||||
- Graph analysis speed comparison (DSL vs hand-written bazel)
|
||||
- Container image size optimization
|
||||
- Job execution overhead measurement
|
||||
|
||||
### Correctness Verification
|
||||
- Build event log structure validation
|
||||
- Partition resolution accuracy testing
|
||||
- Dependency resolution completeness checks
|
||||
|
|
@ -1,10 +1,8 @@
|
|||
|
||||
- Implement python dsl
|
||||
- Achieve fast configuration (betterproto2 imports are sus)
|
||||
- Remove manual reference of enum values, e.g. [here](../databuild/repositories/builds/mod.rs:85)
|
||||
- Type-safe mithril [claude link](https://claude.ai/share/f33f8605-472a-4db4-9211-5a1e52087316)
|
||||
- Status indicator for page selection
|
||||
- On build request detail page, show aggregated job results
|
||||
- Use path based navigation instead of hashbang?
|
||||
- Add build request notes
|
||||
- How do we encode job labels in the path? (Build event job links are not encoding job labels properly)
|
||||
- Resolve double type system with protobuf and openapi
|
||||
- Plan for external worker dispatch (e.g. k8s pod per build, or launch in container service)
|
||||
|
|
@ -12,3 +10,6 @@
|
|||
- Should we have meaningful exit codes? E.g. "retry-able error", etc?
|
||||
- Fully joinable build/job IDs - ensure all execution logs / metrics are joinable to build request ID?
|
||||
- Triggers?
|
||||
- Add build request notes
|
||||
- Status indicator for page selection
|
||||
- Use path based navigation instead of hashbang?
|
||||
|
|
|
|||
Loading…
Reference in a new issue