diff --git a/databuild/dsl/python/BUILD.bazel b/databuild/dsl/python/BUILD.bazel index 6d44396..21c63f6 100644 --- a/databuild/dsl/python/BUILD.bazel +++ b/databuild/dsl/python/BUILD.bazel @@ -3,5 +3,6 @@ py_library( srcs = ["dsl.py"], visibility = ["//visibility:public"], deps = [ + "//databuild:py_proto", ], ) diff --git a/databuild/dsl/python/dsl.py b/databuild/dsl/python/dsl.py index c564dd9..53b1af3 100644 --- a/databuild/dsl/python/dsl.py +++ b/databuild/dsl/python/dsl.py @@ -1,6 +1,7 @@ +from databuild.proto import JobConfig, PartitionRef, DataDep, DepType from typing import Self, Protocol, get_type_hints, get_origin, get_args -from dataclasses import fields, is_dataclass +from dataclasses import fields, is_dataclass, dataclass, field import re @@ -58,21 +59,13 @@ class PartitionPattern: return result -class JobConfig: - """TODO need to generate this from databuild.proto""" - - -class PartitionManifest: - """TODO need to generate this from databuild.proto""" - - class DataBuildJob(Protocol): # The types of partitions that this job produces output_types: list[type[PartitionPattern]] def config(self, outputs: list[PartitionPattern]) -> list[JobConfig]: ... - def exec(self, config: JobConfig) -> PartitionManifest: ... + def exec(self, config: JobConfig) -> None: ... 
class DataBuildGraph:
@@ -89,3 +82,72 @@ class DataBuildGraph:
     def generate_bazel_module(self):
         """Generates a complete databuild application, packaging up referenced jobs and this graph via bazel targets"""
         raise NotImplementedError
+
+
+@dataclass
+class JobConfigBuilder:
+    """Fluent builder for `JobConfig` messages.
+
+    Collects output refs, input deps, args and env vars; every mutator returns
+    the builder itself so calls can be chained, and `build()` emits the config.
+    """
+
+    outputs: list[PartitionRef] = field(default_factory=list)
+    inputs: list[DataDep] = field(default_factory=list)
+    args: list[str] = field(default_factory=list)
+    env: dict[str, str] = field(default_factory=dict)
+
+    def build(self) -> JobConfig:
+        """Return a `JobConfig` built from the values collected so far."""
+        return JobConfig(
+            outputs=self.outputs,
+            inputs=self.inputs,
+            args=self.args,
+            env=self.env,
+        )
+
+    def add_inputs(self, *partitions: PartitionPattern, dep_type: DepType = DepType.MATERIALIZE) -> Self:
+        """Append one `DataDep` of type `dep_type` per partition; returns self."""
+        # Compare against the same member used for the default argument; the
+        # mixed-case `DepType.Materialize` spelling does not match it.
+        dep_type_name = "materialize" if dep_type == DepType.MATERIALIZE else "query"
+        for p in partitions:
+            self.inputs.append(
+                DataDep(
+                    dep_type_code=dep_type,
+                    dep_type_name=dep_type_name,
+                    partition_ref=PartitionRef(str=p.serialize()),
+                )
+            )
+        return self
+
+    def add_outputs(self, *partitions: PartitionPattern) -> Self:
+        """Append a `PartitionRef` for each partition's serialized form; returns self."""
+        for p in partitions:
+            self.outputs.append(PartitionRef(str=p.serialize()))
+        return self
+
+    def add_args(self, *args: str) -> Self:
+        """Append positional args to the existing arg list; returns self."""
+        self.args.extend(args)
+        return self
+
+    def set_args(self, args: list[str]) -> Self:
+        """Replace the arg list with a copy of `args`; returns self."""
+        # Copy so later add_args() calls don't mutate the caller's list.
+        self.args = list(args)
+        return self
+
+    def set_env(self, env: dict[str, str]) -> Self:
+        """Replace the env mapping with a copy of `env`; returns self."""
+        # Copy so later add_env() calls don't mutate the caller's dict.
+        self.env = dict(env)
+        return self
+
+    def add_env(self, **kwargs) -> Self:
+        """Merge keyword arguments into env, overwriting existing keys; returns self."""
+        for k, v in kwargs.items():
+            assert isinstance(k, str), f"Expected a string key, got `{k}`"
+            assert isinstance(v, str), f"Expected a string value, got `{v}`"
+            self.env[k] = v
+        return self
diff --git a/databuild/dsl/python/test/dsl_test.py b/databuild/dsl/python/test/dsl_test.py
index ba766eb..90e9f95 100644
--- a/databuild/dsl/python/test/dsl_test.py
+++ b/databuild/dsl/python/test/dsl_test.py
@@ -1,5 +1,6 @@
-from databuild.dsl.python.dsl import PartitionPattern, DataBuildGraph, DataBuildJob, JobConfig, PartitionManifest
+from databuild.dsl.python.dsl import
PartitionPattern, DataBuildGraph, DataBuildJob +from databuild.proto import JobConfig, PartitionManifest from dataclasses import dataclass import pytest @@ -45,7 +46,7 @@ def test_basic_graph_definition(): @graph.job class TestJob(DataBuildJob): output_types = [CategoryAnalysisPartition] - def exec(self, config: JobConfig) -> PartitionManifest: ... + def exec(self, config: JobConfig) -> None: ... def config(self, outputs: list[PartitionPattern]) -> list[JobConfig]: ... assert len(graph.lookup) == 1 @@ -58,14 +59,15 @@ def test_graph_collision(): @graph.job class TestJob1(DataBuildJob): output_types = [CategoryAnalysisPartition] - def exec(self, config: JobConfig) -> PartitionManifest: ... + def exec(self, config: JobConfig) -> None: ... def config(self, outputs: list[PartitionPattern]) -> list[JobConfig]: ... with pytest.raises(AssertionError): + # Outputs the same partition, so should raise @graph.job class TestJob2(DataBuildJob): output_types = [CategoryAnalysisPartition] - def exec(self, config: JobConfig) -> PartitionManifest: ... + def exec(self, config: JobConfig) -> None: ... def config(self, outputs: list[PartitionPattern]) -> list[JobConfig]: ... 
diff --git a/databuild/test/app/BUILD.bazel b/databuild/test/app/BUILD.bazel index 19742e2..b3606c7 100644 --- a/databuild/test/app/BUILD.bazel +++ b/databuild/test/app/BUILD.bazel @@ -1,123 +1,9 @@ -load("//databuild:rules.bzl", "databuild_graph", "databuild_job") - py_library( name = "job_src", srcs = glob(["**/*.py"]), visibility = ["//visibility:public"], - deps = ["//databuild:py_proto"], -) - -# Tests -py_test( - name = "test_trailing_color_votes", - srcs = ["jobs/trailing_color_votes/test.py"], - main = "jobs/trailing_color_votes/test.py", - deps = [":job_src"], -) - -py_test( - name = "test_ingest_color_votes", - srcs = ["jobs/ingest_color_votes/test.py"], - main = "jobs/ingest_color_votes/test.py", - deps = [":job_src"], -) - -py_test( - name = "test_aggregate_color_votes", - srcs = ["jobs/aggregate_color_votes/test.py"], - main = "jobs/aggregate_color_votes/test.py", - deps = [":job_src"], -) - -py_test( - name = "test_color_vote_report_calc", - srcs = ["jobs/color_vote_report_calc/test.py"], - main = "jobs/color_vote_report_calc/test.py", - deps = [":job_src"], -) - -py_test( - name = "test_graph_analysis", - srcs = ["graph/graph_test.py"], - main = "graph/graph_test.py", - data = [ - ":bazel_graph.analyze", - ":bazel_graph_lookup", + deps = [ + "//databuild:py_proto", + "//databuild/dsl/python:dsl", ], - deps = [":job_src"], ) - -# Bazel-defined -## Graph -databuild_graph( - name = "bazel_graph", - jobs = [ - ":ingest_color_votes", - ":trailing_color_votes", - ":aggregate_color_votes", - ":color_vote_report_calc", - ], - lookup = ":bazel_graph_lookup", -) - -py_binary( - name = "bazel_graph_lookup", - srcs = ["graph/lookup.py"], - main = "graph/lookup.py", -) - -## Ingest Color Votes -databuild_job( - name = "ingest_color_votes", - binary = ":ingest_color_votes_binary", -) - -py_binary( - name = "ingest_color_votes_binary", - srcs = ["jobs/ingest_color_votes/main.py"], - main = "jobs/ingest_color_votes/main.py", - deps = [":job_src"], -) - -## Trailing 
Color Votes -databuild_job( - name = "trailing_color_votes", - binary = ":trailing_color_votes_binary", -) - -py_binary( - name = "trailing_color_votes_binary", - srcs = ["jobs/trailing_color_votes/main.py"], - main = "jobs/trailing_color_votes/main.py", - deps = [":job_src"], -) - -## Aggregate Color Votes -databuild_job( - name = "aggregate_color_votes", - binary = ":aggregate_color_votes_binary", -) - -py_binary( - name = "aggregate_color_votes_binary", - srcs = ["jobs/aggregate_color_votes/main.py"], - main = "jobs/aggregate_color_votes/main.py", - deps = [":job_src"], -) - -## Color Vote Report Calc -databuild_job( - name = "color_vote_report_calc", - binary = ":color_vote_report_calc_binary", -) - -py_binary( - name = "color_vote_report_calc_binary", - srcs = ["jobs/color_vote_report_calc/main.py"], - main = "jobs/color_vote_report_calc/main.py", - deps = [":job_src"], -) - -# Python-DSL-defined - -# TODO diff --git a/databuild/test/app/bazel/BUILD.bazel b/databuild/test/app/bazel/BUILD.bazel new file mode 100644 index 0000000..6a75e5c --- /dev/null +++ b/databuild/test/app/bazel/BUILD.bazel @@ -0,0 +1,149 @@ +load("//databuild:rules.bzl", "databuild_graph", "databuild_job") + +py_library( + name = "job_src", + srcs = glob(["**/*.py"]), + visibility = ["//visibility:public"], + deps = [ + "//databuild:py_proto", + "//databuild/dsl/python:dsl", + ], +) + +# Tests +py_test( + name = "test_trailing_color_votes", + srcs = ["jobs/trailing_color_votes/test.py"], + main = "jobs/trailing_color_votes/test.py", + deps = [ + ":job_src", + "//databuild/test/app:job_src", + ], +) + +py_test( + name = "test_ingest_color_votes", + srcs = ["jobs/ingest_color_votes/test.py"], + main = "jobs/ingest_color_votes/test.py", + deps = [ + ":job_src", + "//databuild/test/app:job_src", + ], +) + +py_test( + name = "test_aggregate_color_votes", + srcs = ["jobs/aggregate_color_votes/test.py"], + main = "jobs/aggregate_color_votes/test.py", + deps = [ + ":job_src", + 
"//databuild/test/app:job_src", + ], +) + +py_test( + name = "test_color_vote_report_calc", + srcs = ["jobs/color_vote_report_calc/test.py"], + main = "jobs/color_vote_report_calc/test.py", + deps = [ + ":job_src", + "//databuild/test/app:job_src", + ], +) + +py_test( + name = "test_graph_analysis", + srcs = ["graph/graph_test.py"], + data = [ + ":bazel_graph.analyze", + ":bazel_graph_lookup", + ], + main = "graph/graph_test.py", + deps = [ + ":job_src", + "//databuild/test/app:job_src", + ], +) + +# Bazel-defined +## Graph +databuild_graph( + name = "bazel_graph", + jobs = [ + ":ingest_color_votes", + ":trailing_color_votes", + ":aggregate_color_votes", + ":color_vote_report_calc", + ], + lookup = ":bazel_graph_lookup", +) + +py_binary( + name = "bazel_graph_lookup", + srcs = ["graph/lookup.py"], + main = "graph/lookup.py", +) + +## Ingest Color Votes +databuild_job( + name = "ingest_color_votes", + binary = ":ingest_color_votes_binary", +) + +py_binary( + name = "ingest_color_votes_binary", + srcs = ["jobs/ingest_color_votes/main.py"], + main = "jobs/ingest_color_votes/main.py", + deps = [ + ":job_src", + "//databuild/test/app:job_src", + ], +) + +## Trailing Color Votes +databuild_job( + name = "trailing_color_votes", + binary = ":trailing_color_votes_binary", +) + +py_binary( + name = "trailing_color_votes_binary", + srcs = ["jobs/trailing_color_votes/main.py"], + main = "jobs/trailing_color_votes/main.py", + deps = [ + ":job_src", + "//databuild/test/app:job_src", + ], +) + +## Aggregate Color Votes +databuild_job( + name = "aggregate_color_votes", + binary = ":aggregate_color_votes_binary", +) + +py_binary( + name = "aggregate_color_votes_binary", + srcs = ["jobs/aggregate_color_votes/main.py"], + main = "jobs/aggregate_color_votes/main.py", + deps = [ + ":job_src", + "//databuild/test/app:job_src", + ], +) + +## Color Vote Report Calc +databuild_job( + name = "color_vote_report_calc", + binary = ":color_vote_report_calc_binary", +) + +py_binary( + name = 
"color_vote_report_calc_binary", + srcs = ["jobs/color_vote_report_calc/main.py"], + main = "jobs/color_vote_report_calc/main.py", + deps = [ + ":job_src", + "//databuild/test/app:job_src", + ], +) diff --git a/databuild/test/app/bazel/README.md b/databuild/test/app/bazel/README.md new file mode 100644 index 0000000..90e2433 --- /dev/null +++ b/databuild/test/app/bazel/README.md @@ -0,0 +1,4 @@ + +# Bazel-Based Graph Definition + +The bazel-based graph definition relies on declaring `databuild_job` and `databuild_graph` targets which reference binaries. diff --git a/databuild/test/app/graph/graph_test.py b/databuild/test/app/bazel/graph/graph_test.py similarity index 100% rename from databuild/test/app/graph/graph_test.py rename to databuild/test/app/bazel/graph/graph_test.py diff --git a/databuild/test/app/graph/lookup.py b/databuild/test/app/bazel/graph/lookup.py similarity index 100% rename from databuild/test/app/graph/lookup.py rename to databuild/test/app/bazel/graph/lookup.py diff --git a/databuild/test/app/bazel/graph/test.py b/databuild/test/app/bazel/graph/test.py new file mode 100644 index 0000000..e69de29 diff --git a/databuild/test/app/bazel/jobs/aggregate_color_votes/README.md b/databuild/test/app/bazel/jobs/aggregate_color_votes/README.md new file mode 120000 index 0000000..c5d6fcd --- /dev/null +++ b/databuild/test/app/bazel/jobs/aggregate_color_votes/README.md @@ -0,0 +1 @@ +jobs/aggregate_color_votes/README.md \ No newline at end of file diff --git a/databuild/test/app/jobs/aggregate_color_votes/config.py b/databuild/test/app/bazel/jobs/aggregate_color_votes/config.py similarity index 100% rename from databuild/test/app/jobs/aggregate_color_votes/config.py rename to databuild/test/app/bazel/jobs/aggregate_color_votes/config.py diff --git a/databuild/test/app/jobs/aggregate_color_votes/main.py b/databuild/test/app/bazel/jobs/aggregate_color_votes/main.py similarity index 88% rename from databuild/test/app/jobs/aggregate_color_votes/main.py rename 
to databuild/test/app/bazel/jobs/aggregate_color_votes/main.py index 8f72908..1053e80 100644 --- a/databuild/test/app/jobs/aggregate_color_votes/main.py +++ b/databuild/test/app/bazel/jobs/aggregate_color_votes/main.py @@ -4,10 +4,9 @@ import sys import os import json from databuild.proto import PartitionRef, to_dict -from databuild.test.app.jobs.aggregate_color_votes.config import configure +from databuild.test.app.bazel.jobs.aggregate_color_votes.config import configure from databuild.test.app.jobs.aggregate_color_votes.execute import execute - if __name__ == "__main__": if sys.argv[1] == "config": response = configure([ diff --git a/databuild/test/app/jobs/aggregate_color_votes/test.py b/databuild/test/app/bazel/jobs/aggregate_color_votes/test.py similarity index 96% rename from databuild/test/app/jobs/aggregate_color_votes/test.py rename to databuild/test/app/bazel/jobs/aggregate_color_votes/test.py index ad67e45..96b332b 100644 --- a/databuild/test/app/jobs/aggregate_color_votes/test.py +++ b/databuild/test/app/bazel/jobs/aggregate_color_votes/test.py @@ -1,6 +1,6 @@ import unittest from databuild.proto import PartitionRef -from databuild.test.app.jobs.aggregate_color_votes.config import configure +from databuild.test.app.bazel.jobs.aggregate_color_votes.config import configure from databuild.test.app.colors import COLORS class TestAggregateColorVotesConfig(unittest.TestCase): diff --git a/databuild/test/app/bazel/jobs/color_vote_report_calc/README.md b/databuild/test/app/bazel/jobs/color_vote_report_calc/README.md new file mode 120000 index 0000000..7128a82 --- /dev/null +++ b/databuild/test/app/bazel/jobs/color_vote_report_calc/README.md @@ -0,0 +1 @@ +jobs/color_vote_report_calc/README.md \ No newline at end of file diff --git a/databuild/test/app/jobs/color_vote_report_calc/config.py b/databuild/test/app/bazel/jobs/color_vote_report_calc/config.py similarity index 100% rename from databuild/test/app/jobs/color_vote_report_calc/config.py rename to 
databuild/test/app/bazel/jobs/color_vote_report_calc/config.py diff --git a/databuild/test/app/jobs/color_vote_report_calc/main.py b/databuild/test/app/bazel/jobs/color_vote_report_calc/main.py similarity index 83% rename from databuild/test/app/jobs/color_vote_report_calc/main.py rename to databuild/test/app/bazel/jobs/color_vote_report_calc/main.py index 21d7880..60aaa43 100644 --- a/databuild/test/app/jobs/color_vote_report_calc/main.py +++ b/databuild/test/app/bazel/jobs/color_vote_report_calc/main.py @@ -4,9 +4,8 @@ import sys import os import json from databuild.proto import PartitionRef, to_dict -from databuild.test.app.jobs.color_vote_report_calc.config import configure +from databuild.test.app.bazel.jobs.color_vote_report_calc.config import configure from databuild.test.app.jobs.color_vote_report_calc.execute import execute -from betterproto2 import Casing, OutputFormat if __name__ == "__main__": if sys.argv[1] == "config": diff --git a/databuild/test/app/jobs/color_vote_report_calc/test.py b/databuild/test/app/bazel/jobs/color_vote_report_calc/test.py similarity index 96% rename from databuild/test/app/jobs/color_vote_report_calc/test.py rename to databuild/test/app/bazel/jobs/color_vote_report_calc/test.py index d902364..978ed77 100644 --- a/databuild/test/app/jobs/color_vote_report_calc/test.py +++ b/databuild/test/app/bazel/jobs/color_vote_report_calc/test.py @@ -1,6 +1,6 @@ import unittest from databuild.proto import PartitionRef -from databuild.test.app.jobs.color_vote_report_calc.config import configure +from databuild.test.app.bazel.jobs.color_vote_report_calc.config import configure class TestColorVoteReportCalcConfig(unittest.TestCase): def test_configure_single_output(self): diff --git a/databuild/test/app/bazel/jobs/ingest_color_votes/README.md b/databuild/test/app/bazel/jobs/ingest_color_votes/README.md new file mode 120000 index 0000000..58d5aca --- /dev/null +++ b/databuild/test/app/bazel/jobs/ingest_color_votes/README.md @@ -0,0 +1 @@ 
+jobs/ingest_color_votes/README.md \ No newline at end of file diff --git a/databuild/test/app/jobs/ingest_color_votes/config.py b/databuild/test/app/bazel/jobs/ingest_color_votes/config.py similarity index 100% rename from databuild/test/app/jobs/ingest_color_votes/config.py rename to databuild/test/app/bazel/jobs/ingest_color_votes/config.py diff --git a/databuild/test/app/jobs/ingest_color_votes/main.py b/databuild/test/app/bazel/jobs/ingest_color_votes/main.py similarity index 85% rename from databuild/test/app/jobs/ingest_color_votes/main.py rename to databuild/test/app/bazel/jobs/ingest_color_votes/main.py index 141c87b..888dbec 100644 --- a/databuild/test/app/jobs/ingest_color_votes/main.py +++ b/databuild/test/app/bazel/jobs/ingest_color_votes/main.py @@ -4,9 +4,8 @@ import sys import os import json from databuild.proto import PartitionRef, to_dict -from databuild.test.app.jobs.ingest_color_votes.config import configure +from databuild.test.app.bazel.jobs.ingest_color_votes.config import configure from databuild.test.app.jobs.ingest_color_votes.execute import execute -from betterproto2 import Casing if __name__ == "__main__": if sys.argv[1] == "config": diff --git a/databuild/test/app/bazel/jobs/ingest_color_votes/test.py b/databuild/test/app/bazel/jobs/ingest_color_votes/test.py new file mode 100644 index 0000000..91c364d --- /dev/null +++ b/databuild/test/app/bazel/jobs/ingest_color_votes/test.py @@ -0,0 +1,32 @@ +from databuild.test.app.bazel.jobs.ingest_color_votes.config import configure +from databuild.proto import PartitionRef + + +def test_ingest_color_votes_configure(): + refs_single = [PartitionRef(str="daily_color_votes/2025-01-01/red")] + config_single = configure(refs_single) + assert len(config_single.configs) == 1 + assert config_single.configs[0].outputs[0].str == "daily_color_votes/2025-01-01/red" + assert config_single.configs[0].env["COLOR"] == "red" + assert config_single.configs[0].env["DATA_DATE"] == "2025-01-01" + + refs_multiple = [ 
+ PartitionRef(str="daily_color_votes/2025-01-02/red"), + PartitionRef(str="daily_color_votes/2025-01-02/blue"), + ] + + config_multiple = configure(refs_multiple) + assert len(config_multiple.configs) == 2 + assert len(config_multiple.configs[0].outputs) == 1 + assert config_multiple.configs[0].outputs[0].str == "daily_color_votes/2025-01-02/red" + assert config_multiple.configs[0].env["COLOR"] == "red" + assert config_multiple.configs[0].env["DATA_DATE"] == "2025-01-02" + assert len(config_multiple.configs[1].outputs) == 1 + assert config_multiple.configs[1].outputs[0].str == "daily_color_votes/2025-01-02/blue" + assert config_multiple.configs[1].env["COLOR"] == "blue" + assert config_multiple.configs[1].env["DATA_DATE"] == "2025-01-02" + + +if __name__ == '__main__': + import pytest + raise SystemExit(pytest.main([__file__])) diff --git a/databuild/test/app/bazel/jobs/trailing_color_votes/README.md b/databuild/test/app/bazel/jobs/trailing_color_votes/README.md new file mode 120000 index 0000000..d3e0cf7 --- /dev/null +++ b/databuild/test/app/bazel/jobs/trailing_color_votes/README.md @@ -0,0 +1 @@ +jobs/trailing_color_votes/README.md \ No newline at end of file diff --git a/databuild/test/app/jobs/trailing_color_votes/config.py b/databuild/test/app/bazel/jobs/trailing_color_votes/config.py similarity index 87% rename from databuild/test/app/jobs/trailing_color_votes/config.py rename to databuild/test/app/bazel/jobs/trailing_color_votes/config.py index 9978cd7..12930c4 100644 --- a/databuild/test/app/jobs/trailing_color_votes/config.py +++ b/databuild/test/app/bazel/jobs/trailing_color_votes/config.py @@ -9,9 +9,7 @@ def configure(outputs: list[PartitionRef]) -> JobConfigureResponse: for output in outputs: parts = output.str.split("/") if len(parts) == 3 and parts[0] in ["color_votes_1w", "color_votes_1m"]: - prefix, data_date, color = parts - key = (data_date, color) - grouped_outputs[key].append(output) + grouped_outputs[tuple(parts[1:])].append(output) else: 
raise ValueError(f"Invalid output partition format: {output.str}") @@ -29,8 +27,7 @@ def configure(outputs: list[PartitionRef]) -> JobConfigureResponse: inputs = [] for i in range(max_window): input_date = output_date - timedelta(days=i) - input_ref = PartitionRef(str=f"daily_color_votes/{input_date.isoformat()}/{color}") - inputs.append(input_ref) + inputs.append(PartitionRef(str=f"daily_color_votes/{input_date.isoformat()}/{color}")) env = { "DATA_DATE": data_date, diff --git a/databuild/test/app/jobs/trailing_color_votes/main.py b/databuild/test/app/bazel/jobs/trailing_color_votes/main.py similarity index 84% rename from databuild/test/app/jobs/trailing_color_votes/main.py rename to databuild/test/app/bazel/jobs/trailing_color_votes/main.py index ccce115..e16051d 100644 --- a/databuild/test/app/jobs/trailing_color_votes/main.py +++ b/databuild/test/app/bazel/jobs/trailing_color_votes/main.py @@ -4,9 +4,8 @@ import sys import os import json from databuild.proto import PartitionRef, to_dict -from databuild.test.app.jobs.trailing_color_votes.config import configure +from databuild.test.app.bazel.jobs.trailing_color_votes.config import configure from databuild.test.app.jobs.trailing_color_votes.execute import execute -from betterproto2 import Casing, OutputFormat if __name__ == "__main__": if sys.argv[1] == "config": diff --git a/databuild/test/app/jobs/trailing_color_votes/test.py b/databuild/test/app/bazel/jobs/trailing_color_votes/test.py similarity index 96% rename from databuild/test/app/jobs/trailing_color_votes/test.py rename to databuild/test/app/bazel/jobs/trailing_color_votes/test.py index 059ae4f..aaebed8 100644 --- a/databuild/test/app/jobs/trailing_color_votes/test.py +++ b/databuild/test/app/bazel/jobs/trailing_color_votes/test.py @@ -1,6 +1,6 @@ import unittest from databuild.proto import PartitionRef -from databuild.test.app.jobs.trailing_color_votes.config import configure +from databuild.test.app.bazel.jobs.trailing_color_votes.config import 
configure
 
 class TestTrailingColorVotesConfig(unittest.TestCase):
     def test_configure_weekly_only(self):
diff --git a/databuild/test/app/dsl/BUILD.bazel b/databuild/test/app/dsl/BUILD.bazel
new file mode 100644
index 0000000..e69de29
diff --git a/databuild/test/app/dsl/graph.py b/databuild/test/app/dsl/graph.py
new file mode 100644
index 0000000..c464cbf
--- /dev/null
+++ b/databuild/test/app/dsl/graph.py
@@ -0,0 +1,155 @@
+"""Python DSL implementation of test app"""
+
+from collections import defaultdict
+from databuild.dsl.python.dsl import DataBuildGraph, DataBuildJob, JobConfigBuilder
+from databuild.proto import JobConfig
+from databuild.test.app.colors import COLORS
+from databuild.test.app.jobs.ingest_color_votes.execute import execute as ingest_color_votes_exec
+from databuild.test.app.jobs.trailing_color_votes.execute import execute as trailing_color_votes_exec
+from databuild.test.app.jobs.aggregate_color_votes.execute import execute as aggregate_color_votes_exec
+from databuild.test.app.jobs.color_vote_report_calc.execute import execute as color_vote_report_calc_exec
+from databuild.test.app.dsl.partitions import (
+    IngestedColorPartition,
+    TrailingColorVotes1MPartition,
+    TrailingColorVotes1WPartition,
+    DailyVotesPartition,
+    Votes1WPartition,
+    Votes1MPartition,
+    ColorVoteReportPartition
+)
+from datetime import date, timedelta
+
+graph = DataBuildGraph("//databuild/test/app:dsl_graph")
+
+
+@graph.job
+class IngestColorVotes(DataBuildJob):
+    """Produces one `IngestedColorPartition` per requested (date, color)."""
+
+    output_types = [IngestedColorPartition]
+
+    def config(self, outputs: list[IngestedColorPartition]) -> list[JobConfig]:
+        """Return one config per requested output, carrying its date and color in env."""
+        configs = []
+        for output in outputs:
+            env = {"DATA_DATE": output.data_date, "COLOR": output.color}
+            configs.append(JobConfigBuilder().add_outputs(output).set_env(env).build())
+        return configs
+
+    def exec(self, config: JobConfig) -> None:
+        """Run the ingest for the date and color recorded in the config's env."""
+        ingest_color_votes_exec(data_date=config.env["DATA_DATE"], color=config.env["COLOR"])
+
+
+@graph.job
+class TrailingColorVotes(DataBuildJob):
+    """Produces the 1-week and 1-month trailing partitions for a (date, color)."""
+
+    output_types = [TrailingColorVotes1MPartition, TrailingColorVotes1WPartition]
+
+    def config(self, outputs: list[TrailingColorVotes1MPartition | TrailingColorVotes1WPartition]) -> list[JobConfig]:
+        """Return one config per (date, color), covering every window requested for it."""
+        groups = defaultdict(list)
+        for output in outputs:
+            groups[(output.data_date, output.color)].append(output)
+
+        configs = []
+        # `group` rather than `outputs`, so the parameter is not shadowed mid-function.
+        for (data_date, color), group in groups.items():
+            weekly = "false"
+            monthly = "false"
+            max_window = 0
+            for output in group:
+                if isinstance(output, TrailingColorVotes1WPartition):
+                    weekly = "true"
+                    max_window = max(max_window, 7)
+                elif isinstance(output, TrailingColorVotes1MPartition):
+                    monthly = "true"
+                    max_window = max(max_window, 28)
+
+            env = {"DATA_DATE": data_date, "COLOR": color, "WEEKLY": weekly, "MONTHLY": monthly}
+            # Go through add_outputs() so the patterns are converted to PartitionRefs;
+            # the builder's `outputs` field holds refs, not PartitionPattern objects.
+            config = JobConfigBuilder(env=env).add_outputs(*group)
+            # One daily input per day of the widest requested window, ending on data_date.
+            end_date = date.fromisoformat(data_date)
+            for i in range(max_window):
+                in_date = (end_date - timedelta(days=i)).isoformat()
+                config.add_inputs(IngestedColorPartition(data_date=in_date, color=color))
+
+            configs.append(config.build())
+        return configs
+
+    def exec(self, config: JobConfig) -> None:
+        """Run the trailing-window job for the date and color recorded in env."""
+        trailing_color_votes_exec(data_date=config.env["DATA_DATE"], color=config.env["COLOR"])
+
+
+@graph.job
+class AggregateColorVotes(DataBuildJob):
+    """Produces the daily / 1w / 1m per-date partitions from every color's partition."""
+
+    output_types = [DailyVotesPartition, Votes1WPartition, Votes1MPartition]
+
+    def config(self, outputs: list[DailyVotesPartition | Votes1WPartition | Votes1MPartition]) -> list[JobConfig]:
+        """Return one config per output, with one input per color in COLORS."""
+        configs = []
+
+        for output in outputs:
+            if isinstance(output, DailyVotesPartition):
+                InPartition = IngestedColorPartition
+                agg_type = "daily_votes"
+            elif isinstance(output, Votes1WPartition):
+                InPartition = TrailingColorVotes1WPartition
+                agg_type = "votes_1w"
+            elif isinstance(output, Votes1MPartition):
+                InPartition = TrailingColorVotes1MPartition
+                agg_type = "votes_1m"
+            else:
+                # Name the class; partitions.py declares no `type` field to read here.
+                raise ValueError(f"Unknown output type: {type(output).__name__}")
+
+            inputs = [InPartition(data_date=output.data_date, color=color) for color in COLORS]
+            env = {"DATA_DATE": output.data_date, "AGGREGATE_TYPE": agg_type}
+            configs.append(JobConfigBuilder().add_outputs(output).add_inputs(*inputs).set_env(env).build())
+
+        return configs
+
+    def exec(self, config: JobConfig) -> None:
+        """Run the aggregation for the date and aggregate type recorded in env."""
+        aggregate_color_votes_exec(data_date=config.env["DATA_DATE"], aggregate_type=config.env["AGGREGATE_TYPE"])
+
+
+@graph.job
+class ColorVoteReportCalc(DataBuildJob):
+    """Produces `ColorVoteReportPartition`s from the aggregate and per-color partitions."""
+
+    output_types = [ColorVoteReportPartition]
+
+    def config(self, outputs: list[ColorVoteReportPartition]) -> list[JobConfig]:
+        """Return a single config covering all requested report partitions."""
+        # Args carry the serialized refs, the same form add_outputs() stores;
+        # `.str` is a PartitionRef field, not something the pattern objects declare.
+        config = JobConfigBuilder().add_outputs(*outputs).add_args(*[p.serialize() for p in outputs])
+
+        # Sorted so the generated input order is stable from run to run.
+        for data_date in sorted({p.data_date for p in outputs}):
+            config.add_inputs(
+                DailyVotesPartition(data_date=data_date),
+                Votes1WPartition(data_date=data_date),
+                Votes1MPartition(data_date=data_date),
+            )
+
+        for output in outputs:
+            config.add_inputs(
+                IngestedColorPartition(data_date=output.data_date, color=output.color),
+                TrailingColorVotes1WPartition(data_date=output.data_date, color=output.color),
+                TrailingColorVotes1MPartition(data_date=output.data_date, color=output.color),
+            )
+
+        return [config.build()]
+
+    def exec(self, config: JobConfig) -> None:
+        """Run the report calculation with the args recorded in the config."""
+        color_vote_report_calc_exec(config.args)
+
diff --git a/databuild/test/app/dsl/partitions.py b/databuild/test/app/dsl/partitions.py
new file mode 100644
index 0000000..408c246
--- /dev/null
+++ b/databuild/test/app/dsl/partitions.py
@@ -0,0 +1,46 @@
+from dataclasses import dataclass
+from databuild.dsl.python.dsl import PartitionPattern
+
+
+@dataclass
+class DatePartitioned:
+    """Fields for partitions keyed by date only."""
+
+    data_date: str
+
+
+@dataclass
+class DateColorPartitioned:
+    """Fields for partitions keyed by date and color."""
+
+    data_date: str
+    color: str
+
+
+# Named groups use the dataclass field names; a bare `(?P` with no <name> is not valid regex.
+class IngestedColorPartition(DateColorPartitioned, PartitionPattern):
+    _raw_pattern = r"daily_color_votes/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)"
+
+
+class TrailingColorVotes1WPartition(DateColorPartitioned, PartitionPattern):
+    _raw_pattern = r"color_votes_1w/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)"
+
+
+class TrailingColorVotes1MPartition(DateColorPartitioned, PartitionPattern):
+    _raw_pattern = r"color_votes_1m/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)"
+
+
+class DailyVotesPartition(DatePartitioned, PartitionPattern):
+    _raw_pattern = r"daily_votes/(?P<data_date>\d{4}-\d{2}-\d{2})"
+
+
+class Votes1WPartition(DatePartitioned, PartitionPattern):
+    _raw_pattern = r"votes_1w/(?P<data_date>\d{4}-\d{2}-\d{2})"
+
+
+class Votes1MPartition(DatePartitioned, PartitionPattern):
+    _raw_pattern = r"votes_1m/(?P<data_date>\d{4}-\d{2}-\d{2})"
+
+
+class ColorVoteReportPartition(DateColorPartitioned, PartitionPattern):
+    _raw_pattern = r"color_vote_report/(?P<data_date>\d{4}-\d{2}-\d{2})/(?P<color>[^/]+)"
diff --git a/databuild/test/app/jobs/ingest_color_votes/test.py b/databuild/test/app/jobs/ingest_color_votes/test.py
index fc3433f..8c7cc54 100644
--- a/databuild/test/app/jobs/ingest_color_votes/test.py
+++ b/databuild/test/app/jobs/ingest_color_votes/test.py
@@ -1,34 +1,8 @@
-from databuild.test.app.jobs.ingest_color_votes.config import configure
 from databuild.test.app.jobs.ingest_color_votes.execute import execute
 from databuild.test.app import dal
 from databuild.proto import PartitionRef
 
 
-def test_ingest_color_votes_configure():
-    refs_single = [PartitionRef(str="daily_color_votes/2025-01-01/red")]
-    config_single = configure(refs_single)
-    assert len(config_single.configs) == 1
-    assert config_single.configs[0].outputs[0].str == "daily_color_votes/2025-01-01/red"
-    assert config_single.configs[0].env["COLOR"] == "red"
-    assert config_single.configs[0].env["DATA_DATE"] == "2025-01-01"
-
-    refs_multiple = [
-        PartitionRef(str="daily_color_votes/2025-01-02/red"),
-        PartitionRef(str="daily_color_votes/2025-01-02/blue"),
-    ]
-
-    config_multiple = configure(refs_multiple)
-    assert len(config_multiple.configs) == 2
-    assert len(config_multiple.configs[0].outputs) == 1
-    assert config_multiple.configs[0].outputs[0].str == "daily_color_votes/2025-01-02/red"
-    assert config_multiple.configs[0].env["COLOR"] == "red"
-    assert
config_multiple.configs[0].env["DATA_DATE"] == "2025-01-02" - assert len(config_multiple.configs[1].outputs) == 1 - assert config_multiple.configs[1].outputs[0].str == "daily_color_votes/2025-01-02/blue" - assert config_multiple.configs[1].env["COLOR"] == "blue" - assert config_multiple.configs[1].env["DATA_DATE"] == "2025-01-02" - - def test_ingest_color_votes(): execute("2025-01-01", "red") results = dal.read(PartitionRef(str="daily_color_votes/2025-01-01/red")) diff --git a/plans/todo.md b/plans/todo.md index 29befb1..34f87a3 100644 --- a/plans/todo.md +++ b/plans/todo.md @@ -1,10 +1,8 @@ +- Implement python dsl +- Achieve fast configuration (betterproto2 imports are sus) - Remove manual reference of enum values, e.g. [here](../databuild/repositories/builds/mod.rs:85) -- Type-safe mithril [claude link](https://claude.ai/share/f33f8605-472a-4db4-9211-5a1e52087316) -- Status indicator for page selection - On build request detail page, show aggregated job results -- Use path based navigation instead of hashbang? -- Add build request notes - How do we encode job labels in the path? (Build event job links are not encoding job labels properly) - Resolve double type system with protobuf and openapi - Plan for external worker dispatch (e.g. k8s pod per build, or launch in container service) @@ -12,3 +10,6 @@ - Should we have meaningful exit codes? E.g. "retry-able error", etc? - Fully joinable build/job IDs - ensure all execution logs / metrics are joinable to build request ID? - Triggers? +- Add build request notes +- Status indicator for page selection +- Use path based navigation instead of hashbang?