Implement remaining test app jobs

This commit is contained in:
Stuart Axelbrooke 2025-07-30 22:53:52 -07:00
parent 63f9518486
commit 6d55d54267
13 changed files with 439 additions and 9 deletions

View file

@ -215,6 +215,24 @@ pip.parse(
)
use_repo(pip, "pypi")
# OCI (Docker images)
oci = use_extension("@rules_oci//oci:extensions.bzl", "oci")
# Declare external images you need to pull
oci.pull(
name = "debian",
image = "docker.io/library/python",
platforms = [
"linux/arm64/v8",
"linux/amd64",
],
# Using a pinned version for reproducibility
tag = "3.12-bookworm",
)
# For each oci.pull call, repeat the "name" here to expose them as dependencies
use_repo(oci, "debian", "debian_linux_amd64", "debian_linux_arm64_v8")
# Ruff
# macOS ARM64 (Apple Silicon)
http_file(

View file

@ -567,11 +567,54 @@
"@@rules_oci+//oci:extensions.bzl%oci": {
"general": {
"bzlTransitiveDigest": "KHcdN2ovRQGX1MKsH0nGoGPFd/84U43tssN2jImCeJU=",
"usagesDigest": "/O1PwnnkqSBmI9Oe08ZYYqjM4IS8JR+/9rjgzVTNDaQ=",
"usagesDigest": "Y6oSW43ZgWvZTMtL3eDjcxyo58BCPzyiFhH+D+xVgwM=",
"recordedFileInputs": {},
"recordedDirentsInputs": {},
"envVariables": {},
"generatedRepoSpecs": {
"debian_linux_arm64_v8": {
"repoRuleId": "@@rules_oci+//oci/private:pull.bzl%oci_pull",
"attributes": {
"www_authenticate_challenges": {},
"scheme": "https",
"registry": "index.docker.io",
"repository": "library/python",
"identifier": "3.12-bookworm",
"platform": "linux/arm64/v8",
"target_name": "debian_linux_arm64_v8",
"bazel_tags": []
}
},
"debian_linux_amd64": {
"repoRuleId": "@@rules_oci+//oci/private:pull.bzl%oci_pull",
"attributes": {
"www_authenticate_challenges": {},
"scheme": "https",
"registry": "index.docker.io",
"repository": "library/python",
"identifier": "3.12-bookworm",
"platform": "linux/amd64",
"target_name": "debian_linux_amd64",
"bazel_tags": []
}
},
"debian": {
"repoRuleId": "@@rules_oci+//oci/private:pull.bzl%oci_alias",
"attributes": {
"target_name": "debian",
"www_authenticate_challenges": {},
"scheme": "https",
"registry": "index.docker.io",
"repository": "library/python",
"identifier": "3.12-bookworm",
"platforms": {
"@@platforms//cpu:arm64": "@debian_linux_arm64_v8",
"@@platforms//cpu:x86_64": "@debian_linux_amd64"
},
"bzlmod_repository": "debian",
"reproducible": true
}
},
"oci_crane_darwin_amd64": {
"repoRuleId": "@@rules_oci+//oci:repositories.bzl%crane_repositories",
"attributes": {
@ -687,7 +730,11 @@
}
},
"moduleExtensionMetadata": {
"explicitRootModuleDirectDeps": [],
"explicitRootModuleDirectDeps": [
"debian",
"debian_linux_arm64_v8",
"debian_linux_amd64"
],
"explicitRootModuleDirectDevDeps": [],
"useAllRepos": "NO",
"reproducible": false

View file

@ -22,6 +22,20 @@ py_test(
deps = [":job_src"],
)
py_test(
name = "test_aggregate_color_votes",
srcs = ["jobs/aggregate_color_votes/test.py"],
main = "jobs/aggregate_color_votes/test.py",
deps = [":job_src"],
)
py_test(
name = "test_color_vote_report_calc",
srcs = ["jobs/color_vote_report_calc/test.py"],
main = "jobs/color_vote_report_calc/test.py",
deps = [":job_src"],
)
# Bazel-defined
## Graph
databuild_graph(
@ -29,7 +43,8 @@ databuild_graph(
jobs = [
":ingest_color_votes",
":trailing_color_votes",
# TODO
":aggregate_color_votes",
":color_vote_report_calc",
],
lookup = ":bazel_graph_lookup",
)
@ -67,10 +82,30 @@ py_binary(
)
## Aggregate Color Votes
# TODO
databuild_job(
name = "aggregate_color_votes",
binary = ":aggregate_color_votes_binary",
)
py_binary(
name = "aggregate_color_votes_binary",
srcs = ["jobs/aggregate_color_votes/main.py"],
main = "jobs/aggregate_color_votes/main.py",
deps = [":job_src"],
)
## Color Vote Report Calc
# TODO
databuild_job(
name = "color_vote_report_calc",
binary = ":color_vote_report_calc_binary",
)
py_binary(
name = "color_vote_report_calc_binary",
srcs = ["jobs/color_vote_report_calc/main.py"],
main = "jobs/color_vote_report_calc/main.py",
deps = [":job_src"],
)
# Python-DSL-defined

View file

@ -0,0 +1,42 @@
from databuild.proto import PartitionRef, JobConfigureResponse, JobConfig
from databuild.test.app.colors import COLORS
from datetime import date
def configure(outputs: list[PartitionRef]) -> JobConfigureResponse:
configs = []
for output in outputs:
parts = output.str.split("/")
if len(parts) == 2:
output_type, data_date = parts
date.fromisoformat(data_date) # Validate date format
# Determine input type based on output type
if output_type == "daily_votes":
input_prefix = "daily_color_votes"
elif output_type == "votes_1w":
input_prefix = "color_votes_1w"
elif output_type == "votes_1m":
input_prefix = "color_votes_1m"
else:
raise ValueError(f"Unknown output type: {output_type}")
# Create inputs for all colors
inputs = []
for color in COLORS:
input_ref = PartitionRef(str=f"{input_prefix}/{data_date}/{color}")
inputs.append(input_ref)
configs.append(JobConfig(
outputs=[output],
inputs=inputs,
args=[],
env={
"DATA_DATE": data_date,
"AGGREGATE_TYPE": output_type
}
))
else:
raise ValueError(f"Invalid output partition format: {output.str}")
return JobConfigureResponse(configs=configs)

View file

@ -0,0 +1,26 @@
from databuild.test.app import dal
from databuild.proto import PartitionRef
from databuild.test.app.colors import COLORS
def execute(data_date: str, aggregate_type: str):
# Determine input prefix based on aggregate type
if aggregate_type == "daily_votes":
input_prefix = "daily_color_votes"
elif aggregate_type == "votes_1w":
input_prefix = "color_votes_1w"
elif aggregate_type == "votes_1m":
input_prefix = "color_votes_1m"
else:
raise ValueError(f"Unknown aggregate type: {aggregate_type}")
# Read data from all colors for this date
input_refs = []
for color in COLORS:
input_refs.append(PartitionRef(str=f"{input_prefix}/{data_date}/{color}"))
data = dal.read(*input_refs)
total_votes = sum(record["votes"] for record in data)
# Write aggregated result
output_ref = PartitionRef(str=f"{aggregate_type}/{data_date}")
dal.write(output_ref, [{"data_date": data_date, "votes": total_votes}])

View file

@ -0,0 +1,20 @@
"""Main entrypoint for the aggregate_color_votes job for use with bazel-defined graph."""
import sys
import os
import json
from databuild.proto import PartitionRef
from databuild.test.app.jobs.aggregate_color_votes.config import configure
from databuild.test.app.jobs.aggregate_color_votes.execute import execute
if __name__ == "__main__":
if sys.argv[1] == "config":
response = configure([
PartitionRef(str=raw_ref)
for raw_ref in sys.argv[2:]
])
print(json.dumps(response.to_dict()))
elif sys.argv[1] == "exec":
execute(os.environ["DATA_DATE"], os.environ["AGGREGATE_TYPE"])
else:
raise Exception(f"Invalid command `{sys.argv[1]}`")

View file

@ -0,0 +1,59 @@
import unittest
from databuild.proto import PartitionRef
from databuild.test.app.jobs.aggregate_color_votes.config import configure
from databuild.test.app.colors import COLORS
class TestAggregateColorVotesConfig(unittest.TestCase):
def test_configure_daily_votes(self):
outputs = [PartitionRef(str="daily_votes/2024-01-15")]
response = configure(outputs)
self.assertEqual(len(response.configs), 1)
config = response.configs[0]
self.assertEqual(len(config.outputs), 1)
self.assertEqual(len(config.inputs), len(COLORS)) # One input per color
self.assertEqual(config.env["AGGREGATE_TYPE"], "daily_votes")
self.assertEqual(config.env["DATA_DATE"], "2024-01-15")
# Check that inputs are from daily_color_votes
for i, color in enumerate(COLORS):
expected_input = f"daily_color_votes/2024-01-15/{color}"
self.assertEqual(config.inputs[i].str, expected_input)
def test_configure_weekly_votes(self):
outputs = [PartitionRef(str="votes_1w/2024-01-21")]
response = configure(outputs)
self.assertEqual(len(response.configs), 1)
config = response.configs[0]
self.assertEqual(config.env["AGGREGATE_TYPE"], "votes_1w")
# Check that inputs are from color_votes_1w
for i, color in enumerate(COLORS):
expected_input = f"color_votes_1w/2024-01-21/{color}"
self.assertEqual(config.inputs[i].str, expected_input)
def test_configure_monthly_votes(self):
outputs = [PartitionRef(str="votes_1m/2024-01-31")]
response = configure(outputs)
self.assertEqual(len(response.configs), 1)
config = response.configs[0]
self.assertEqual(config.env["AGGREGATE_TYPE"], "votes_1m")
# Check that inputs are from color_votes_1m
for i, color in enumerate(COLORS):
expected_input = f"color_votes_1m/2024-01-31/{color}"
self.assertEqual(config.inputs[i].str, expected_input)
def test_configure_multiple_outputs(self):
outputs = [
PartitionRef(str="daily_votes/2024-01-15"),
PartitionRef(str="votes_1w/2024-01-21")
]
response = configure(outputs)
self.assertEqual(len(response.configs), 2) # One config per output
if __name__ == "__main__":
unittest.main()

View file

@ -0,0 +1,48 @@
from databuild.proto import PartitionRef, JobConfigureResponse, JobConfig
from datetime import date
from collections import defaultdict
def configure(outputs: list[PartitionRef]) -> JobConfigureResponse:
# This job produces a single job config that handles all requested outputs
all_dates = set()
all_colors = set()
for output in outputs:
parts = output.str.split("/")
if len(parts) == 3 and parts[0] == "color_vote_report":
prefix, data_date, color = parts
date.fromisoformat(data_date) # Validate date format
all_dates.add(data_date)
all_colors.add(color)
else:
raise ValueError(f"Invalid output partition format: {output.str}")
# Build inputs for all dates and colors that are actually requested
inputs = []
# Add total vote aggregates for all dates
for data_date in all_dates:
inputs.extend([
PartitionRef(str=f"daily_votes/{data_date}"),
PartitionRef(str=f"votes_1w/{data_date}"),
PartitionRef(str=f"votes_1m/{data_date}")
])
# Add color-specific inputs for all date/color combinations that are requested
for output in outputs:
data_date, color = output.str.split("/")[1], output.str.split("/")[2]
inputs.extend([
PartitionRef(str=f"daily_color_votes/{data_date}/{color}"),
PartitionRef(str=f"color_votes_1w/{data_date}/{color}"),
PartitionRef(str=f"color_votes_1m/{data_date}/{color}")
])
# Single job config for all outputs - pass output partition refs as args
config = JobConfig(
outputs=outputs,
inputs=inputs,
args=[output.str for output in outputs],
env={}
)
return JobConfigureResponse(configs=[config])

View file

@ -0,0 +1,51 @@
from databuild.test.app import dal
from databuild.proto import PartitionRef
def execute(output_partition_strs: list[str]):
# Parse requested outputs
outputs = [PartitionRef(str=ref_str) for ref_str in output_partition_strs]
for output in outputs:
parts = output.str.split("/")
data_date, color = parts[1], parts[2]
# Read total votes for this date - fail if missing
daily_total = dal.read(PartitionRef(str=f"daily_votes/{data_date}"), empty_ok=False)
weekly_total = dal.read(PartitionRef(str=f"votes_1w/{data_date}"), empty_ok=False)
monthly_total = dal.read(PartitionRef(str=f"votes_1m/{data_date}"), empty_ok=False)
# Read color-specific votes for this date/color - fail if missing
daily_color = dal.read(PartitionRef(str=f"daily_color_votes/{data_date}/{color}"), empty_ok=False)
weekly_color = dal.read(PartitionRef(str=f"color_votes_1w/{data_date}/{color}"), empty_ok=False)
monthly_color = dal.read(PartitionRef(str=f"color_votes_1m/{data_date}/{color}"), empty_ok=False)
# Extract vote counts
daily_total_votes = daily_total[0]["votes"]
weekly_total_votes = weekly_total[0]["votes"]
monthly_total_votes = monthly_total[0]["votes"]
daily_color_votes = daily_color[0]["votes"]
weekly_color_votes = weekly_color[0]["votes"]
monthly_color_votes = monthly_color[0]["votes"]
# Calculate percentages
daily_percent = (daily_color_votes / daily_total_votes * 100) if daily_total_votes > 0 else 0
weekly_percent = (weekly_color_votes / weekly_total_votes * 100) if weekly_total_votes > 0 else 0
monthly_percent = (monthly_color_votes / monthly_total_votes * 100) if monthly_total_votes > 0 else 0
# Write report
report_data = [{
"color": color,
"data_date": data_date,
"daily_total_votes": daily_total_votes,
"weekly_total_votes": weekly_total_votes,
"monthly_total_votes": monthly_total_votes,
"daily_color_votes": daily_color_votes,
"weekly_color_votes": weekly_color_votes,
"monthly_color_votes": monthly_color_votes,
"daily_percent": daily_percent,
"weekly_percent": weekly_percent,
"monthly_percent": monthly_percent
}]
dal.write(output, report_data)

View file

@ -0,0 +1,20 @@
"""Main entrypoint for the color_vote_report_calc job for use with bazel-defined graph."""
import sys
import os
import json
from databuild.proto import PartitionRef
from databuild.test.app.jobs.color_vote_report_calc.config import configure
from databuild.test.app.jobs.color_vote_report_calc.execute import execute
if __name__ == "__main__":
if sys.argv[1] == "config":
response = configure([
PartitionRef(str=raw_ref)
for raw_ref in sys.argv[2:]
])
print(json.dumps(response.to_dict()))
elif sys.argv[1] == "exec":
execute(sys.argv[2:])
else:
raise Exception(f"Invalid command `{sys.argv[1]}`")

View file

@ -0,0 +1,60 @@
import unittest
from databuild.proto import PartitionRef
from databuild.test.app.jobs.color_vote_report_calc.config import configure
class TestColorVoteReportCalcConfig(unittest.TestCase):
def test_configure_single_output(self):
outputs = [PartitionRef(str="color_vote_report/2024-01-15/red")]
response = configure(outputs)
self.assertEqual(len(response.configs), 1) # Always single config
config = response.configs[0]
self.assertEqual(len(config.outputs), 1)
self.assertEqual(config.args, ["color_vote_report/2024-01-15/red"])
# Should have inputs for total votes and color-specific votes
expected_inputs = [
"daily_votes/2024-01-15",
"votes_1w/2024-01-15",
"votes_1m/2024-01-15",
"daily_color_votes/2024-01-15/red",
"color_votes_1w/2024-01-15/red",
"color_votes_1m/2024-01-15/red"
]
actual_inputs = [inp.str for inp in config.inputs]
for expected in expected_inputs:
self.assertIn(expected, actual_inputs)
def test_configure_multiple_outputs_same_date(self):
outputs = [
PartitionRef(str="color_vote_report/2024-01-15/red"),
PartitionRef(str="color_vote_report/2024-01-15/blue")
]
response = configure(outputs)
self.assertEqual(len(response.configs), 1) # Single config for all outputs
config = response.configs[0]
self.assertEqual(len(config.outputs), 2)
self.assertEqual(set(config.args), {
"color_vote_report/2024-01-15/red",
"color_vote_report/2024-01-15/blue"
})
def test_configure_multiple_dates(self):
outputs = [
PartitionRef(str="color_vote_report/2024-01-15/red"),
PartitionRef(str="color_vote_report/2024-01-16/red")
]
response = configure(outputs)
self.assertEqual(len(response.configs), 1) # Single config for all outputs
config = response.configs[0]
self.assertEqual(len(config.outputs), 2)
# Should have total vote inputs for both dates
actual_inputs = [inp.str for inp in config.inputs]
self.assertIn("daily_votes/2024-01-15", actual_inputs)
self.assertIn("daily_votes/2024-01-16", actual_inputs)
if __name__ == "__main__":
unittest.main()

View file

@ -2,17 +2,19 @@
import sys
import os
import json
from databuild.proto import PartitionRef
from databuild.test.app.jobs.ingest_color_votes.config import configure
from databuild.test.app.jobs.ingest_color_votes.execute import execute
if __name__ == "__main__":
if sys.argv[1] == "config":
configure([
response = configure([
PartitionRef(str=raw_ref)
for raw_ref in sys.argv[2:]
])
elif sys.argv[1] == "execute":
print(json.dumps(response.to_dict()))
elif sys.argv[1] == "exec":
execute(os.environ["DATA_DATE"], os.environ["COLOR"])
else:
raise Exception(f"Invalid command `{sys.argv[1]}`")

View file

@ -2,17 +2,19 @@
import sys
import os
import json
from databuild.proto import PartitionRef
from databuild.test.app.jobs.trailing_color_votes.config import configure
from databuild.test.app.jobs.trailing_color_votes.execute import execute
if __name__ == "__main__":
if sys.argv[1] == "config":
configure([
response = configure([
PartitionRef(str=raw_ref)
for raw_ref in sys.argv[2:]
])
elif sys.argv[1] == "execute":
print(json.dumps(response.to_dict()))
elif sys.argv[1] == "exec":
execute(os.environ["DATA_DATE"], os.environ["COLOR"])
else:
raise Exception(f"Invalid command `{sys.argv[1]}`")