From 63f951848678818541245913ace51dbc30782038 Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Wed, 30 Jul 2025 22:26:32 -0700 Subject: [PATCH] Add graph and trailling job --- databuild/test/app/BUILD.bazel | 62 +++++++++++++++++-- .../app/jobs/trailing_color_votes/config.py | 49 +++++++++++++++ .../app/jobs/trailing_color_votes/execute.py | 28 +++++++++ .../app/jobs/trailing_color_votes/main.py | 18 ++++++ .../app/jobs/trailing_color_votes/test.py | 53 ++++++++++++++++ databuild/test/app/lookup.py | 23 +++++++ 6 files changed, 228 insertions(+), 5 deletions(-) create mode 100644 databuild/test/app/jobs/trailing_color_votes/config.py create mode 100644 databuild/test/app/jobs/trailing_color_votes/execute.py create mode 100644 databuild/test/app/jobs/trailing_color_votes/main.py create mode 100644 databuild/test/app/jobs/trailing_color_votes/test.py create mode 100644 databuild/test/app/lookup.py diff --git a/databuild/test/app/BUILD.bazel b/databuild/test/app/BUILD.bazel index 7ad5aa5..ed10c58 100644 --- a/databuild/test/app/BUILD.bazel +++ b/databuild/test/app/BUILD.bazel @@ -9,16 +9,68 @@ py_library( # Tests py_test( - name = "test", - srcs = glob(["**/test.py"]), + name = "test_trailing_color_votes", + srcs = ["jobs/trailing_color_votes/test.py"], + main = "jobs/trailing_color_votes/test.py", + deps = [":job_src"], +) + +py_test( + name = "test_ingest_color_votes", + srcs = ["jobs/ingest_color_votes/test.py"], + main = "jobs/ingest_color_votes/test.py", deps = [":job_src"], ) # Bazel-defined +## Graph +databuild_graph( + name = "bazel_graph", + jobs = [ + ":ingest_color_votes", + ":trailing_color_votes", + # TODO + ], + lookup = ":bazel_graph_lookup", +) -#databuild_job( -# name = "ingest_color_votes", -#) +py_binary( + name = "bazel_graph_lookup", + srcs = ["lookup.py"], + main = "lookup.py", +) + +## Ingest Color Votes +databuild_job( + name = "ingest_color_votes", + binary = ":ingest_color_votes_binary", +) + +py_binary( + name = "ingest_color_votes_binary", + srcs = ["jobs/ingest_color_votes/main.py"], + main = "jobs/ingest_color_votes/main.py", + deps = [":job_src"], +) + +## Trailing Color Votes +databuild_job( + name = "trailing_color_votes", + binary = ":trailing_color_votes_binary", +) + +py_binary( + name = "trailing_color_votes_binary", + srcs = ["jobs/trailing_color_votes/main.py"], + main = "jobs/trailing_color_votes/main.py", + deps = [":job_src"], +) + +## Aggregate Color Votes +# TODO + +## Color Vote Report Calc +# TODO # Python-DSL-defined diff --git a/databuild/test/app/jobs/trailing_color_votes/config.py b/databuild/test/app/jobs/trailing_color_votes/config.py new file mode 100644 index 0000000..6cece17 --- /dev/null +++ b/databuild/test/app/jobs/trailing_color_votes/config.py @@ -0,0 +1,49 @@ +from databuild.proto import PartitionRef, JobConfigureResponse, JobConfig +from datetime import date, timedelta +from collections import defaultdict + +def configure(outputs: list[PartitionRef]) -> JobConfigureResponse: + # Group outputs by date and color + grouped_outputs = defaultdict(list) + + for output in outputs: + parts = output.str.split("/") + if len(parts) == 3 and parts[0] in ["color_votes_1w", "color_votes_1m"]: + prefix, data_date, color = parts + key = (data_date, color) + grouped_outputs[key].append(output) + else: + raise ValueError(f"Invalid output partition format: {output.str}") + + configs = [] + for (data_date, color), output_partitions in grouped_outputs.items(): + # Parse the output date + output_date = date.fromisoformat(data_date) + + # Determine which windows are needed and the maximum window + has_weekly = any(output.str.startswith("color_votes_1w/") for output in output_partitions) + has_monthly = any(output.str.startswith("color_votes_1m/") for output in output_partitions) + max_window = max(7 if has_weekly else 0, 28 if has_monthly else 0) + + # Generate input partition refs for the required trailing window + inputs = [] + for i in range(max_window): + input_date = output_date - timedelta(days=i) + input_ref = PartitionRef(str=f"daily_color_votes/{input_date.isoformat()}/{color}") + inputs.append(input_ref) + + env = { + "DATA_DATE": data_date, + "COLOR": color, + "WEEKLY": "true" if has_weekly else "false", + "MONTHLY": "true" if has_monthly else "false" + } + + configs.append(JobConfig( + outputs=output_partitions, + inputs=inputs, + args=[], + env=env + )) + + return JobConfigureResponse(configs=configs) \ No newline at end of file diff --git a/databuild/test/app/jobs/trailing_color_votes/execute.py b/databuild/test/app/jobs/trailing_color_votes/execute.py new file mode 100644 index 0000000..947e08e --- /dev/null +++ b/databuild/test/app/jobs/trailing_color_votes/execute.py @@ -0,0 +1,28 @@ +from databuild.test.app import dal +from databuild.proto import PartitionRef +from datetime import date, timedelta +import os + +def execute(data_date: str, color: str): + output_date = date.fromisoformat(data_date) + weekly = os.environ.get("WEEKLY", "false").lower() == "true" + monthly = os.environ.get("MONTHLY", "false").lower() == "true" + + def calculate_and_write(window_days: int, output_prefix: str): + # Read trailing data and sum votes + input_refs = [] + for i in range(window_days): + input_date = output_date - timedelta(days=i) + input_refs.append(PartitionRef(str=f"daily_color_votes/{input_date.isoformat()}/{color}")) + + data = dal.read(*input_refs) + total_votes = sum(record["votes"] for record in data) + + output_ref = PartitionRef(str=f"{output_prefix}/{data_date}/{color}") + dal.write(output_ref, [{"color": color, "data_date": data_date, "votes": total_votes}]) + + if weekly: + calculate_and_write(7, "color_votes_1w") + + if monthly: + calculate_and_write(28, "color_votes_1m") \ No newline at end of file diff --git a/databuild/test/app/jobs/trailing_color_votes/main.py b/databuild/test/app/jobs/trailing_color_votes/main.py new file mode 100644 index 0000000..6e0d6c1 --- /dev/null +++ b/databuild/test/app/jobs/trailing_color_votes/main.py @@ -0,0 +1,18 @@ +"""Main entrypoint for the trailing_color_votes job for use with bazel-defined graph.""" + +import sys +import os +from databuild.proto import PartitionRef +from databuild.test.app.jobs.trailing_color_votes.config import configure +from databuild.test.app.jobs.trailing_color_votes.execute import execute + +if __name__ == "__main__": + if sys.argv[1] == "config": + configure([ + PartitionRef(str=raw_ref) + for raw_ref in sys.argv[2:] + ]) + elif sys.argv[1] == "execute": + execute(os.environ["DATA_DATE"], os.environ["COLOR"]) + else: + raise Exception(f"Invalid command `{sys.argv[1]}`") \ No newline at end of file diff --git a/databuild/test/app/jobs/trailing_color_votes/test.py b/databuild/test/app/jobs/trailing_color_votes/test.py new file mode 100644 index 0000000..059ae4f --- /dev/null +++ b/databuild/test/app/jobs/trailing_color_votes/test.py @@ -0,0 +1,53 @@ +import unittest +from databuild.proto import PartitionRef +from databuild.test.app.jobs.trailing_color_votes.config import configure + +class TestTrailingColorVotesConfig(unittest.TestCase): + def test_configure_weekly_only(self): + outputs = [PartitionRef(str="color_votes_1w/2024-01-07/red")] + response = configure(outputs) + + self.assertEqual(len(response.configs), 1) + config = response.configs[0] + self.assertEqual(len(config.outputs), 1) + self.assertEqual(len(config.inputs), 7) # 7 days for weekly + self.assertEqual(config.env["WEEKLY"], "true") + self.assertEqual(config.env["MONTHLY"], "false") + + def test_configure_monthly_only(self): + outputs = [PartitionRef(str="color_votes_1m/2024-01-28/blue")] + response = configure(outputs) + + self.assertEqual(len(response.configs), 1) + config = response.configs[0] + self.assertEqual(len(config.outputs), 1) + self.assertEqual(len(config.inputs), 28) # 28 days for monthly + self.assertEqual(config.env["WEEKLY"], "false") + self.assertEqual(config.env["MONTHLY"], "true") + + def test_configure_both_weekly_and_monthly(self): + outputs = [ + PartitionRef(str="color_votes_1w/2024-01-28/green"), + PartitionRef(str="color_votes_1m/2024-01-28/green") + ] + response = configure(outputs) + + self.assertEqual(len(response.configs), 1) # Single config for same date/color + config = response.configs[0] + self.assertEqual(len(config.outputs), 2) # Both outputs + self.assertEqual(len(config.inputs), 28) # 28 days (max of 7 and 28) + self.assertEqual(config.env["WEEKLY"], "true") + self.assertEqual(config.env["MONTHLY"], "true") + + def test_configure_multiple_colors_dates(self): + outputs = [ + PartitionRef(str="color_votes_1w/2024-01-07/red"), + PartitionRef(str="color_votes_1w/2024-01-07/blue"), + PartitionRef(str="color_votes_1m/2024-01-14/red") + ] + response = configure(outputs) + + self.assertEqual(len(response.configs), 3) # One config per unique date/color combination + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/databuild/test/app/lookup.py b/databuild/test/app/lookup.py new file mode 100644 index 0000000..252ea1b --- /dev/null +++ b/databuild/test/app/lookup.py @@ -0,0 +1,23 @@ +from collections import defaultdict +import sys + +LABEL_BASE = "//databuild/test/app" + + +def lookup(raw_ref: str): + if raw_ref.startswith("daily_color_votes"): + return LABEL_BASE + ":ingest_color_votes" + elif raw_ref.startswith("color_votes_1"): + return LABEL_BASE + ":trailing_color_votes" + elif raw_ref.startswith("daily_votes") or raw_ref.startswith("votes_1w") or raw_ref.startswith("votes_1m"): + return LABEL_BASE + ":aggregate_color_votes" + elif raw_ref.startswith("color_vote_report"): + return LABEL_BASE + ":color_vote_report_calc" + else: + raise ValueError(f"Unable to resolve job for partition: `{raw_ref}`") + + +if __name__ == "__main__": + results = defaultdict(list) + for raw_ref in sys.argv[1:]: + results[lookup(raw_ref)].append(raw_ref)