Add tests for dsl test app

This commit is contained in:
Stuart Axelbrooke 2025-07-31 23:55:58 -07:00
parent ae5147cb36
commit 2ad4ae6d3c
10 changed files with 1047 additions and 3 deletions

View file

@ -78,6 +78,7 @@ class DataBuildGraph:
for partition in cls.output_types:
assert partition not in self.lookup, f"Partition `{partition}` already registered"
self.lookup[partition] = cls
return cls
def generate_bazel_module(self):
"""Generates a complete databuild application, packaging up referenced jobs and this graph via bazel targets"""
@ -101,7 +102,7 @@ class JobConfigBuilder:
def add_inputs(self, *partitions: PartitionPattern, dep_type: DepType=DepType.MATERIALIZE) -> Self:
for p in partitions:
dep_type_name = "materialize" if dep_type == DepType.Materialize else "query"
dep_type_name = "materialize" if dep_type == DepType.MATERIALIZE else "query"
self.inputs.append(DataDep(dep_type_code=dep_type, dep_type_name=dep_type_name, partition_ref=PartitionRef(str=p.serialize())))
return self

View file

@ -0,0 +1,13 @@
py_library(
name = "dsl_src",
srcs = glob(
["*.py"],
exclude = ["test_*.py"],
),
visibility = ["//visibility:public"],
deps = [
"//databuild:py_proto",
"//databuild/dsl/python:dsl",
"//databuild/test/app:job_src",
],
)

View file

@ -60,7 +60,7 @@ class TrailingColorVotes(DataBuildJob):
max_window = max(max_window, 28)
env = {"DATA_DATE": data_date, "COLOR": color, "WEEKLY": weekly, "MONTHLY": monthly}
config = JobConfigBuilder(env=env, outputs=outputs)
config = JobConfigBuilder(env=env).add_outputs(*outputs)
for i in range(max_window):
in_date = (date.fromisoformat(data_date) - timedelta(days=i)).isoformat()
config.add_inputs(IngestedColorPartition(data_date=in_date, color=color))
@ -107,7 +107,7 @@ class ColorVoteReportCalc(DataBuildJob):
output_types = [ColorVoteReportPartition]
def config(self, outputs: list[ColorVoteReportPartition]) -> list[JobConfig]:
config = JobConfigBuilder().add_outputs(*outputs).add_args(*[p.str for p in outputs])
config = JobConfigBuilder().add_outputs(*outputs).add_args(*[p.serialize() for p in outputs])
for data_date in set(p.data_date for p in outputs):
config.add_inputs(

View file

@ -0,0 +1,75 @@
# Individual job configuration tests
py_test(
name = "test_ingest_color_votes",
srcs = ["test_ingest_color_votes.py"],
main = "test_ingest_color_votes.py",
deps = [
"//databuild:py_proto",
"//databuild/dsl/python:dsl",
"//databuild/test/app:job_src",
"//databuild/test/app/dsl:dsl_src",
],
)
py_test(
name = "test_trailing_color_votes",
srcs = ["test_trailing_color_votes.py"],
main = "test_trailing_color_votes.py",
deps = [
"//databuild:py_proto",
"//databuild/dsl/python:dsl",
"//databuild/test/app:job_src",
"//databuild/test/app/dsl:dsl_src",
],
)
py_test(
name = "test_aggregate_color_votes",
srcs = ["test_aggregate_color_votes.py"],
main = "test_aggregate_color_votes.py",
deps = [
"//databuild:py_proto",
"//databuild/dsl/python:dsl",
"//databuild/test/app:job_src",
"//databuild/test/app/dsl:dsl_src",
],
)
py_test(
name = "test_color_vote_report_calc",
srcs = ["test_color_vote_report_calc.py"],
main = "test_color_vote_report_calc.py",
deps = [
"//databuild:py_proto",
"//databuild/dsl/python:dsl",
"//databuild/test/app:job_src",
"//databuild/test/app/dsl:dsl_src",
],
)
# Graph analysis test
py_test(
name = "test_graph_analysis",
srcs = ["test_graph_analysis.py"],
main = "test_graph_analysis.py",
deps = [
"//databuild:py_proto",
"//databuild/dsl/python:dsl",
"//databuild/test/app:job_src",
"//databuild/test/app/dsl:dsl_src",
],
)
# Bazel vs DSL comparison test
py_test(
name = "test_bazel_dsl_comparison",
srcs = ["test_bazel_dsl_comparison.py"],
main = "test_bazel_dsl_comparison.py",
deps = [
"//databuild:py_proto",
"//databuild/dsl/python:dsl",
"//databuild/test/app:job_src",
"//databuild/test/app/bazel:job_src",
"//databuild/test/app/dsl:dsl_src",
],
)

View file

@ -0,0 +1,159 @@
from databuild.test.app.dsl.graph import AggregateColorVotes
from databuild.test.app.dsl.partitions import (
DailyVotesPartition,
Votes1WPartition,
Votes1MPartition,
IngestedColorPartition,
TrailingColorVotes1WPartition,
TrailingColorVotes1MPartition
)
from databuild.test.app.colors import COLORS
from databuild.proto import DepType
def test_aggregate_color_votes_configure_daily_votes():
"""Test AggregateColorVotes config method with daily votes output."""
job = AggregateColorVotes()
outputs = [DailyVotesPartition(data_date="2025-01-15")]
configs = job.config(outputs)
assert len(configs) == 1
config = configs[0]
assert len(config.outputs) == 1
assert config.outputs[0].str == "daily_votes/2025-01-15"
assert config.env["DATA_DATE"] == "2025-01-15"
assert config.env["AGGREGATE_TYPE"] == "daily_votes"
# Should have inputs for all colors
assert len(config.inputs) == len(COLORS)
expected_inputs = {f"daily_color_votes/2025-01-15/{color}" for color in COLORS}
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
assert actual_inputs == expected_inputs
# All inputs should be MATERIALIZE type
for input_dep in config.inputs:
assert input_dep.dep_type_code == DepType.MATERIALIZE
assert input_dep.dep_type_name == "materialize"
def test_aggregate_color_votes_configure_votes_1w():
"""Test AggregateColorVotes config method with weekly votes output."""
job = AggregateColorVotes()
outputs = [Votes1WPartition(data_date="2025-01-15")]
configs = job.config(outputs)
assert len(configs) == 1
config = configs[0]
assert len(config.outputs) == 1
assert config.outputs[0].str == "votes_1w/2025-01-15"
assert config.env["DATA_DATE"] == "2025-01-15"
assert config.env["AGGREGATE_TYPE"] == "votes_1w"
# Should have inputs for all colors from trailing 1w partitions
assert len(config.inputs) == len(COLORS)
expected_inputs = {f"color_votes_1w/2025-01-15/{color}" for color in COLORS}
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
assert actual_inputs == expected_inputs
def test_aggregate_color_votes_configure_votes_1m():
"""Test AggregateColorVotes config method with monthly votes output."""
job = AggregateColorVotes()
outputs = [Votes1MPartition(data_date="2025-01-15")]
configs = job.config(outputs)
assert len(configs) == 1
config = configs[0]
assert len(config.outputs) == 1
assert config.outputs[0].str == "votes_1m/2025-01-15"
assert config.env["DATA_DATE"] == "2025-01-15"
assert config.env["AGGREGATE_TYPE"] == "votes_1m"
# Should have inputs for all colors from trailing 1m partitions
assert len(config.inputs) == len(COLORS)
expected_inputs = {f"color_votes_1m/2025-01-15/{color}" for color in COLORS}
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
assert actual_inputs == expected_inputs
def test_aggregate_color_votes_configure_multiple_outputs():
"""Test AggregateColorVotes config method with multiple different output types."""
job = AggregateColorVotes()
outputs = [
DailyVotesPartition(data_date="2025-01-15"),
Votes1WPartition(data_date="2025-01-16"),
Votes1MPartition(data_date="2025-01-17")
]
configs = job.config(outputs)
assert len(configs) == 3 # One config per output
# Find configs by date
daily_config = None
weekly_config = None
monthly_config = None
for config in configs:
if config.env["DATA_DATE"] == "2025-01-15":
daily_config = config
elif config.env["DATA_DATE"] == "2025-01-16":
weekly_config = config
elif config.env["DATA_DATE"] == "2025-01-17":
monthly_config = config
assert daily_config is not None
assert weekly_config is not None
assert monthly_config is not None
# Check daily config
assert daily_config.env["AGGREGATE_TYPE"] == "daily_votes"
assert daily_config.outputs[0].str == "daily_votes/2025-01-15"
assert len(daily_config.inputs) == len(COLORS)
assert all("daily_color_votes/2025-01-15/" in inp.partition_ref.str for inp in daily_config.inputs)
# Check weekly config
assert weekly_config.env["AGGREGATE_TYPE"] == "votes_1w"
assert weekly_config.outputs[0].str == "votes_1w/2025-01-16"
assert len(weekly_config.inputs) == len(COLORS)
assert all("color_votes_1w/2025-01-16/" in inp.partition_ref.str for inp in weekly_config.inputs)
# Check monthly config
assert monthly_config.env["AGGREGATE_TYPE"] == "votes_1m"
assert monthly_config.outputs[0].str == "votes_1m/2025-01-17"
assert len(monthly_config.inputs) == len(COLORS)
assert all("color_votes_1m/2025-01-17/" in inp.partition_ref.str for inp in monthly_config.inputs)
def test_aggregate_color_votes_configure_multiple_same_type():
"""Test AggregateColorVotes config method with multiple outputs of same type."""
job = AggregateColorVotes()
outputs = [
DailyVotesPartition(data_date="2025-01-15"),
DailyVotesPartition(data_date="2025-01-16")
]
configs = job.config(outputs)
assert len(configs) == 2 # One config per output
for config in configs:
assert config.env["AGGREGATE_TYPE"] == "daily_votes"
assert len(config.inputs) == len(COLORS)
if config.env["DATA_DATE"] == "2025-01-15":
assert config.outputs[0].str == "daily_votes/2025-01-15"
assert all("daily_color_votes/2025-01-15/" in inp.partition_ref.str for inp in config.inputs)
elif config.env["DATA_DATE"] == "2025-01-16":
assert config.outputs[0].str == "daily_votes/2025-01-16"
assert all("daily_color_votes/2025-01-16/" in inp.partition_ref.str for inp in config.inputs)
else:
assert False, f"Unexpected date: {config.env['DATA_DATE']}"
if __name__ == '__main__':
import pytest
raise SystemExit(pytest.main([__file__]))

View file

@ -0,0 +1,244 @@
#!/usr/bin/env python3
"""
Comparison test between Bazel and DSL implementations.
This test verifies that the DSL job configurations produce identical results
to the equivalent bazel job configurations for the same partition references.
"""
import unittest
from databuild.proto import PartitionRef, JobConfigureResponse
from databuild.test.app.dsl.graph import (
IngestColorVotes,
TrailingColorVotes,
AggregateColorVotes,
ColorVoteReportCalc
)
from databuild.test.app.dsl.partitions import (
IngestedColorPartition,
TrailingColorVotes1WPartition,
TrailingColorVotes1MPartition,
DailyVotesPartition,
Votes1WPartition,
Votes1MPartition,
ColorVoteReportPartition
)
# Import bazel job config functions
from databuild.test.app.bazel.jobs.ingest_color_votes.config import configure as bazel_ingest_config
from databuild.test.app.bazel.jobs.trailing_color_votes.config import configure as bazel_trailing_config
from databuild.test.app.bazel.jobs.aggregate_color_votes.config import configure as bazel_aggregate_config
from databuild.test.app.bazel.jobs.color_vote_report_calc.config import configure as bazel_report_config
class BazelDSLComparisonTest(unittest.TestCase):
"""Compare bazel and DSL job configurations to ensure they produce identical results."""
def _compare_job_configs(self, bazel_response, dsl_configs):
"""Helper to compare JobConfigureResponse from bazel with list[JobConfig] from DSL."""
self.assertIsInstance(bazel_response, JobConfigureResponse)
self.assertIsInstance(dsl_configs, list)
bazel_configs = bazel_response.configs
self.assertEqual(len(bazel_configs), len(dsl_configs),
"Bazel and DSL should produce same number of configs")
# Sort both by a stable key for comparison
def config_sort_key(config):
outputs_str = ",".join(sorted(out.str for out in config.outputs))
env_str = ",".join(f"{k}={v}" for k, v in sorted(config.env.items()))
return f"{outputs_str}:{env_str}"
bazel_sorted = sorted(bazel_configs, key=config_sort_key)
dsl_sorted = sorted(dsl_configs, key=config_sort_key)
for bazel_config, dsl_config in zip(bazel_sorted, dsl_sorted):
# Compare outputs
bazel_outputs = {out.str for out in bazel_config.outputs}
dsl_outputs = {out.str for out in dsl_config.outputs}
self.assertEqual(bazel_outputs, dsl_outputs, "Outputs should match")
# Compare inputs
bazel_inputs = {(inp.partition_ref.str, inp.dep_type_code, inp.dep_type_name)
for inp in bazel_config.inputs}
dsl_inputs = {(inp.partition_ref.str, inp.dep_type_code, inp.dep_type_name)
for inp in dsl_config.inputs}
self.assertEqual(bazel_inputs, dsl_inputs, "Inputs should match")
# Compare args
self.assertEqual(set(bazel_config.args), set(dsl_config.args), "Args should match")
# Compare env
self.assertEqual(bazel_config.env, dsl_config.env, "Environment should match")
def test_ingest_color_votes_comparison(self):
"""Compare IngestColorVotes bazel vs DSL configurations."""
# Test single output
partition_refs = [PartitionRef(str="daily_color_votes/2025-01-01/red")]
bazel_response = bazel_ingest_config(partition_refs)
partitions = [IngestedColorPartition.deserialize(ref.str) for ref in partition_refs]
dsl_job = IngestColorVotes()
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
# Test multiple outputs
partition_refs = [
PartitionRef(str="daily_color_votes/2025-01-02/red"),
PartitionRef(str="daily_color_votes/2025-01-02/blue")
]
bazel_response = bazel_ingest_config(partition_refs)
partitions = [IngestedColorPartition.deserialize(ref.str) for ref in partition_refs]
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
def test_trailing_color_votes_comparison(self):
"""Compare TrailingColorVotes bazel vs DSL configurations."""
# Test weekly output
partition_refs = [PartitionRef(str="color_votes_1w/2025-01-07/red")]
bazel_response = bazel_trailing_config(partition_refs)
partitions = [TrailingColorVotes1WPartition.deserialize(ref.str) for ref in partition_refs]
dsl_job = TrailingColorVotes()
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
# Test monthly output
partition_refs = [PartitionRef(str="color_votes_1m/2025-01-28/blue")]
bazel_response = bazel_trailing_config(partition_refs)
partitions = [TrailingColorVotes1MPartition.deserialize(ref.str) for ref in partition_refs]
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
# Test mixed weekly and monthly for same date/color
partition_refs = [
PartitionRef(str="color_votes_1w/2025-01-28/green"),
PartitionRef(str="color_votes_1m/2025-01-28/green")
]
bazel_response = bazel_trailing_config(partition_refs)
partitions = [
TrailingColorVotes1WPartition.deserialize(partition_refs[0].str),
TrailingColorVotes1MPartition.deserialize(partition_refs[1].str)
]
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
def test_aggregate_color_votes_comparison(self):
"""Compare AggregateColorVotes bazel vs DSL configurations."""
# Test daily votes
partition_refs = [PartitionRef(str="daily_votes/2025-01-15")]
bazel_response = bazel_aggregate_config(partition_refs)
partitions = [DailyVotesPartition.deserialize(ref.str) for ref in partition_refs]
dsl_job = AggregateColorVotes()
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
# Test weekly votes
partition_refs = [PartitionRef(str="votes_1w/2025-01-15")]
bazel_response = bazel_aggregate_config(partition_refs)
partitions = [Votes1WPartition.deserialize(ref.str) for ref in partition_refs]
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
# Test monthly votes
partition_refs = [PartitionRef(str="votes_1m/2025-01-15")]
bazel_response = bazel_aggregate_config(partition_refs)
partitions = [Votes1MPartition.deserialize(ref.str) for ref in partition_refs]
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
# Test multiple different types
partition_refs = [
PartitionRef(str="daily_votes/2025-01-15"),
PartitionRef(str="votes_1w/2025-01-16"),
PartitionRef(str="votes_1m/2025-01-17")
]
bazel_response = bazel_aggregate_config(partition_refs)
partitions = [
DailyVotesPartition.deserialize(partition_refs[0].str),
Votes1WPartition.deserialize(partition_refs[1].str),
Votes1MPartition.deserialize(partition_refs[2].str)
]
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
def test_color_vote_report_calc_comparison(self):
"""Compare ColorVoteReportCalc bazel vs DSL configurations."""
# Test single report
partition_refs = [PartitionRef(str="color_vote_report/2025-01-15/red")]
bazel_response = bazel_report_config(partition_refs)
partitions = [ColorVoteReportPartition.deserialize(ref.str) for ref in partition_refs]
dsl_job = ColorVoteReportCalc()
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
# Test multiple reports same date
partition_refs = [
PartitionRef(str="color_vote_report/2025-01-15/red"),
PartitionRef(str="color_vote_report/2025-01-15/blue")
]
bazel_response = bazel_report_config(partition_refs)
partitions = [ColorVoteReportPartition.deserialize(ref.str) for ref in partition_refs]
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
# Test multiple reports different dates
partition_refs = [
PartitionRef(str="color_vote_report/2025-01-15/red"),
PartitionRef(str="color_vote_report/2025-01-16/red")
]
bazel_response = bazel_report_config(partition_refs)
partitions = [ColorVoteReportPartition.deserialize(ref.str) for ref in partition_refs]
dsl_configs = dsl_job.config(partitions)
self._compare_job_configs(bazel_response, dsl_configs)
def test_partition_serialization_roundtrip(self):
"""Test that DSL partition serialization/deserialization works correctly."""
test_cases = [
IngestedColorPartition(data_date="2025-01-15", color="red"),
TrailingColorVotes1WPartition(data_date="2025-01-15", color="blue"),
TrailingColorVotes1MPartition(data_date="2025-01-28", color="green"),
DailyVotesPartition(data_date="2025-01-15"),
Votes1WPartition(data_date="2025-01-15"),
Votes1MPartition(data_date="2025-01-15"),
ColorVoteReportPartition(data_date="2025-01-15", color="yellow")
]
for partition in test_cases:
with self.subTest(partition=partition):
# Serialize then deserialize
serialized = partition.serialize()
deserialized = type(partition).deserialize(serialized)
# Should be equal
self.assertEqual(partition, deserialized)
# Serializing again should give same result
reserialized = deserialized.serialize()
self.assertEqual(serialized, reserialized)
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,204 @@
from databuild.test.app.dsl.graph import ColorVoteReportCalc
from databuild.test.app.dsl.partitions import (
ColorVoteReportPartition,
DailyVotesPartition,
Votes1WPartition,
Votes1MPartition,
IngestedColorPartition,
TrailingColorVotes1WPartition,
TrailingColorVotes1MPartition
)
from databuild.proto import DepType
def test_color_vote_report_calc_configure_single_output():
"""Test ColorVoteReportCalc config method with single color report output."""
job = ColorVoteReportCalc()
outputs = [ColorVoteReportPartition(data_date="2025-01-15", color="red")]
configs = job.config(outputs)
assert len(configs) == 1
config = configs[0]
# Check outputs
assert len(config.outputs) == 1
assert config.outputs[0].str == "color_vote_report/2025-01-15/red"
# Check args - should contain partition strings
assert len(config.args) == 1
assert config.args[0] == "color_vote_report/2025-01-15/red"
# Check inputs - should have aggregate inputs for the date and specific color inputs
expected_inputs = {
# Aggregate inputs for the date
"daily_votes/2025-01-15",
"votes_1w/2025-01-15",
"votes_1m/2025-01-15",
# Color-specific inputs
"daily_color_votes/2025-01-15/red",
"color_votes_1w/2025-01-15/red",
"color_votes_1m/2025-01-15/red"
}
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
assert actual_inputs == expected_inputs
# All inputs should be MATERIALIZE type
for input_dep in config.inputs:
assert input_dep.dep_type_code == DepType.MATERIALIZE
assert input_dep.dep_type_name == "materialize"
def test_color_vote_report_calc_configure_multiple_colors_same_date():
"""Test ColorVoteReportCalc config method with multiple colors for same date."""
job = ColorVoteReportCalc()
outputs = [
ColorVoteReportPartition(data_date="2025-01-15", color="red"),
ColorVoteReportPartition(data_date="2025-01-15", color="blue")
]
configs = job.config(outputs)
assert len(configs) == 1 # Single config since all outputs go to same job
config = configs[0]
# Check outputs
assert len(config.outputs) == 2
output_strs = {output.str for output in config.outputs}
assert "color_vote_report/2025-01-15/red" in output_strs
assert "color_vote_report/2025-01-15/blue" in output_strs
# Check args - should contain both partition strings
assert len(config.args) == 2
assert set(config.args) == {"color_vote_report/2025-01-15/red", "color_vote_report/2025-01-15/blue"}
# Check inputs - should have aggregate inputs for the date and color-specific inputs for both colors
expected_inputs = {
# Aggregate inputs for the date (only one set since same date)
"daily_votes/2025-01-15",
"votes_1w/2025-01-15",
"votes_1m/2025-01-15",
# Color-specific inputs for red
"daily_color_votes/2025-01-15/red",
"color_votes_1w/2025-01-15/red",
"color_votes_1m/2025-01-15/red",
# Color-specific inputs for blue
"daily_color_votes/2025-01-15/blue",
"color_votes_1w/2025-01-15/blue",
"color_votes_1m/2025-01-15/blue"
}
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
assert actual_inputs == expected_inputs
def test_color_vote_report_calc_configure_multiple_dates():
"""Test ColorVoteReportCalc config method with reports for different dates."""
job = ColorVoteReportCalc()
outputs = [
ColorVoteReportPartition(data_date="2025-01-15", color="red"),
ColorVoteReportPartition(data_date="2025-01-16", color="red")
]
configs = job.config(outputs)
assert len(configs) == 1 # Single config since all outputs go to same job
config = configs[0]
# Check outputs
assert len(config.outputs) == 2
output_strs = {output.str for output in config.outputs}
assert "color_vote_report/2025-01-15/red" in output_strs
assert "color_vote_report/2025-01-16/red" in output_strs
# Check args
assert len(config.args) == 2
assert set(config.args) == {"color_vote_report/2025-01-15/red", "color_vote_report/2025-01-16/red"}
# Check inputs - should have aggregate inputs for both dates and color-specific inputs
expected_inputs = {
# Aggregate inputs for both dates
"daily_votes/2025-01-15",
"votes_1w/2025-01-15",
"votes_1m/2025-01-15",
"daily_votes/2025-01-16",
"votes_1w/2025-01-16",
"votes_1m/2025-01-16",
# Color-specific inputs for red on both dates
"daily_color_votes/2025-01-15/red",
"color_votes_1w/2025-01-15/red",
"color_votes_1m/2025-01-15/red",
"daily_color_votes/2025-01-16/red",
"color_votes_1w/2025-01-16/red",
"color_votes_1m/2025-01-16/red"
}
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
assert actual_inputs == expected_inputs
def test_color_vote_report_calc_configure_complex_scenario():
"""Test ColorVoteReportCalc config method with complex multi-date, multi-color scenario."""
job = ColorVoteReportCalc()
outputs = [
ColorVoteReportPartition(data_date="2025-01-15", color="red"),
ColorVoteReportPartition(data_date="2025-01-15", color="blue"),
ColorVoteReportPartition(data_date="2025-01-16", color="green"),
ColorVoteReportPartition(data_date="2025-01-17", color="red")
]
configs = job.config(outputs)
assert len(configs) == 1 # Single config since all outputs go to same job
config = configs[0]
# Check outputs
assert len(config.outputs) == 4
expected_output_strs = {
"color_vote_report/2025-01-15/red",
"color_vote_report/2025-01-15/blue",
"color_vote_report/2025-01-16/green",
"color_vote_report/2025-01-17/red"
}
actual_output_strs = {output.str for output in config.outputs}
assert actual_output_strs == expected_output_strs
# Check args
assert len(config.args) == 4
assert set(config.args) == expected_output_strs
# Check inputs - should have aggregate inputs for all unique dates and color-specific inputs
expected_inputs = {
# Aggregate inputs for all dates
"daily_votes/2025-01-15",
"votes_1w/2025-01-15",
"votes_1m/2025-01-15",
"daily_votes/2025-01-16",
"votes_1w/2025-01-16",
"votes_1m/2025-01-16",
"daily_votes/2025-01-17",
"votes_1w/2025-01-17",
"votes_1m/2025-01-17",
# Color-specific inputs
"daily_color_votes/2025-01-15/red",
"color_votes_1w/2025-01-15/red",
"color_votes_1m/2025-01-15/red",
"daily_color_votes/2025-01-15/blue",
"color_votes_1w/2025-01-15/blue",
"color_votes_1m/2025-01-15/blue",
"daily_color_votes/2025-01-16/green",
"color_votes_1w/2025-01-16/green",
"color_votes_1m/2025-01-16/green",
"daily_color_votes/2025-01-17/red",
"color_votes_1w/2025-01-17/red",
"color_votes_1m/2025-01-17/red"
}
actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs}
assert actual_inputs == expected_inputs
if __name__ == '__main__':
import pytest
raise SystemExit(pytest.main([__file__]))

View file

@ -0,0 +1,157 @@
#!/usr/bin/env python3
"""
Integration test for the DSL graph analysis.
This test verifies that when we request color vote reports via the DSL graph,
the analyzer correctly identifies all upstream dependencies and jobs required.
NOTE: This test assumes the DSL graph will have an analyze() method similar to
the bazel graph analyzer. This functionality is not yet implemented but these
tests will validate it once available.
"""
import unittest
from databuild.test.app.dsl.graph import graph
from databuild.test.app.dsl.partitions import ColorVoteReportPartition
class DSLGraphAnalysisTest(unittest.TestCase):
def setUp(self):
# Ensure we have the graph instance
self.graph = graph
def test_single_color_report_dependencies(self):
"""Test dependencies for a single color vote report via DSL."""
partition_refs = ["color_vote_report/2024-01-15/red"]
# TODO: Once DSL graph analysis is implemented, this should call:
# result = self.graph.analyze(partition_refs)
# self.assertIn('nodes', result)
# For now, we can at least verify the graph structure
self.assertIsNotNone(self.graph)
self.assertGreater(len(self.graph.lookup), 0)
# Verify we can create the partition and find its producer
partition = ColorVoteReportPartition(data_date="2024-01-15", color="red")
producer_job_class = self.graph.lookup.get(ColorVoteReportPartition)
self.assertIsNotNone(producer_job_class, "ColorVoteReportPartition should have a registered producer")
# Test that we can call the job's config method
job_instance = producer_job_class()
configs = job_instance.config([partition])
self.assertIsInstance(configs, list)
self.assertGreater(len(configs), 0)
def test_multiple_color_reports_same_date(self):
"""Test dependencies when requesting multiple colors for the same date via DSL."""
partition_refs = [
"color_vote_report/2024-01-15/red",
"color_vote_report/2024-01-15/blue"
]
# TODO: Once DSL graph analysis is implemented, this should call:
# result = self.graph.analyze(partition_refs)
# self.assertIn('nodes', result)
# For now, verify we can handle multiple partitions
partitions = [
ColorVoteReportPartition(data_date="2024-01-15", color="red"),
ColorVoteReportPartition(data_date="2024-01-15", color="blue")
]
producer_job_class = self.graph.lookup.get(ColorVoteReportPartition)
self.assertIsNotNone(producer_job_class)
job_instance = producer_job_class()
configs = job_instance.config(partitions)
self.assertIsInstance(configs, list)
self.assertGreater(len(configs), 0)
def test_multiple_dates_dependencies(self):
"""Test dependencies when requesting reports for different dates via DSL."""
partition_refs = [
"color_vote_report/2024-01-15/red",
"color_vote_report/2024-01-16/red"
]
# TODO: Once DSL graph analysis is implemented, this should call:
# result = self.graph.analyze(partition_refs)
# self.assertIn('nodes', result)
# For now, verify we can handle different dates
partitions = [
ColorVoteReportPartition(data_date="2024-01-15", color="red"),
ColorVoteReportPartition(data_date="2024-01-16", color="red")
]
producer_job_class = self.graph.lookup.get(ColorVoteReportPartition)
self.assertIsNotNone(producer_job_class)
job_instance = producer_job_class()
configs = job_instance.config(partitions)
self.assertIsInstance(configs, list)
self.assertGreater(len(configs), 0)
def test_graph_completeness(self):
"""Test that the DSL graph has all expected partition types registered."""
from databuild.test.app.dsl.partitions import (
IngestedColorPartition,
TrailingColorVotes1WPartition,
TrailingColorVotes1MPartition,
DailyVotesPartition,
Votes1WPartition,
Votes1MPartition,
ColorVoteReportPartition
)
expected_partitions = {
IngestedColorPartition,
TrailingColorVotes1WPartition,
TrailingColorVotes1MPartition,
DailyVotesPartition,
Votes1WPartition,
Votes1MPartition,
ColorVoteReportPartition
}
registered_partitions = set(self.graph.lookup.keys())
self.assertEqual(registered_partitions, expected_partitions,
"All partition types should be registered in the graph")
def test_partition_lookup_functionality(self):
"""Test that partition lookup works correctly for all partition types."""
from databuild.test.app.dsl.partitions import (
IngestedColorPartition,
TrailingColorVotes1WPartition,
TrailingColorVotes1MPartition,
DailyVotesPartition,
Votes1WPartition,
Votes1MPartition,
ColorVoteReportPartition
)
# Test each partition type can be looked up and has a valid job
test_cases = [
(IngestedColorPartition, IngestedColorPartition(data_date="2024-01-15", color="red")),
(TrailingColorVotes1WPartition, TrailingColorVotes1WPartition(data_date="2024-01-15", color="red")),
(TrailingColorVotes1MPartition, TrailingColorVotes1MPartition(data_date="2024-01-15", color="red")),
(DailyVotesPartition, DailyVotesPartition(data_date="2024-01-15")),
(Votes1WPartition, Votes1WPartition(data_date="2024-01-15")),
(Votes1MPartition, Votes1MPartition(data_date="2024-01-15")),
(ColorVoteReportPartition, ColorVoteReportPartition(data_date="2024-01-15", color="red"))
]
for partition_type, partition_instance in test_cases:
with self.subTest(partition_type=partition_type.__name__):
job_class = self.graph.lookup.get(partition_type)
self.assertIsNotNone(job_class, f"Job class for {partition_type.__name__} should be registered")
# Verify we can instantiate the job and call config
job_instance = job_class()
configs = job_instance.config([partition_instance])
self.assertIsInstance(configs, list, f"Config method for {partition_type.__name__} should return a list")
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,56 @@
from databuild.test.app.dsl.graph import IngestColorVotes
from databuild.test.app.dsl.partitions import IngestedColorPartition
from databuild.proto import PartitionRef
def test_ingest_color_votes_configure_single():
"""Test IngestColorVotes config method with single output."""
job = IngestColorVotes()
outputs = [IngestedColorPartition(data_date="2025-01-01", color="red")]
configs = job.config(outputs)
assert len(configs) == 1
config = configs[0]
assert len(config.outputs) == 1
assert config.outputs[0].str == "daily_color_votes/2025-01-01/red"
assert config.env["COLOR"] == "red"
assert config.env["DATA_DATE"] == "2025-01-01"
assert len(config.inputs) == 0
assert len(config.args) == 0
def test_ingest_color_votes_configure_multiple():
"""Test IngestColorVotes config method with multiple outputs."""
job = IngestColorVotes()
outputs = [
IngestedColorPartition(data_date="2025-01-02", color="red"),
IngestedColorPartition(data_date="2025-01-02", color="blue"),
]
configs = job.config(outputs)
assert len(configs) == 2
# First config
config1 = configs[0]
assert len(config1.outputs) == 1
assert config1.outputs[0].str == "daily_color_votes/2025-01-02/red"
assert config1.env["COLOR"] == "red"
assert config1.env["DATA_DATE"] == "2025-01-02"
assert len(config1.inputs) == 0
assert len(config1.args) == 0
# Second config
config2 = configs[1]
assert len(config2.outputs) == 1
assert config2.outputs[0].str == "daily_color_votes/2025-01-02/blue"
assert config2.env["COLOR"] == "blue"
assert config2.env["DATA_DATE"] == "2025-01-02"
assert len(config2.inputs) == 0
assert len(config2.args) == 0
if __name__ == '__main__':
import pytest
raise SystemExit(pytest.main([__file__]))

View file

@ -0,0 +1,135 @@
from databuild.test.app.dsl.graph import TrailingColorVotes
from databuild.test.app.dsl.partitions import (
TrailingColorVotes1WPartition,
TrailingColorVotes1MPartition,
IngestedColorPartition
)
from databuild.proto import DepType
def test_trailing_color_votes_configure_weekly_only():
"""Test TrailingColorVotes config method with weekly output only."""
job = TrailingColorVotes()
outputs = [TrailingColorVotes1WPartition(data_date="2025-01-07", color="red")]
configs = job.config(outputs)
assert len(configs) == 1
config = configs[0]
assert len(config.outputs) == 1
assert config.outputs[0].str == "color_votes_1w/2025-01-07/red"
assert config.env["COLOR"] == "red"
assert config.env["DATA_DATE"] == "2025-01-07"
assert config.env["WEEKLY"] == "true"
assert config.env["MONTHLY"] == "false"
# Should have 7 days of inputs
assert len(config.inputs) == 7
expected_dates = ["2025-01-07", "2025-01-06", "2025-01-05", "2025-01-04",
"2025-01-03", "2025-01-02", "2025-01-01"]
for i, input_dep in enumerate(config.inputs):
assert input_dep.dep_type_code == DepType.MATERIALIZE
assert input_dep.dep_type_name == "materialize"
assert input_dep.partition_ref.str == f"daily_color_votes/{expected_dates[i]}/red"
def test_trailing_color_votes_configure_monthly_only():
"""Test TrailingColorVotes config method with monthly output only."""
job = TrailingColorVotes()
outputs = [TrailingColorVotes1MPartition(data_date="2025-01-28", color="blue")]
configs = job.config(outputs)
assert len(configs) == 1
config = configs[0]
assert len(config.outputs) == 1
assert config.outputs[0].str == "color_votes_1m/2025-01-28/blue"
assert config.env["COLOR"] == "blue"
assert config.env["DATA_DATE"] == "2025-01-28"
assert config.env["WEEKLY"] == "false"
assert config.env["MONTHLY"] == "true"
# Should have 28 days of inputs
assert len(config.inputs) == 28
# Check first and last input dates
assert config.inputs[0].partition_ref.str == "daily_color_votes/2025-01-28/blue"
assert config.inputs[27].partition_ref.str == "daily_color_votes/2025-01-01/blue"
def test_trailing_color_votes_configure_both_weekly_and_monthly():
"""Test TrailingColorVotes config method with both weekly and monthly outputs for same date/color."""
job = TrailingColorVotes()
outputs = [
TrailingColorVotes1WPartition(data_date="2025-01-28", color="green"),
TrailingColorVotes1MPartition(data_date="2025-01-28", color="green")
]
configs = job.config(outputs)
assert len(configs) == 1 # Should group by (data_date, color)
config = configs[0]
assert len(config.outputs) == 2
# Check outputs
output_strs = {output.str for output in config.outputs}
assert "color_votes_1w/2025-01-28/green" in output_strs
assert "color_votes_1m/2025-01-28/green" in output_strs
assert config.env["COLOR"] == "green"
assert config.env["DATA_DATE"] == "2025-01-28"
assert config.env["WEEKLY"] == "true"
assert config.env["MONTHLY"] == "true"
# Should have 28 days of inputs (max window)
assert len(config.inputs) == 28
def test_trailing_color_votes_configure_multiple_groups():
"""Test TrailingColorVotes config method with outputs that require separate configs."""
job = TrailingColorVotes()
outputs = [
TrailingColorVotes1WPartition(data_date="2025-01-07", color="red"),
TrailingColorVotes1WPartition(data_date="2025-01-07", color="blue"),
TrailingColorVotes1MPartition(data_date="2025-01-08", color="red")
]
configs = job.config(outputs)
assert len(configs) == 3 # Three different (data_date, color) combinations
# Find configs by their characteristics
red_7th_config = None
blue_7th_config = None
red_8th_config = None
for config in configs:
if config.env["DATA_DATE"] == "2025-01-07" and config.env["COLOR"] == "red":
red_7th_config = config
elif config.env["DATA_DATE"] == "2025-01-07" and config.env["COLOR"] == "blue":
blue_7th_config = config
elif config.env["DATA_DATE"] == "2025-01-08" and config.env["COLOR"] == "red":
red_8th_config = config
assert red_7th_config is not None
assert blue_7th_config is not None
assert red_8th_config is not None
# Check red 7th (weekly only)
assert red_7th_config.env["WEEKLY"] == "true"
assert red_7th_config.env["MONTHLY"] == "false"
assert len(red_7th_config.inputs) == 7
# Check blue 7th (weekly only)
assert blue_7th_config.env["WEEKLY"] == "true"
assert blue_7th_config.env["MONTHLY"] == "false"
assert len(blue_7th_config.inputs) == 7
# Check red 8th (monthly only)
assert red_8th_config.env["WEEKLY"] == "false"
assert red_8th_config.env["MONTHLY"] == "true"
assert len(red_8th_config.inputs) == 28
if __name__ == '__main__':
import pytest
raise SystemExit(pytest.main([__file__]))