Add graph and trailling job

This commit is contained in:
Stuart Axelbrooke 2025-07-30 22:26:32 -07:00
parent 30f1d9addb
commit 63f9518486
6 changed files with 228 additions and 5 deletions

View file

@ -9,16 +9,68 @@ py_library(
# Tests
py_test(
name = "test",
srcs = glob(["**/test.py"]),
name = "test_trailing_color_votes",
srcs = ["jobs/trailing_color_votes/test.py"],
main = "jobs/trailing_color_votes/test.py",
deps = [":job_src"],
)
py_test(
name = "test_ingest_color_votes",
srcs = ["jobs/ingest_color_votes/test.py"],
main = "jobs/ingest_color_votes/test.py",
deps = [":job_src"],
)
# Bazel-defined
## Graph
databuild_graph(
name = "bazel_graph",
jobs = [
":ingest_color_votes",
":trailing_color_votes",
# TODO
],
lookup = ":bazel_graph_lookup",
)
#databuild_job(
# name = "ingest_color_votes",
#)
py_binary(
name = "bazel_graph_lookup",
srcs = ["lookup.py"],
main = "lookup.py",
)
## Ingest Color Votes
databuild_job(
name = "ingest_color_votes",
binary = ":ingest_color_votes_binary",
)
py_binary(
name = "ingest_color_votes_binary",
srcs = ["jobs/ingest_color_votes/main.py"],
main = "jobs/ingest_color_votes/main.py",
deps = [":job_src"],
)
## Trailing Color Votes
databuild_job(
name = "trailing_color_votes",
binary = ":trailing_color_votes_binary",
)
py_binary(
name = "trailing_color_votes_binary",
srcs = ["jobs/trailing_color_votes/main.py"],
main = "jobs/trailing_color_votes/main.py",
deps = [":job_src"],
)
## Aggregate Color Votes
# TODO
## Color Vote Report Calc
# TODO
# Python-DSL-defined

View file

@ -0,0 +1,49 @@
from databuild.proto import PartitionRef, JobConfigureResponse, JobConfig
from datetime import date, timedelta
from collections import defaultdict
def configure(outputs: list[PartitionRef]) -> JobConfigureResponse:
# Group outputs by date and color
grouped_outputs = defaultdict(list)
for output in outputs:
parts = output.str.split("/")
if len(parts) == 3 and parts[0] in ["color_votes_1w", "color_votes_1m"]:
prefix, data_date, color = parts
key = (data_date, color)
grouped_outputs[key].append(output)
else:
raise ValueError(f"Invalid output partition format: {output.str}")
configs = []
for (data_date, color), output_partitions in grouped_outputs.items():
# Parse the output date
output_date = date.fromisoformat(data_date)
# Determine which windows are needed and the maximum window
has_weekly = any(output.str.startswith("color_votes_1w/") for output in output_partitions)
has_monthly = any(output.str.startswith("color_votes_1m/") for output in output_partitions)
max_window = max(7 if has_weekly else 0, 28 if has_monthly else 0)
# Generate input partition refs for the required trailing window
inputs = []
for i in range(max_window):
input_date = output_date - timedelta(days=i)
input_ref = PartitionRef(str=f"daily_color_votes/{input_date.isoformat()}/{color}")
inputs.append(input_ref)
env = {
"DATA_DATE": data_date,
"COLOR": color,
"WEEKLY": "true" if has_weekly else "false",
"MONTHLY": "true" if has_monthly else "false"
}
configs.append(JobConfig(
outputs=output_partitions,
inputs=inputs,
args=[],
env=env
))
return JobConfigureResponse(configs=configs)

View file

@ -0,0 +1,28 @@
from databuild.test.app import dal
from databuild.proto import PartitionRef
from datetime import date, timedelta
import os
def execute(data_date: str, color: str):
output_date = date.fromisoformat(data_date)
weekly = os.environ.get("WEEKLY", "false").lower() == "true"
monthly = os.environ.get("MONTHLY", "false").lower() == "true"
def calculate_and_write(window_days: int, output_prefix: str):
# Read trailing data and sum votes
input_refs = []
for i in range(window_days):
input_date = output_date - timedelta(days=i)
input_refs.append(PartitionRef(str=f"daily_color_votes/{input_date.isoformat()}/{color}"))
data = dal.read(*input_refs)
total_votes = sum(record["votes"] for record in data)
output_ref = PartitionRef(str=f"{output_prefix}/{data_date}/{color}")
dal.write(output_ref, [{"color": color, "data_date": data_date, "votes": total_votes}])
if weekly:
calculate_and_write(7, "color_votes_1w")
if monthly:
calculate_and_write(28, "color_votes_1m")

View file

@ -0,0 +1,18 @@
"""Main entrypoint for the trailing_color_votes job for use with bazel-defined graph."""
import sys
import os
from databuild.proto import PartitionRef
from databuild.test.app.jobs.trailing_color_votes.config import configure
from databuild.test.app.jobs.trailing_color_votes.execute import execute
if __name__ == "__main__":
if sys.argv[1] == "config":
configure([
PartitionRef(str=raw_ref)
for raw_ref in sys.argv[2:]
])
elif sys.argv[1] == "execute":
execute(os.environ["DATA_DATE"], os.environ["COLOR"])
else:
raise Exception(f"Invalid command `{sys.argv[1]}`")

View file

@ -0,0 +1,53 @@
import unittest
from databuild.proto import PartitionRef
from databuild.test.app.jobs.trailing_color_votes.config import configure
class TestTrailingColorVotesConfig(unittest.TestCase):
def test_configure_weekly_only(self):
outputs = [PartitionRef(str="color_votes_1w/2024-01-07/red")]
response = configure(outputs)
self.assertEqual(len(response.configs), 1)
config = response.configs[0]
self.assertEqual(len(config.outputs), 1)
self.assertEqual(len(config.inputs), 7) # 7 days for weekly
self.assertEqual(config.env["WEEKLY"], "true")
self.assertEqual(config.env["MONTHLY"], "false")
def test_configure_monthly_only(self):
outputs = [PartitionRef(str="color_votes_1m/2024-01-28/blue")]
response = configure(outputs)
self.assertEqual(len(response.configs), 1)
config = response.configs[0]
self.assertEqual(len(config.outputs), 1)
self.assertEqual(len(config.inputs), 28) # 28 days for monthly
self.assertEqual(config.env["WEEKLY"], "false")
self.assertEqual(config.env["MONTHLY"], "true")
def test_configure_both_weekly_and_monthly(self):
outputs = [
PartitionRef(str="color_votes_1w/2024-01-28/green"),
PartitionRef(str="color_votes_1m/2024-01-28/green")
]
response = configure(outputs)
self.assertEqual(len(response.configs), 1) # Single config for same date/color
config = response.configs[0]
self.assertEqual(len(config.outputs), 2) # Both outputs
self.assertEqual(len(config.inputs), 28) # 28 days (max of 7 and 28)
self.assertEqual(config.env["WEEKLY"], "true")
self.assertEqual(config.env["MONTHLY"], "true")
def test_configure_multiple_colors_dates(self):
outputs = [
PartitionRef(str="color_votes_1w/2024-01-07/red"),
PartitionRef(str="color_votes_1w/2024-01-07/blue"),
PartitionRef(str="color_votes_1m/2024-01-14/red")
]
response = configure(outputs)
self.assertEqual(len(response.configs), 3) # One config per unique date/color combination
if __name__ == "__main__":
unittest.main()

View file

@ -0,0 +1,23 @@
from collections import defaultdict
import sys
LABEL_BASE = "//databuild/test/app"
def lookup(raw_ref: str):
if raw_ref.startswith("daily_color_votes"):
return LABEL_BASE + ":ingest_color_votes"
elif raw_ref.startswith("color_votes_1"):
return LABEL_BASE + ":trailing_color_votes"
elif raw_ref.startswith("daily_votes") or raw_ref.startswith("votes_1w") or raw_ref.startswith("votes_1m"):
return LABEL_BASE + ":aggregate_color_votes"
elif raw_ref.startswith("color_vote_report"):
return LABEL_BASE + ":color_vote_report_calc"
else:
raise ValueError(f"Unable to resolve job for partition: `{raw_ref}`")
if __name__ == "__main__":
results = defaultdict(list)
for raw_ref in sys.argv[1:]:
results[lookup(raw_ref)].append(raw_ref)