diff --git a/MODULE.bazel b/MODULE.bazel index 5a1744e..d455940 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -215,6 +215,24 @@ pip.parse( ) use_repo(pip, "pypi") +# OCI (Docker images) +oci = use_extension("@rules_oci//oci:extensions.bzl", "oci") + +# Declare external images you need to pull +oci.pull( + name = "debian", + image = "docker.io/library/python", + platforms = [ + "linux/arm64/v8", + "linux/amd64", + ], + # Using a pinned version for reproducibility + tag = "3.12-bookworm", +) + +# For each oci.pull call, repeat the "name" here to expose them as dependencies +use_repo(oci, "debian", "debian_linux_amd64", "debian_linux_arm64_v8") + # Ruff # macOS ARM64 (Apple Silicon) http_file( diff --git a/MODULE.bazel.lock b/MODULE.bazel.lock index 733ba4d..635af38 100644 --- a/MODULE.bazel.lock +++ b/MODULE.bazel.lock @@ -567,11 +567,54 @@ "@@rules_oci+//oci:extensions.bzl%oci": { "general": { "bzlTransitiveDigest": "KHcdN2ovRQGX1MKsH0nGoGPFd/84U43tssN2jImCeJU=", - "usagesDigest": "/O1PwnnkqSBmI9Oe08ZYYqjM4IS8JR+/9rjgzVTNDaQ=", + "usagesDigest": "Y6oSW43ZgWvZTMtL3eDjcxyo58BCPzyiFhH+D+xVgwM=", "recordedFileInputs": {}, "recordedDirentsInputs": {}, "envVariables": {}, "generatedRepoSpecs": { + "debian_linux_arm64_v8": { + "repoRuleId": "@@rules_oci+//oci/private:pull.bzl%oci_pull", + "attributes": { + "www_authenticate_challenges": {}, + "scheme": "https", + "registry": "index.docker.io", + "repository": "library/python", + "identifier": "3.12-bookworm", + "platform": "linux/arm64/v8", + "target_name": "debian_linux_arm64_v8", + "bazel_tags": [] + } + }, + "debian_linux_amd64": { + "repoRuleId": "@@rules_oci+//oci/private:pull.bzl%oci_pull", + "attributes": { + "www_authenticate_challenges": {}, + "scheme": "https", + "registry": "index.docker.io", + "repository": "library/python", + "identifier": "3.12-bookworm", + "platform": "linux/amd64", + "target_name": "debian_linux_amd64", + "bazel_tags": [] + } + }, + "debian": { + "repoRuleId": "@@rules_oci+//oci/private:pull.bzl%oci_alias", + "attributes": { + "target_name": "debian", + "www_authenticate_challenges": {}, + "scheme": "https", + "registry": "index.docker.io", + "repository": "library/python", + "identifier": "3.12-bookworm", + "platforms": { + "@@platforms//cpu:arm64": "@debian_linux_arm64_v8", + "@@platforms//cpu:x86_64": "@debian_linux_amd64" + }, + "bzlmod_repository": "debian", + "reproducible": true + } + }, "oci_crane_darwin_amd64": { "repoRuleId": "@@rules_oci+//oci:repositories.bzl%crane_repositories", "attributes": { @@ -687,7 +730,11 @@ } }, "moduleExtensionMetadata": { - "explicitRootModuleDirectDeps": [], + "explicitRootModuleDirectDeps": [ + "debian", + "debian_linux_arm64_v8", + "debian_linux_amd64" + ], "explicitRootModuleDirectDevDeps": [], "useAllRepos": "NO", "reproducible": false diff --git a/databuild/test/app/BUILD.bazel b/databuild/test/app/BUILD.bazel index ed10c58..0bf5d67 100644 --- a/databuild/test/app/BUILD.bazel +++ b/databuild/test/app/BUILD.bazel @@ -22,6 +22,20 @@ py_test( deps = [":job_src"], ) +py_test( + name = "test_aggregate_color_votes", + srcs = ["jobs/aggregate_color_votes/test.py"], + main = "jobs/aggregate_color_votes/test.py", + deps = [":job_src"], +) + +py_test( + name = "test_color_vote_report_calc", + srcs = ["jobs/color_vote_report_calc/test.py"], + main = "jobs/color_vote_report_calc/test.py", + deps = [":job_src"], +) + # Bazel-defined ## Graph databuild_graph( @@ -29,7 +43,8 @@ databuild_graph( jobs = [ ":ingest_color_votes", ":trailing_color_votes", - # TODO + ":aggregate_color_votes", + ":color_vote_report_calc", ], lookup = ":bazel_graph_lookup", ) @@ -67,10 +82,30 @@ py_binary( ) ## Aggregate Color Votes -# TODO +databuild_job( + name = "aggregate_color_votes", + binary = ":aggregate_color_votes_binary", +) + +py_binary( + name = "aggregate_color_votes_binary", + srcs = ["jobs/aggregate_color_votes/main.py"], + main = "jobs/aggregate_color_votes/main.py", + deps = [":job_src"], +) ## Color Vote Report Calc -# TODO +databuild_job( + name = "color_vote_report_calc", + binary = ":color_vote_report_calc_binary", +) + +py_binary( + name = "color_vote_report_calc_binary", + srcs = ["jobs/color_vote_report_calc/main.py"], + main = "jobs/color_vote_report_calc/main.py", + deps = [":job_src"], +) # Python-DSL-defined diff --git a/databuild/test/app/jobs/aggregate_color_votes/config.py b/databuild/test/app/jobs/aggregate_color_votes/config.py new file mode 100644 index 0000000..879b8c0 --- /dev/null +++ b/databuild/test/app/jobs/aggregate_color_votes/config.py @@ -0,0 +1,42 @@ +from databuild.proto import PartitionRef, JobConfigureResponse, JobConfig +from databuild.test.app.colors import COLORS +from datetime import date + +def configure(outputs: list[PartitionRef]) -> JobConfigureResponse: + configs = [] + + for output in outputs: + parts = output.str.split("/") + if len(parts) == 2: + output_type, data_date = parts + date.fromisoformat(data_date) # Validate date format + + # Determine input type based on output type + if output_type == "daily_votes": + input_prefix = "daily_color_votes" + elif output_type == "votes_1w": + input_prefix = "color_votes_1w" + elif output_type == "votes_1m": + input_prefix = "color_votes_1m" + else: + raise ValueError(f"Unknown output type: {output_type}") + + # Create inputs for all colors + inputs = [] + for color in COLORS: + input_ref = PartitionRef(str=f"{input_prefix}/{data_date}/{color}") + inputs.append(input_ref) + + configs.append(JobConfig( + outputs=[output], + inputs=inputs, + args=[], + env={ + "DATA_DATE": data_date, + "AGGREGATE_TYPE": output_type + } + )) + else: + raise ValueError(f"Invalid output partition format: {output.str}") + + return JobConfigureResponse(configs=configs) \ No newline at end of file diff --git a/databuild/test/app/jobs/aggregate_color_votes/execute.py b/databuild/test/app/jobs/aggregate_color_votes/execute.py new file mode 100644 index 0000000..4ef255f --- /dev/null +++ b/databuild/test/app/jobs/aggregate_color_votes/execute.py @@ -0,0 +1,26 @@ +from databuild.test.app import dal +from databuild.proto import PartitionRef +from databuild.test.app.colors import COLORS + +def execute(data_date: str, aggregate_type: str): + # Determine input prefix based on aggregate type + if aggregate_type == "daily_votes": + input_prefix = "daily_color_votes" + elif aggregate_type == "votes_1w": + input_prefix = "color_votes_1w" + elif aggregate_type == "votes_1m": + input_prefix = "color_votes_1m" + else: + raise ValueError(f"Unknown aggregate type: {aggregate_type}") + + # Read data from all colors for this date + input_refs = [] + for color in COLORS: + input_refs.append(PartitionRef(str=f"{input_prefix}/{data_date}/{color}")) + + data = dal.read(*input_refs) + total_votes = sum(record["votes"] for record in data) + + # Write aggregated result + output_ref = PartitionRef(str=f"{aggregate_type}/{data_date}") + dal.write(output_ref, [{"data_date": data_date, "votes": total_votes}]) \ No newline at end of file diff --git a/databuild/test/app/jobs/aggregate_color_votes/main.py b/databuild/test/app/jobs/aggregate_color_votes/main.py new file mode 100644 index 0000000..93f17a6 --- /dev/null +++ b/databuild/test/app/jobs/aggregate_color_votes/main.py @@ -0,0 +1,20 @@ +"""Main entrypoint for the aggregate_color_votes job for use with bazel-defined graph.""" + +import sys +import os +import json +from databuild.proto import PartitionRef +from databuild.test.app.jobs.aggregate_color_votes.config import configure +from databuild.test.app.jobs.aggregate_color_votes.execute import execute + +if __name__ == "__main__": + if sys.argv[1] == "config": + response = configure([ + PartitionRef(str=raw_ref) + for raw_ref in sys.argv[2:] + ]) + print(json.dumps(response.to_dict())) + elif sys.argv[1] == "exec": + execute(os.environ["DATA_DATE"], os.environ["AGGREGATE_TYPE"]) + else: + raise Exception(f"Invalid command `{sys.argv[1]}`") \ No newline at end of file diff --git a/databuild/test/app/jobs/aggregate_color_votes/test.py b/databuild/test/app/jobs/aggregate_color_votes/test.py new file mode 100644 index 0000000..5c6d943 --- /dev/null +++ b/databuild/test/app/jobs/aggregate_color_votes/test.py @@ -0,0 +1,59 @@ +import unittest +from databuild.proto import PartitionRef +from databuild.test.app.jobs.aggregate_color_votes.config import configure +from databuild.test.app.colors import COLORS + +class TestAggregateColorVotesConfig(unittest.TestCase): + def test_configure_daily_votes(self): + outputs = [PartitionRef(str="daily_votes/2024-01-15")] + response = configure(outputs) + + self.assertEqual(len(response.configs), 1) + config = response.configs[0] + self.assertEqual(len(config.outputs), 1) + self.assertEqual(len(config.inputs), len(COLORS)) # One input per color + self.assertEqual(config.env["AGGREGATE_TYPE"], "daily_votes") + self.assertEqual(config.env["DATA_DATE"], "2024-01-15") + + # Check that inputs are from daily_color_votes + for i, color in enumerate(COLORS): + expected_input = f"daily_color_votes/2024-01-15/{color}" + self.assertEqual(config.inputs[i].str, expected_input) + + def test_configure_weekly_votes(self): + outputs = [PartitionRef(str="votes_1w/2024-01-21")] + response = configure(outputs) + + self.assertEqual(len(response.configs), 1) + config = response.configs[0] + self.assertEqual(config.env["AGGREGATE_TYPE"], "votes_1w") + + # Check that inputs are from color_votes_1w + for i, color in enumerate(COLORS): + expected_input = f"color_votes_1w/2024-01-21/{color}" + self.assertEqual(config.inputs[i].str, expected_input) + + def test_configure_monthly_votes(self): + outputs = [PartitionRef(str="votes_1m/2024-01-31")] + response = configure(outputs) + + self.assertEqual(len(response.configs), 1) + config = response.configs[0] + self.assertEqual(config.env["AGGREGATE_TYPE"], "votes_1m") + + # Check that inputs are from color_votes_1m + for i, color in enumerate(COLORS): + expected_input = f"color_votes_1m/2024-01-31/{color}" + self.assertEqual(config.inputs[i].str, expected_input) + + def test_configure_multiple_outputs(self): + outputs = [ + PartitionRef(str="daily_votes/2024-01-15"), + PartitionRef(str="votes_1w/2024-01-21") + ] + response = configure(outputs) + + self.assertEqual(len(response.configs), 2) # One config per output + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/databuild/test/app/jobs/color_vote_report_calc/config.py b/databuild/test/app/jobs/color_vote_report_calc/config.py new file mode 100644 index 0000000..5a05677 --- /dev/null +++ b/databuild/test/app/jobs/color_vote_report_calc/config.py @@ -0,0 +1,48 @@ +from databuild.proto import PartitionRef, JobConfigureResponse, JobConfig +from datetime import date +from collections import defaultdict + +def configure(outputs: list[PartitionRef]) -> JobConfigureResponse: + # This job produces a single job config that handles all requested outputs + all_dates = set() + all_colors = set() + + for output in outputs: + parts = output.str.split("/") + if len(parts) == 3 and parts[0] == "color_vote_report": + prefix, data_date, color = parts + date.fromisoformat(data_date) # Validate date format + all_dates.add(data_date) + all_colors.add(color) + else: + raise ValueError(f"Invalid output partition format: {output.str}") + + # Build inputs for all dates and colors that are actually requested + inputs = [] + + # Add total vote aggregates for all dates + for data_date in all_dates: + inputs.extend([ + PartitionRef(str=f"daily_votes/{data_date}"), + PartitionRef(str=f"votes_1w/{data_date}"), + PartitionRef(str=f"votes_1m/{data_date}") + ]) + + # Add color-specific inputs for all date/color combinations that are requested + for output in outputs: + data_date, color = output.str.split("/")[1], output.str.split("/")[2] + inputs.extend([ + PartitionRef(str=f"daily_color_votes/{data_date}/{color}"), + PartitionRef(str=f"color_votes_1w/{data_date}/{color}"), + PartitionRef(str=f"color_votes_1m/{data_date}/{color}") + ]) + + # Single job config for all outputs - pass output partition refs as args + config = JobConfig( + outputs=outputs, + inputs=inputs, + args=[output.str for output in outputs], + env={} + ) + + return JobConfigureResponse(configs=[config]) \ No newline at end of file diff --git a/databuild/test/app/jobs/color_vote_report_calc/execute.py b/databuild/test/app/jobs/color_vote_report_calc/execute.py new file mode 100644 index 0000000..e2b2ba7 --- /dev/null +++ b/databuild/test/app/jobs/color_vote_report_calc/execute.py @@ -0,0 +1,51 @@ +from databuild.test.app import dal +from databuild.proto import PartitionRef + +def execute(output_partition_strs: list[str]): + # Parse requested outputs + outputs = [PartitionRef(str=ref_str) for ref_str in output_partition_strs] + + for output in outputs: + parts = output.str.split("/") + data_date, color = parts[1], parts[2] + + # Read total votes for this date - fail if missing + daily_total = dal.read(PartitionRef(str=f"daily_votes/{data_date}"), empty_ok=False) + weekly_total = dal.read(PartitionRef(str=f"votes_1w/{data_date}"), empty_ok=False) + monthly_total = dal.read(PartitionRef(str=f"votes_1m/{data_date}"), empty_ok=False) + + # Read color-specific votes for this date/color - fail if missing + daily_color = dal.read(PartitionRef(str=f"daily_color_votes/{data_date}/{color}"), empty_ok=False) + weekly_color = dal.read(PartitionRef(str=f"color_votes_1w/{data_date}/{color}"), empty_ok=False) + monthly_color = dal.read(PartitionRef(str=f"color_votes_1m/{data_date}/{color}"), empty_ok=False) + + # Extract vote counts + daily_total_votes = daily_total[0]["votes"] + weekly_total_votes = weekly_total[0]["votes"] + monthly_total_votes = monthly_total[0]["votes"] + + daily_color_votes = daily_color[0]["votes"] + weekly_color_votes = weekly_color[0]["votes"] + monthly_color_votes = monthly_color[0]["votes"] + + # Calculate percentages + daily_percent = (daily_color_votes / daily_total_votes * 100) if daily_total_votes > 0 else 0 + weekly_percent = (weekly_color_votes / weekly_total_votes * 100) if weekly_total_votes > 0 else 0 + monthly_percent = (monthly_color_votes / monthly_total_votes * 100) if monthly_total_votes > 0 else 0 + + # Write report + report_data = [{ + "color": color, + "data_date": data_date, + "daily_total_votes": daily_total_votes, + "weekly_total_votes": weekly_total_votes, + "monthly_total_votes": monthly_total_votes, + "daily_color_votes": daily_color_votes, + "weekly_color_votes": weekly_color_votes, + "monthly_color_votes": monthly_color_votes, + "daily_percent": daily_percent, + "weekly_percent": weekly_percent, + "monthly_percent": monthly_percent + }] + + dal.write(output, report_data) \ No newline at end of file diff --git a/databuild/test/app/jobs/color_vote_report_calc/main.py b/databuild/test/app/jobs/color_vote_report_calc/main.py new file mode 100644 index 0000000..a6a4e81 --- /dev/null +++ b/databuild/test/app/jobs/color_vote_report_calc/main.py @@ -0,0 +1,20 @@ +"""Main entrypoint for the color_vote_report_calc job for use with bazel-defined graph.""" + +import sys +import os +import json +from databuild.proto import PartitionRef +from databuild.test.app.jobs.color_vote_report_calc.config import configure +from databuild.test.app.jobs.color_vote_report_calc.execute import execute + +if __name__ == "__main__": + if sys.argv[1] == "config": + response = configure([ + PartitionRef(str=raw_ref) + for raw_ref in sys.argv[2:] + ]) + print(json.dumps(response.to_dict())) + elif sys.argv[1] == "exec": + execute(sys.argv[2:]) + else: + raise Exception(f"Invalid command `{sys.argv[1]}`") \ No newline at end of file diff --git a/databuild/test/app/jobs/color_vote_report_calc/test.py b/databuild/test/app/jobs/color_vote_report_calc/test.py new file mode 100644 index 0000000..1a056fe --- /dev/null +++ b/databuild/test/app/jobs/color_vote_report_calc/test.py @@ -0,0 +1,60 @@ +import unittest +from databuild.proto import PartitionRef +from databuild.test.app.jobs.color_vote_report_calc.config import configure + +class TestColorVoteReportCalcConfig(unittest.TestCase): + def test_configure_single_output(self): + outputs = [PartitionRef(str="color_vote_report/2024-01-15/red")] + response = configure(outputs) + + self.assertEqual(len(response.configs), 1) # Always single config + config = response.configs[0] + self.assertEqual(len(config.outputs), 1) + self.assertEqual(config.args, ["color_vote_report/2024-01-15/red"]) + + # Should have inputs for total votes and color-specific votes + expected_inputs = [ + "daily_votes/2024-01-15", + "votes_1w/2024-01-15", + "votes_1m/2024-01-15", + "daily_color_votes/2024-01-15/red", + "color_votes_1w/2024-01-15/red", + "color_votes_1m/2024-01-15/red" + ] + actual_inputs = [inp.str for inp in config.inputs] + for expected in expected_inputs: + self.assertIn(expected, actual_inputs) + + def test_configure_multiple_outputs_same_date(self): + outputs = [ + PartitionRef(str="color_vote_report/2024-01-15/red"), + PartitionRef(str="color_vote_report/2024-01-15/blue") + ] + response = configure(outputs) + + self.assertEqual(len(response.configs), 1) # Single config for all outputs + config = response.configs[0] + self.assertEqual(len(config.outputs), 2) + self.assertEqual(set(config.args), { + "color_vote_report/2024-01-15/red", + "color_vote_report/2024-01-15/blue" + }) + + def test_configure_multiple_dates(self): + outputs = [ + PartitionRef(str="color_vote_report/2024-01-15/red"), + PartitionRef(str="color_vote_report/2024-01-16/red") + ] + response = configure(outputs) + + self.assertEqual(len(response.configs), 1) # Single config for all outputs + config = response.configs[0] + self.assertEqual(len(config.outputs), 2) + + # Should have total vote inputs for both dates + actual_inputs = [inp.str for inp in config.inputs] + self.assertIn("daily_votes/2024-01-15", actual_inputs) + self.assertIn("daily_votes/2024-01-16", actual_inputs) + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/databuild/test/app/jobs/ingest_color_votes/main.py b/databuild/test/app/jobs/ingest_color_votes/main.py index 71470a5..fa81b47 100644 --- a/databuild/test/app/jobs/ingest_color_votes/main.py +++ b/databuild/test/app/jobs/ingest_color_votes/main.py @@ -2,17 +2,19 @@ import sys import os +import json from databuild.proto import PartitionRef from databuild.test.app.jobs.ingest_color_votes.config import configure from databuild.test.app.jobs.ingest_color_votes.execute import execute if __name__ == "__main__": if sys.argv[1] == "config": - configure([ + response = configure([ PartitionRef(str=raw_ref) for raw_ref in sys.argv[2:] ]) - elif sys.argv[1] == "execute": + print(json.dumps(response.to_dict())) + elif sys.argv[1] == "exec": execute(os.environ["DATA_DATE"], os.environ["COLOR"]) else: raise Exception(f"Invalid command `{sys.argv[1]}`") diff --git a/databuild/test/app/jobs/trailing_color_votes/main.py b/databuild/test/app/jobs/trailing_color_votes/main.py index 6e0d6c1..50d0f10 100644 --- a/databuild/test/app/jobs/trailing_color_votes/main.py +++ b/databuild/test/app/jobs/trailing_color_votes/main.py @@ -2,17 +2,19 @@ import sys import os +import json from databuild.proto import PartitionRef from databuild.test.app.jobs.trailing_color_votes.config import configure from databuild.test.app.jobs.trailing_color_votes.execute import execute if __name__ == "__main__": if sys.argv[1] == "config": - configure([ + response = configure([ PartitionRef(str=raw_ref) for raw_ref in sys.argv[2:] ]) - elif sys.argv[1] == "execute": + print(json.dumps(response.to_dict())) + elif sys.argv[1] == "exec": execute(os.environ["DATA_DATE"], os.environ["COLOR"]) else: raise Exception(f"Invalid command `{sys.argv[1]}`") \ No newline at end of file