diff --git a/databuild/dsl/python/dsl.py b/databuild/dsl/python/dsl.py index 53b1af3..e997ed3 100644 --- a/databuild/dsl/python/dsl.py +++ b/databuild/dsl/python/dsl.py @@ -78,6 +78,7 @@ class DataBuildGraph: for partition in cls.output_types: assert partition not in self.lookup, f"Partition `{partition}` already registered" self.lookup[partition] = cls + return cls def generate_bazel_module(self): """Generates a complete databuild application, packaging up referenced jobs and this graph via bazel targets""" @@ -101,7 +102,7 @@ class JobConfigBuilder: def add_inputs(self, *partitions: PartitionPattern, dep_type: DepType=DepType.MATERIALIZE) -> Self: for p in partitions: - dep_type_name = "materialize" if dep_type == DepType.Materialize else "query" + dep_type_name = "materialize" if dep_type == DepType.MATERIALIZE else "query" self.inputs.append(DataDep(dep_type_code=dep_type, dep_type_name=dep_type_name, partition_ref=PartitionRef(str=p.serialize()))) return self diff --git a/databuild/test/app/dsl/BUILD.bazel b/databuild/test/app/dsl/BUILD.bazel index e69de29..61a3a09 100644 --- a/databuild/test/app/dsl/BUILD.bazel +++ b/databuild/test/app/dsl/BUILD.bazel @@ -0,0 +1,13 @@ +py_library( + name = "dsl_src", + srcs = glob( + ["*.py"], + exclude = ["test_*.py"], + ), + visibility = ["//visibility:public"], + deps = [ + "//databuild:py_proto", + "//databuild/dsl/python:dsl", + "//databuild/test/app:job_src", + ], +) diff --git a/databuild/test/app/dsl/graph.py b/databuild/test/app/dsl/graph.py index c464cbf..1d1fdcf 100644 --- a/databuild/test/app/dsl/graph.py +++ b/databuild/test/app/dsl/graph.py @@ -60,7 +60,7 @@ class TrailingColorVotes(DataBuildJob): max_window = max(max_window, 28) env = {"DATA_DATE": data_date, "COLOR": color, "WEEKLY": weekly, "MONTHLY": monthly} - config = JobConfigBuilder(env=env, outputs=outputs) + config = JobConfigBuilder(env=env).add_outputs(*outputs) for i in range(max_window): in_date = (date.fromisoformat(data_date) - 
timedelta(days=i)).isoformat() config.add_inputs(IngestedColorPartition(data_date=in_date, color=color)) @@ -107,7 +107,7 @@ class ColorVoteReportCalc(DataBuildJob): output_types = [ColorVoteReportPartition] def config(self, outputs: list[ColorVoteReportPartition]) -> list[JobConfig]: - config = JobConfigBuilder().add_outputs(*outputs).add_args(*[p.str for p in outputs]) + config = JobConfigBuilder().add_outputs(*outputs).add_args(*[p.serialize() for p in outputs]) for data_date in set(p.data_date for p in outputs): config.add_inputs( diff --git a/databuild/test/app/dsl/test/BUILD.bazel b/databuild/test/app/dsl/test/BUILD.bazel new file mode 100644 index 0000000..1c93f48 --- /dev/null +++ b/databuild/test/app/dsl/test/BUILD.bazel @@ -0,0 +1,75 @@ +# Individual job configuration tests +py_test( + name = "test_ingest_color_votes", + srcs = ["test_ingest_color_votes.py"], + main = "test_ingest_color_votes.py", + deps = [ + "//databuild:py_proto", + "//databuild/dsl/python:dsl", + "//databuild/test/app:job_src", + "//databuild/test/app/dsl:dsl_src", + ], +) + +py_test( + name = "test_trailing_color_votes", + srcs = ["test_trailing_color_votes.py"], + main = "test_trailing_color_votes.py", + deps = [ + "//databuild:py_proto", + "//databuild/dsl/python:dsl", + "//databuild/test/app:job_src", + "//databuild/test/app/dsl:dsl_src", + ], +) + +py_test( + name = "test_aggregate_color_votes", + srcs = ["test_aggregate_color_votes.py"], + main = "test_aggregate_color_votes.py", + deps = [ + "//databuild:py_proto", + "//databuild/dsl/python:dsl", + "//databuild/test/app:job_src", + "//databuild/test/app/dsl:dsl_src", + ], +) + +py_test( + name = "test_color_vote_report_calc", + srcs = ["test_color_vote_report_calc.py"], + main = "test_color_vote_report_calc.py", + deps = [ + "//databuild:py_proto", + "//databuild/dsl/python:dsl", + "//databuild/test/app:job_src", + "//databuild/test/app/dsl:dsl_src", + ], +) + +# Graph analysis test +py_test( + name = "test_graph_analysis", + 
srcs = ["test_graph_analysis.py"], + main = "test_graph_analysis.py", + deps = [ + "//databuild:py_proto", + "//databuild/dsl/python:dsl", + "//databuild/test/app:job_src", + "//databuild/test/app/dsl:dsl_src", + ], +) + +# Bazel vs DSL comparison test +py_test( + name = "test_bazel_dsl_comparison", + srcs = ["test_bazel_dsl_comparison.py"], + main = "test_bazel_dsl_comparison.py", + deps = [ + "//databuild:py_proto", + "//databuild/dsl/python:dsl", + "//databuild/test/app:job_src", + "//databuild/test/app/bazel:job_src", + "//databuild/test/app/dsl:dsl_src", + ], +) diff --git a/databuild/test/app/dsl/test/test_aggregate_color_votes.py b/databuild/test/app/dsl/test/test_aggregate_color_votes.py new file mode 100644 index 0000000..9cadf9c --- /dev/null +++ b/databuild/test/app/dsl/test/test_aggregate_color_votes.py @@ -0,0 +1,159 @@ +from databuild.test.app.dsl.graph import AggregateColorVotes +from databuild.test.app.dsl.partitions import ( + DailyVotesPartition, + Votes1WPartition, + Votes1MPartition, + IngestedColorPartition, + TrailingColorVotes1WPartition, + TrailingColorVotes1MPartition +) +from databuild.test.app.colors import COLORS +from databuild.proto import DepType + + +def test_aggregate_color_votes_configure_daily_votes(): + """Test AggregateColorVotes config method with daily votes output.""" + job = AggregateColorVotes() + outputs = [DailyVotesPartition(data_date="2025-01-15")] + + configs = job.config(outputs) + + assert len(configs) == 1 + config = configs[0] + assert len(config.outputs) == 1 + assert config.outputs[0].str == "daily_votes/2025-01-15" + assert config.env["DATA_DATE"] == "2025-01-15" + assert config.env["AGGREGATE_TYPE"] == "daily_votes" + + # Should have inputs for all colors + assert len(config.inputs) == len(COLORS) + expected_inputs = {f"daily_color_votes/2025-01-15/{color}" for color in COLORS} + actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs} + assert actual_inputs == expected_inputs + + # All 
inputs should be MATERIALIZE type + for input_dep in config.inputs: + assert input_dep.dep_type_code == DepType.MATERIALIZE + assert input_dep.dep_type_name == "materialize" + + +def test_aggregate_color_votes_configure_votes_1w(): + """Test AggregateColorVotes config method with weekly votes output.""" + job = AggregateColorVotes() + outputs = [Votes1WPartition(data_date="2025-01-15")] + + configs = job.config(outputs) + + assert len(configs) == 1 + config = configs[0] + assert len(config.outputs) == 1 + assert config.outputs[0].str == "votes_1w/2025-01-15" + assert config.env["DATA_DATE"] == "2025-01-15" + assert config.env["AGGREGATE_TYPE"] == "votes_1w" + + # Should have inputs for all colors from trailing 1w partitions + assert len(config.inputs) == len(COLORS) + expected_inputs = {f"color_votes_1w/2025-01-15/{color}" for color in COLORS} + actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs} + assert actual_inputs == expected_inputs + + +def test_aggregate_color_votes_configure_votes_1m(): + """Test AggregateColorVotes config method with monthly votes output.""" + job = AggregateColorVotes() + outputs = [Votes1MPartition(data_date="2025-01-15")] + + configs = job.config(outputs) + + assert len(configs) == 1 + config = configs[0] + assert len(config.outputs) == 1 + assert config.outputs[0].str == "votes_1m/2025-01-15" + assert config.env["DATA_DATE"] == "2025-01-15" + assert config.env["AGGREGATE_TYPE"] == "votes_1m" + + # Should have inputs for all colors from trailing 1m partitions + assert len(config.inputs) == len(COLORS) + expected_inputs = {f"color_votes_1m/2025-01-15/{color}" for color in COLORS} + actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs} + assert actual_inputs == expected_inputs + + +def test_aggregate_color_votes_configure_multiple_outputs(): + """Test AggregateColorVotes config method with multiple different output types.""" + job = AggregateColorVotes() + outputs = [ + 
DailyVotesPartition(data_date="2025-01-15"), + Votes1WPartition(data_date="2025-01-16"), + Votes1MPartition(data_date="2025-01-17") + ] + + configs = job.config(outputs) + + assert len(configs) == 3 # One config per output + + # Find configs by date + daily_config = None + weekly_config = None + monthly_config = None + + for config in configs: + if config.env["DATA_DATE"] == "2025-01-15": + daily_config = config + elif config.env["DATA_DATE"] == "2025-01-16": + weekly_config = config + elif config.env["DATA_DATE"] == "2025-01-17": + monthly_config = config + + assert daily_config is not None + assert weekly_config is not None + assert monthly_config is not None + + # Check daily config + assert daily_config.env["AGGREGATE_TYPE"] == "daily_votes" + assert daily_config.outputs[0].str == "daily_votes/2025-01-15" + assert len(daily_config.inputs) == len(COLORS) + assert all("daily_color_votes/2025-01-15/" in inp.partition_ref.str for inp in daily_config.inputs) + + # Check weekly config + assert weekly_config.env["AGGREGATE_TYPE"] == "votes_1w" + assert weekly_config.outputs[0].str == "votes_1w/2025-01-16" + assert len(weekly_config.inputs) == len(COLORS) + assert all("color_votes_1w/2025-01-16/" in inp.partition_ref.str for inp in weekly_config.inputs) + + # Check monthly config + assert monthly_config.env["AGGREGATE_TYPE"] == "votes_1m" + assert monthly_config.outputs[0].str == "votes_1m/2025-01-17" + assert len(monthly_config.inputs) == len(COLORS) + assert all("color_votes_1m/2025-01-17/" in inp.partition_ref.str for inp in monthly_config.inputs) + + +def test_aggregate_color_votes_configure_multiple_same_type(): + """Test AggregateColorVotes config method with multiple outputs of same type.""" + job = AggregateColorVotes() + outputs = [ + DailyVotesPartition(data_date="2025-01-15"), + DailyVotesPartition(data_date="2025-01-16") + ] + + configs = job.config(outputs) + + assert len(configs) == 2 # One config per output + + for config in configs: + assert 
config.env["AGGREGATE_TYPE"] == "daily_votes" + assert len(config.inputs) == len(COLORS) + + if config.env["DATA_DATE"] == "2025-01-15": + assert config.outputs[0].str == "daily_votes/2025-01-15" + assert all("daily_color_votes/2025-01-15/" in inp.partition_ref.str for inp in config.inputs) + elif config.env["DATA_DATE"] == "2025-01-16": + assert config.outputs[0].str == "daily_votes/2025-01-16" + assert all("daily_color_votes/2025-01-16/" in inp.partition_ref.str for inp in config.inputs) + else: + assert False, f"Unexpected date: {config.env['DATA_DATE']}" + + +if __name__ == '__main__': + import pytest + raise SystemExit(pytest.main([__file__])) \ No newline at end of file diff --git a/databuild/test/app/dsl/test/test_bazel_dsl_comparison.py b/databuild/test/app/dsl/test/test_bazel_dsl_comparison.py new file mode 100644 index 0000000..f2444d3 --- /dev/null +++ b/databuild/test/app/dsl/test/test_bazel_dsl_comparison.py @@ -0,0 +1,244 @@ +#!/usr/bin/env python3 +""" +Comparison test between Bazel and DSL implementations. + +This test verifies that the DSL job configurations produce identical results +to the equivalent bazel job configurations for the same partition references. 
+""" + +import unittest +from databuild.proto import PartitionRef, JobConfigureResponse +from databuild.test.app.dsl.graph import ( + IngestColorVotes, + TrailingColorVotes, + AggregateColorVotes, + ColorVoteReportCalc +) +from databuild.test.app.dsl.partitions import ( + IngestedColorPartition, + TrailingColorVotes1WPartition, + TrailingColorVotes1MPartition, + DailyVotesPartition, + Votes1WPartition, + Votes1MPartition, + ColorVoteReportPartition +) + +# Import bazel job config functions +from databuild.test.app.bazel.jobs.ingest_color_votes.config import configure as bazel_ingest_config +from databuild.test.app.bazel.jobs.trailing_color_votes.config import configure as bazel_trailing_config +from databuild.test.app.bazel.jobs.aggregate_color_votes.config import configure as bazel_aggregate_config +from databuild.test.app.bazel.jobs.color_vote_report_calc.config import configure as bazel_report_config + + +class BazelDSLComparisonTest(unittest.TestCase): + """Compare bazel and DSL job configurations to ensure they produce identical results.""" + + def _compare_job_configs(self, bazel_response, dsl_configs): + """Helper to compare JobConfigureResponse from bazel with list[JobConfig] from DSL.""" + self.assertIsInstance(bazel_response, JobConfigureResponse) + self.assertIsInstance(dsl_configs, list) + + bazel_configs = bazel_response.configs + self.assertEqual(len(bazel_configs), len(dsl_configs), + "Bazel and DSL should produce same number of configs") + + # Sort both by a stable key for comparison + def config_sort_key(config): + outputs_str = ",".join(sorted(out.str for out in config.outputs)) + env_str = ",".join(f"{k}={v}" for k, v in sorted(config.env.items())) + return f"{outputs_str}:{env_str}" + + bazel_sorted = sorted(bazel_configs, key=config_sort_key) + dsl_sorted = sorted(dsl_configs, key=config_sort_key) + + for bazel_config, dsl_config in zip(bazel_sorted, dsl_sorted): + # Compare outputs + bazel_outputs = {out.str for out in bazel_config.outputs} + 
dsl_outputs = {out.str for out in dsl_config.outputs} + self.assertEqual(bazel_outputs, dsl_outputs, "Outputs should match") + + # Compare inputs + bazel_inputs = {(inp.partition_ref.str, inp.dep_type_code, inp.dep_type_name) + for inp in bazel_config.inputs} + dsl_inputs = {(inp.partition_ref.str, inp.dep_type_code, inp.dep_type_name) + for inp in dsl_config.inputs} + self.assertEqual(bazel_inputs, dsl_inputs, "Inputs should match") + + # Compare args + self.assertEqual(set(bazel_config.args), set(dsl_config.args), "Args should match") + + # Compare env + self.assertEqual(bazel_config.env, dsl_config.env, "Environment should match") + + def test_ingest_color_votes_comparison(self): + """Compare IngestColorVotes bazel vs DSL configurations.""" + # Test single output + partition_refs = [PartitionRef(str="daily_color_votes/2025-01-01/red")] + bazel_response = bazel_ingest_config(partition_refs) + + partitions = [IngestedColorPartition.deserialize(ref.str) for ref in partition_refs] + dsl_job = IngestColorVotes() + dsl_configs = dsl_job.config(partitions) + + self._compare_job_configs(bazel_response, dsl_configs) + + # Test multiple outputs + partition_refs = [ + PartitionRef(str="daily_color_votes/2025-01-02/red"), + PartitionRef(str="daily_color_votes/2025-01-02/blue") + ] + bazel_response = bazel_ingest_config(partition_refs) + + partitions = [IngestedColorPartition.deserialize(ref.str) for ref in partition_refs] + dsl_configs = dsl_job.config(partitions) + + self._compare_job_configs(bazel_response, dsl_configs) + + def test_trailing_color_votes_comparison(self): + """Compare TrailingColorVotes bazel vs DSL configurations.""" + # Test weekly output + partition_refs = [PartitionRef(str="color_votes_1w/2025-01-07/red")] + bazel_response = bazel_trailing_config(partition_refs) + + partitions = [TrailingColorVotes1WPartition.deserialize(ref.str) for ref in partition_refs] + dsl_job = TrailingColorVotes() + dsl_configs = dsl_job.config(partitions) + + 
self._compare_job_configs(bazel_response, dsl_configs) + + # Test monthly output + partition_refs = [PartitionRef(str="color_votes_1m/2025-01-28/blue")] + bazel_response = bazel_trailing_config(partition_refs) + + partitions = [TrailingColorVotes1MPartition.deserialize(ref.str) for ref in partition_refs] + dsl_configs = dsl_job.config(partitions) + + self._compare_job_configs(bazel_response, dsl_configs) + + # Test mixed weekly and monthly for same date/color + partition_refs = [ + PartitionRef(str="color_votes_1w/2025-01-28/green"), + PartitionRef(str="color_votes_1m/2025-01-28/green") + ] + bazel_response = bazel_trailing_config(partition_refs) + + partitions = [ + TrailingColorVotes1WPartition.deserialize(partition_refs[0].str), + TrailingColorVotes1MPartition.deserialize(partition_refs[1].str) + ] + dsl_configs = dsl_job.config(partitions) + + self._compare_job_configs(bazel_response, dsl_configs) + + def test_aggregate_color_votes_comparison(self): + """Compare AggregateColorVotes bazel vs DSL configurations.""" + # Test daily votes + partition_refs = [PartitionRef(str="daily_votes/2025-01-15")] + bazel_response = bazel_aggregate_config(partition_refs) + + partitions = [DailyVotesPartition.deserialize(ref.str) for ref in partition_refs] + dsl_job = AggregateColorVotes() + dsl_configs = dsl_job.config(partitions) + + self._compare_job_configs(bazel_response, dsl_configs) + + # Test weekly votes + partition_refs = [PartitionRef(str="votes_1w/2025-01-15")] + bazel_response = bazel_aggregate_config(partition_refs) + + partitions = [Votes1WPartition.deserialize(ref.str) for ref in partition_refs] + dsl_configs = dsl_job.config(partitions) + + self._compare_job_configs(bazel_response, dsl_configs) + + # Test monthly votes + partition_refs = [PartitionRef(str="votes_1m/2025-01-15")] + bazel_response = bazel_aggregate_config(partition_refs) + + partitions = [Votes1MPartition.deserialize(ref.str) for ref in partition_refs] + dsl_configs = dsl_job.config(partitions) + + 
self._compare_job_configs(bazel_response, dsl_configs) + + # Test multiple different types + partition_refs = [ + PartitionRef(str="daily_votes/2025-01-15"), + PartitionRef(str="votes_1w/2025-01-16"), + PartitionRef(str="votes_1m/2025-01-17") + ] + bazel_response = bazel_aggregate_config(partition_refs) + + partitions = [ + DailyVotesPartition.deserialize(partition_refs[0].str), + Votes1WPartition.deserialize(partition_refs[1].str), + Votes1MPartition.deserialize(partition_refs[2].str) + ] + dsl_configs = dsl_job.config(partitions) + + self._compare_job_configs(bazel_response, dsl_configs) + + def test_color_vote_report_calc_comparison(self): + """Compare ColorVoteReportCalc bazel vs DSL configurations.""" + # Test single report + partition_refs = [PartitionRef(str="color_vote_report/2025-01-15/red")] + bazel_response = bazel_report_config(partition_refs) + + partitions = [ColorVoteReportPartition.deserialize(ref.str) for ref in partition_refs] + dsl_job = ColorVoteReportCalc() + dsl_configs = dsl_job.config(partitions) + + self._compare_job_configs(bazel_response, dsl_configs) + + # Test multiple reports same date + partition_refs = [ + PartitionRef(str="color_vote_report/2025-01-15/red"), + PartitionRef(str="color_vote_report/2025-01-15/blue") + ] + bazel_response = bazel_report_config(partition_refs) + + partitions = [ColorVoteReportPartition.deserialize(ref.str) for ref in partition_refs] + dsl_configs = dsl_job.config(partitions) + + self._compare_job_configs(bazel_response, dsl_configs) + + # Test multiple reports different dates + partition_refs = [ + PartitionRef(str="color_vote_report/2025-01-15/red"), + PartitionRef(str="color_vote_report/2025-01-16/red") + ] + bazel_response = bazel_report_config(partition_refs) + + partitions = [ColorVoteReportPartition.deserialize(ref.str) for ref in partition_refs] + dsl_configs = dsl_job.config(partitions) + + self._compare_job_configs(bazel_response, dsl_configs) + + def test_partition_serialization_roundtrip(self): 
+ """Test that DSL partition serialization/deserialization works correctly.""" + test_cases = [ + IngestedColorPartition(data_date="2025-01-15", color="red"), + TrailingColorVotes1WPartition(data_date="2025-01-15", color="blue"), + TrailingColorVotes1MPartition(data_date="2025-01-28", color="green"), + DailyVotesPartition(data_date="2025-01-15"), + Votes1WPartition(data_date="2025-01-15"), + Votes1MPartition(data_date="2025-01-15"), + ColorVoteReportPartition(data_date="2025-01-15", color="yellow") + ] + + for partition in test_cases: + with self.subTest(partition=partition): + # Serialize then deserialize + serialized = partition.serialize() + deserialized = type(partition).deserialize(serialized) + + # Should be equal + self.assertEqual(partition, deserialized) + + # Serializing again should give same result + reserialized = deserialized.serialize() + self.assertEqual(serialized, reserialized) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/databuild/test/app/dsl/test/test_color_vote_report_calc.py b/databuild/test/app/dsl/test/test_color_vote_report_calc.py new file mode 100644 index 0000000..90e5284 --- /dev/null +++ b/databuild/test/app/dsl/test/test_color_vote_report_calc.py @@ -0,0 +1,204 @@ +from databuild.test.app.dsl.graph import ColorVoteReportCalc +from databuild.test.app.dsl.partitions import ( + ColorVoteReportPartition, + DailyVotesPartition, + Votes1WPartition, + Votes1MPartition, + IngestedColorPartition, + TrailingColorVotes1WPartition, + TrailingColorVotes1MPartition +) +from databuild.proto import DepType + + +def test_color_vote_report_calc_configure_single_output(): + """Test ColorVoteReportCalc config method with single color report output.""" + job = ColorVoteReportCalc() + outputs = [ColorVoteReportPartition(data_date="2025-01-15", color="red")] + + configs = job.config(outputs) + + assert len(configs) == 1 + config = configs[0] + + # Check outputs + assert len(config.outputs) == 1 + assert 
config.outputs[0].str == "color_vote_report/2025-01-15/red" + + # Check args - should contain partition strings + assert len(config.args) == 1 + assert config.args[0] == "color_vote_report/2025-01-15/red" + + # Check inputs - should have aggregate inputs for the date and specific color inputs + expected_inputs = { + # Aggregate inputs for the date + "daily_votes/2025-01-15", + "votes_1w/2025-01-15", + "votes_1m/2025-01-15", + # Color-specific inputs + "daily_color_votes/2025-01-15/red", + "color_votes_1w/2025-01-15/red", + "color_votes_1m/2025-01-15/red" + } + + actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs} + assert actual_inputs == expected_inputs + + # All inputs should be MATERIALIZE type + for input_dep in config.inputs: + assert input_dep.dep_type_code == DepType.MATERIALIZE + assert input_dep.dep_type_name == "materialize" + + +def test_color_vote_report_calc_configure_multiple_colors_same_date(): + """Test ColorVoteReportCalc config method with multiple colors for same date.""" + job = ColorVoteReportCalc() + outputs = [ + ColorVoteReportPartition(data_date="2025-01-15", color="red"), + ColorVoteReportPartition(data_date="2025-01-15", color="blue") + ] + + configs = job.config(outputs) + + assert len(configs) == 1 # Single config since all outputs go to same job + config = configs[0] + + # Check outputs + assert len(config.outputs) == 2 + output_strs = {output.str for output in config.outputs} + assert "color_vote_report/2025-01-15/red" in output_strs + assert "color_vote_report/2025-01-15/blue" in output_strs + + # Check args - should contain both partition strings + assert len(config.args) == 2 + assert set(config.args) == {"color_vote_report/2025-01-15/red", "color_vote_report/2025-01-15/blue"} + + # Check inputs - should have aggregate inputs for the date and color-specific inputs for both colors + expected_inputs = { + # Aggregate inputs for the date (only one set since same date) + "daily_votes/2025-01-15", + 
"votes_1w/2025-01-15", + "votes_1m/2025-01-15", + # Color-specific inputs for red + "daily_color_votes/2025-01-15/red", + "color_votes_1w/2025-01-15/red", + "color_votes_1m/2025-01-15/red", + # Color-specific inputs for blue + "daily_color_votes/2025-01-15/blue", + "color_votes_1w/2025-01-15/blue", + "color_votes_1m/2025-01-15/blue" + } + + actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs} + assert actual_inputs == expected_inputs + + +def test_color_vote_report_calc_configure_multiple_dates(): + """Test ColorVoteReportCalc config method with reports for different dates.""" + job = ColorVoteReportCalc() + outputs = [ + ColorVoteReportPartition(data_date="2025-01-15", color="red"), + ColorVoteReportPartition(data_date="2025-01-16", color="red") + ] + + configs = job.config(outputs) + + assert len(configs) == 1 # Single config since all outputs go to same job + config = configs[0] + + # Check outputs + assert len(config.outputs) == 2 + output_strs = {output.str for output in config.outputs} + assert "color_vote_report/2025-01-15/red" in output_strs + assert "color_vote_report/2025-01-16/red" in output_strs + + # Check args + assert len(config.args) == 2 + assert set(config.args) == {"color_vote_report/2025-01-15/red", "color_vote_report/2025-01-16/red"} + + # Check inputs - should have aggregate inputs for both dates and color-specific inputs + expected_inputs = { + # Aggregate inputs for both dates + "daily_votes/2025-01-15", + "votes_1w/2025-01-15", + "votes_1m/2025-01-15", + "daily_votes/2025-01-16", + "votes_1w/2025-01-16", + "votes_1m/2025-01-16", + # Color-specific inputs for red on both dates + "daily_color_votes/2025-01-15/red", + "color_votes_1w/2025-01-15/red", + "color_votes_1m/2025-01-15/red", + "daily_color_votes/2025-01-16/red", + "color_votes_1w/2025-01-16/red", + "color_votes_1m/2025-01-16/red" + } + + actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs} + assert actual_inputs == expected_inputs + + 
+def test_color_vote_report_calc_configure_complex_scenario(): + """Test ColorVoteReportCalc config method with complex multi-date, multi-color scenario.""" + job = ColorVoteReportCalc() + outputs = [ + ColorVoteReportPartition(data_date="2025-01-15", color="red"), + ColorVoteReportPartition(data_date="2025-01-15", color="blue"), + ColorVoteReportPartition(data_date="2025-01-16", color="green"), + ColorVoteReportPartition(data_date="2025-01-17", color="red") + ] + + configs = job.config(outputs) + + assert len(configs) == 1 # Single config since all outputs go to same job + config = configs[0] + + # Check outputs + assert len(config.outputs) == 4 + expected_output_strs = { + "color_vote_report/2025-01-15/red", + "color_vote_report/2025-01-15/blue", + "color_vote_report/2025-01-16/green", + "color_vote_report/2025-01-17/red" + } + actual_output_strs = {output.str for output in config.outputs} + assert actual_output_strs == expected_output_strs + + # Check args + assert len(config.args) == 4 + assert set(config.args) == expected_output_strs + + # Check inputs - should have aggregate inputs for all unique dates and color-specific inputs + expected_inputs = { + # Aggregate inputs for all dates + "daily_votes/2025-01-15", + "votes_1w/2025-01-15", + "votes_1m/2025-01-15", + "daily_votes/2025-01-16", + "votes_1w/2025-01-16", + "votes_1m/2025-01-16", + "daily_votes/2025-01-17", + "votes_1w/2025-01-17", + "votes_1m/2025-01-17", + # Color-specific inputs + "daily_color_votes/2025-01-15/red", + "color_votes_1w/2025-01-15/red", + "color_votes_1m/2025-01-15/red", + "daily_color_votes/2025-01-15/blue", + "color_votes_1w/2025-01-15/blue", + "color_votes_1m/2025-01-15/blue", + "daily_color_votes/2025-01-16/green", + "color_votes_1w/2025-01-16/green", + "color_votes_1m/2025-01-16/green", + "daily_color_votes/2025-01-17/red", + "color_votes_1w/2025-01-17/red", + "color_votes_1m/2025-01-17/red" + } + + actual_inputs = {input_dep.partition_ref.str for input_dep in config.inputs} + 
assert actual_inputs == expected_inputs + + +if __name__ == '__main__': + import pytest + raise SystemExit(pytest.main([__file__])) \ No newline at end of file diff --git a/databuild/test/app/dsl/test/test_graph_analysis.py b/databuild/test/app/dsl/test/test_graph_analysis.py new file mode 100644 index 0000000..8a56a15 --- /dev/null +++ b/databuild/test/app/dsl/test/test_graph_analysis.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 +""" +Integration test for the DSL graph analysis. + +This test verifies that when we request color vote reports via the DSL graph, +the analyzer correctly identifies all upstream dependencies and jobs required. + +NOTE: This test assumes the DSL graph will have an analyze() method similar to +the bazel graph analyzer. This functionality is not yet implemented but these +tests will validate it once available. +""" + +import unittest +from databuild.test.app.dsl.graph import graph +from databuild.test.app.dsl.partitions import ColorVoteReportPartition + + +class DSLGraphAnalysisTest(unittest.TestCase): + def setUp(self): + # Ensure we have the graph instance + self.graph = graph + + def test_single_color_report_dependencies(self): + """Test dependencies for a single color vote report via DSL.""" + partition_refs = ["color_vote_report/2024-01-15/red"] + + # TODO: Once DSL graph analysis is implemented, this should call: + # result = self.graph.analyze(partition_refs) + # self.assertIn('nodes', result) + + # For now, we can at least verify the graph structure + self.assertIsNotNone(self.graph) + self.assertGreater(len(self.graph.lookup), 0) + + # Verify we can create the partition and find its producer + partition = ColorVoteReportPartition(data_date="2024-01-15", color="red") + producer_job_class = self.graph.lookup.get(ColorVoteReportPartition) + self.assertIsNotNone(producer_job_class, "ColorVoteReportPartition should have a registered producer") + + # Test that we can call the job's config method + job_instance = producer_job_class() + 
configs = job_instance.config([partition])
+        self.assertIsInstance(configs, list)
+        self.assertGreater(len(configs), 0)
+
+    def test_multiple_color_reports_same_date(self):
+        """Test dependencies when requesting multiple colors for the same date via DSL."""
+        partition_refs = [
+            "color_vote_report/2024-01-15/red",
+            "color_vote_report/2024-01-15/blue"
+        ]
+
+        # TODO: Once DSL graph analysis is implemented, this should call:
+        # result = self.graph.analyze(partition_refs)
+        # self.assertIn('nodes', result)
+
+        # For now, verify we can handle multiple partitions
+        partitions = [
+            ColorVoteReportPartition(data_date="2024-01-15", color="red"),
+            ColorVoteReportPartition(data_date="2024-01-15", color="blue")
+        ]
+
+        producer_job_class = self.graph.lookup.get(ColorVoteReportPartition)
+        self.assertIsNotNone(producer_job_class)
+
+        job_instance = producer_job_class()
+        configs = job_instance.config(partitions)
+        self.assertIsInstance(configs, list)
+        self.assertGreater(len(configs), 0)
+
+    def test_multiple_dates_dependencies(self):
+        """Test dependencies when requesting reports for different dates via DSL."""
+        partition_refs = [
+            "color_vote_report/2024-01-15/red",
+            "color_vote_report/2024-01-16/red"
+        ]
+
+        # TODO: Once DSL graph analysis is implemented, this should call:
+        # result = self.graph.analyze(partition_refs)
+        # self.assertIn('nodes', result)
+
+        # For now, verify we can handle different dates
+        partitions = [
+            ColorVoteReportPartition(data_date="2024-01-15", color="red"),
+            ColorVoteReportPartition(data_date="2024-01-16", color="red")
+        ]
+
+        producer_job_class = self.graph.lookup.get(ColorVoteReportPartition)
+        self.assertIsNotNone(producer_job_class)
+
+        job_instance = producer_job_class()
+        configs = job_instance.config(partitions)
+        self.assertIsInstance(configs, list)
+        self.assertGreater(len(configs), 0)
+
+    def test_graph_completeness(self):
+        """Test that the DSL graph has all expected partition types registered."""
+        from databuild.test.app.dsl.partitions import (
+            IngestedColorPartition,
+            TrailingColorVotes1WPartition,
+            TrailingColorVotes1MPartition,
+            DailyVotesPartition,
+            Votes1WPartition,
+            Votes1MPartition,
+            ColorVoteReportPartition
+        )
+
+        expected_partitions = {
+            IngestedColorPartition,
+            TrailingColorVotes1WPartition,
+            TrailingColorVotes1MPartition,
+            DailyVotesPartition,
+            Votes1WPartition,
+            Votes1MPartition,
+            ColorVoteReportPartition
+        }
+
+        registered_partitions = set(self.graph.lookup.keys())
+        self.assertEqual(registered_partitions, expected_partitions,
+                         "All partition types should be registered in the graph")
+
+    def test_partition_lookup_functionality(self):
+        """Test that partition lookup works correctly for all partition types."""
+        from databuild.test.app.dsl.partitions import (
+            IngestedColorPartition,
+            TrailingColorVotes1WPartition,
+            TrailingColorVotes1MPartition,
+            DailyVotesPartition,
+            Votes1WPartition,
+            Votes1MPartition,
+            ColorVoteReportPartition
+        )
+
+        # Test each partition type can be looked up and has a valid job
+        test_cases = [
+            (IngestedColorPartition, IngestedColorPartition(data_date="2024-01-15", color="red")),
+            (TrailingColorVotes1WPartition, TrailingColorVotes1WPartition(data_date="2024-01-15", color="red")),
+            (TrailingColorVotes1MPartition, TrailingColorVotes1MPartition(data_date="2024-01-15", color="red")),
+            (DailyVotesPartition, DailyVotesPartition(data_date="2024-01-15")),
+            (Votes1WPartition, Votes1WPartition(data_date="2024-01-15")),
+            (Votes1MPartition, Votes1MPartition(data_date="2024-01-15")),
+            (ColorVoteReportPartition, ColorVoteReportPartition(data_date="2024-01-15", color="red"))
+        ]
+
+        for partition_type, partition_instance in test_cases:
+            with self.subTest(partition_type=partition_type.__name__):
+                job_class = self.graph.lookup.get(partition_type)
+                self.assertIsNotNone(job_class, f"Job class for {partition_type.__name__} should be registered")
+
+                # Verify we can instantiate the job and call config
+                job_instance = job_class()
+                configs = job_instance.config([partition_instance])
+                self.assertIsInstance(configs, list, f"Config method for {partition_type.__name__} should return a list")
+
+
+if __name__ == '__main__':
+    unittest.main()
\ No newline at end of file
diff --git a/databuild/test/app/dsl/test/test_ingest_color_votes.py b/databuild/test/app/dsl/test/test_ingest_color_votes.py
new file mode 100644
index 0000000..4c3d16e
--- /dev/null
+++ b/databuild/test/app/dsl/test/test_ingest_color_votes.py
@@ -0,0 +1,56 @@
+from databuild.test.app.dsl.graph import IngestColorVotes
+from databuild.test.app.dsl.partitions import IngestedColorPartition
+from databuild.proto import PartitionRef
+
+
+def test_ingest_color_votes_configure_single():
+    """Test IngestColorVotes config method with single output."""
+    job = IngestColorVotes()
+    outputs = [IngestedColorPartition(data_date="2025-01-01", color="red")]
+
+    configs = job.config(outputs)
+
+    assert len(configs) == 1
+    config = configs[0]
+    assert len(config.outputs) == 1
+    assert config.outputs[0].str == "daily_color_votes/2025-01-01/red"
+    assert config.env["COLOR"] == "red"
+    assert config.env["DATA_DATE"] == "2025-01-01"
+    assert len(config.inputs) == 0
+    assert len(config.args) == 0
+
+
+def test_ingest_color_votes_configure_multiple():
+    """Test IngestColorVotes config method with multiple outputs."""
+    job = IngestColorVotes()
+    outputs = [
+        IngestedColorPartition(data_date="2025-01-02", color="red"),
+        IngestedColorPartition(data_date="2025-01-02", color="blue"),
+    ]
+
+    configs = job.config(outputs)
+
+    assert len(configs) == 2
+
+    # First config
+    config1 = configs[0]
+    assert len(config1.outputs) == 1
+    assert config1.outputs[0].str == "daily_color_votes/2025-01-02/red"
+    assert config1.env["COLOR"] == "red"
+    assert config1.env["DATA_DATE"] == "2025-01-02"
+    assert len(config1.inputs) == 0
+    assert len(config1.args) == 0
+
+    # Second config
+    config2 = configs[1]
+    assert len(config2.outputs) == 1
+    assert config2.outputs[0].str == "daily_color_votes/2025-01-02/blue"
+    assert config2.env["COLOR"] == "blue"
+    assert config2.env["DATA_DATE"] == "2025-01-02"
+    assert len(config2.inputs) == 0
+    assert len(config2.args) == 0
+
+
+if __name__ == '__main__':
+    import pytest
+    raise SystemExit(pytest.main([__file__]))
\ No newline at end of file
diff --git a/databuild/test/app/dsl/test/test_trailing_color_votes.py b/databuild/test/app/dsl/test/test_trailing_color_votes.py
new file mode 100644
index 0000000..9b025e7
--- /dev/null
+++ b/databuild/test/app/dsl/test/test_trailing_color_votes.py
@@ -0,0 +1,135 @@
+from databuild.test.app.dsl.graph import TrailingColorVotes
+from databuild.test.app.dsl.partitions import (
+    TrailingColorVotes1WPartition,
+    TrailingColorVotes1MPartition,
+    IngestedColorPartition
+)
+from databuild.proto import DepType
+
+
+def test_trailing_color_votes_configure_weekly_only():
+    """Test TrailingColorVotes config method with weekly output only."""
+    job = TrailingColorVotes()
+    outputs = [TrailingColorVotes1WPartition(data_date="2025-01-07", color="red")]
+
+    configs = job.config(outputs)
+
+    assert len(configs) == 1
+    config = configs[0]
+    assert len(config.outputs) == 1
+    assert config.outputs[0].str == "color_votes_1w/2025-01-07/red"
+    assert config.env["COLOR"] == "red"
+    assert config.env["DATA_DATE"] == "2025-01-07"
+    assert config.env["WEEKLY"] == "true"
+    assert config.env["MONTHLY"] == "false"
+
+    # Should have 7 days of inputs
+    assert len(config.inputs) == 7
+    expected_dates = ["2025-01-07", "2025-01-06", "2025-01-05", "2025-01-04",
+                      "2025-01-03", "2025-01-02", "2025-01-01"]
+    for i, input_dep in enumerate(config.inputs):
+        assert input_dep.dep_type_code == DepType.MATERIALIZE
+        assert input_dep.dep_type_name == "materialize"
+        assert input_dep.partition_ref.str == f"daily_color_votes/{expected_dates[i]}/red"
+
+
+def test_trailing_color_votes_configure_monthly_only():
+    """Test TrailingColorVotes config method with monthly output only."""
+    job = TrailingColorVotes()
+    outputs = [TrailingColorVotes1MPartition(data_date="2025-01-28", color="blue")]
+
+    configs = job.config(outputs)
+
+    assert len(configs) == 1
+    config = configs[0]
+    assert len(config.outputs) == 1
+    assert config.outputs[0].str == "color_votes_1m/2025-01-28/blue"
+    assert config.env["COLOR"] == "blue"
+    assert config.env["DATA_DATE"] == "2025-01-28"
+    assert config.env["WEEKLY"] == "false"
+    assert config.env["MONTHLY"] == "true"
+
+    # Should have 28 days of inputs
+    assert len(config.inputs) == 28
+    # Check first and last input dates
+    assert config.inputs[0].partition_ref.str == "daily_color_votes/2025-01-28/blue"
+    assert config.inputs[27].partition_ref.str == "daily_color_votes/2025-01-01/blue"
+
+
+def test_trailing_color_votes_configure_both_weekly_and_monthly():
+    """Test TrailingColorVotes config method with both weekly and monthly outputs for same date/color."""
+    job = TrailingColorVotes()
+    outputs = [
+        TrailingColorVotes1WPartition(data_date="2025-01-28", color="green"),
+        TrailingColorVotes1MPartition(data_date="2025-01-28", color="green")
+    ]
+
+    configs = job.config(outputs)
+
+    assert len(configs) == 1  # Should group by (data_date, color)
+    config = configs[0]
+    assert len(config.outputs) == 2
+
+    # Check outputs
+    output_strs = {output.str for output in config.outputs}
+    assert "color_votes_1w/2025-01-28/green" in output_strs
+    assert "color_votes_1m/2025-01-28/green" in output_strs
+
+    assert config.env["COLOR"] == "green"
+    assert config.env["DATA_DATE"] == "2025-01-28"
+    assert config.env["WEEKLY"] == "true"
+    assert config.env["MONTHLY"] == "true"
+
+    # Should have 28 days of inputs (max window)
+    assert len(config.inputs) == 28
+
+
+def test_trailing_color_votes_configure_multiple_groups():
+    """Test TrailingColorVotes config method with outputs that require separate configs."""
+    job = TrailingColorVotes()
+    outputs = [
+        TrailingColorVotes1WPartition(data_date="2025-01-07", color="red"),
+        TrailingColorVotes1WPartition(data_date="2025-01-07", color="blue"),
+        TrailingColorVotes1MPartition(data_date="2025-01-08", color="red")
+    ]
+
+    configs = job.config(outputs)
+
+    assert len(configs) == 3  # Three different (data_date, color) combinations
+
+    # Find configs by their characteristics
+    red_7th_config = None
+    blue_7th_config = None
+    red_8th_config = None
+
+    for config in configs:
+        if config.env["DATA_DATE"] == "2025-01-07" and config.env["COLOR"] == "red":
+            red_7th_config = config
+        elif config.env["DATA_DATE"] == "2025-01-07" and config.env["COLOR"] == "blue":
+            blue_7th_config = config
+        elif config.env["DATA_DATE"] == "2025-01-08" and config.env["COLOR"] == "red":
+            red_8th_config = config
+
+    assert red_7th_config is not None
+    assert blue_7th_config is not None
+    assert red_8th_config is not None
+
+    # Check red 7th (weekly only)
+    assert red_7th_config.env["WEEKLY"] == "true"
+    assert red_7th_config.env["MONTHLY"] == "false"
+    assert len(red_7th_config.inputs) == 7
+
+    # Check blue 7th (weekly only)
+    assert blue_7th_config.env["WEEKLY"] == "true"
+    assert blue_7th_config.env["MONTHLY"] == "false"
+    assert len(blue_7th_config.inputs) == 7
+
+    # Check red 8th (monthly only)
+    assert red_8th_config.env["WEEKLY"] == "false"
+    assert red_8th_config.env["MONTHLY"] == "true"
+    assert len(red_8th_config.inputs) == 28
+
+
+if __name__ == '__main__':
+    import pytest
+    raise SystemExit(pytest.main([__file__]))
\ No newline at end of file