diff --git a/databuild/test/app/README.md b/databuild/test/app/README.md index 59d7ff5..35aaa4b 100644 --- a/databuild/test/app/README.md +++ b/databuild/test/app/README.md @@ -9,7 +9,9 @@ The fictitious use case is "daily color votes". The underlying input data is vot - Time-range: 1 day depending on N prior days - Multi-partition-output jobs - - Both for "always output multiple" or "consume different inputs based on desired output" + - Always output multiple, e.g. producing per type + - Consume different inputs based on desired output + - Produce multiple of the same type depending on input ```mermaid flowchart TD @@ -27,3 +29,6 @@ flowchart TD color_votes_1m --> aggregate_color_votes --> votes_1m daily_votes & votes_1w & votes_1m & color_votes_1w & color_votes_1m --> color_vote_report_calc --> color_vote_report ``` + +## Data Access +Data access is implemented in [`dal.py`](./dal.py), with data written as lists of dicts in JSON. Partition fields are stored as values in those dicts. diff --git a/databuild/test/app/colors.py b/databuild/test/app/colors.py new file mode 100644 index 0000000..abae089 --- /dev/null +++ b/databuild/test/app/colors.py @@ -0,0 +1,2 @@ + +COLORS = ["red", "blue", "green", "yellow", "cerulean", "cucumber", "sage", "forest"] diff --git a/databuild/test/app/jobs/aggregate_color_votes/README.md b/databuild/test/app/jobs/aggregate_color_votes/README.md new file mode 100644 index 0000000..24ea627 --- /dev/null +++ b/databuild/test/app/jobs/aggregate_color_votes/README.md @@ -0,0 +1,10 @@ + +# Aggregate Color Votes +This job adds up votes across colors for the same date. It uses an arbitrary list of colors from [`colors.py`](../../colors.py). + +## Configure +When requested for a given date, it creates a job config for each date and type of aggregation, handling daily, weekly, +and monthly aggregates. Declares data deps based on date and the colors in [`colors.py`](../../colors.py). + +## Execute +Simply sums the `votes` from the referenced partitions and writes them. diff --git a/databuild/test/app/jobs/color_vote_report_calc/README.md b/databuild/test/app/jobs/color_vote_report_calc/README.md new file mode 100644 index 0000000..7f2b601 --- /dev/null +++ b/databuild/test/app/jobs/color_vote_report_calc/README.md @@ -0,0 +1,18 @@ + +# Color Vote Report Calc +Calculates some metrics based on data calculated by other aggregates: +- Total votes + - On this day + - In last week + - In last month +- Percent of total votes going to this color + - On this day + - In last week + - In last month + +## Configure +This job tests the "produce multiple partitions based on requested inputs in one run" mode. It only ever produces a +single job config, which produces all requested outputs. + +## Execute +Iterates over requested partitions and performs calculations described above. diff --git a/databuild/test/app/jobs/ingest_color_votes/README.md b/databuild/test/app/jobs/ingest_color_votes/README.md new file mode 100644 index 0000000..2299dd2 --- /dev/null +++ b/databuild/test/app/jobs/ingest_color_votes/README.md @@ -0,0 +1,9 @@ + +# Ingest Color Votes +This job simply generates a random number between 0 and 1000 and writes it to the output. + +## Configure +The job has no inputs, and communicates params via env var. It generates a single job config per color date requested. + +## Execute +Generates data with `votes` being a number between 0 and 1000. diff --git a/databuild/test/app/jobs/ingest_color_votes/config.py b/databuild/test/app/jobs/ingest_color_votes/config.py index 961945d..cea5008 100644 --- a/databuild/test/app/jobs/ingest_color_votes/config.py +++ b/databuild/test/app/jobs/ingest_color_votes/config.py @@ -4,8 +4,10 @@ from datetime import date def configure(outputs: list[PartitionRef]) -> JobConfigureResponse: - prefix, data_date, color = outputs[0].str.split("/") - date.fromisoformat(data_date) # Should be able to parse date - assert prefix == "daily_color_votes" - config = JobConfig(outputs = outputs, inputs=[], args=[], env={"data_date": data_date, "color": color}) - return JobConfigureResponse(configs=[config]) + configs = [] + for output in outputs: + prefix, data_date, color = output.str.split("/") + date.fromisoformat(data_date) # Should be able to parse date + assert prefix == "daily_color_votes" + configs.append(JobConfig(outputs = [output], inputs=[], args=[], env={"DATA_DATE": data_date, "COLOR": color})) + return JobConfigureResponse(configs=configs) diff --git a/databuild/test/app/jobs/ingest_color_votes/main.py b/databuild/test/app/jobs/ingest_color_votes/main.py index f349699..71470a5 100644 --- a/databuild/test/app/jobs/ingest_color_votes/main.py +++ b/databuild/test/app/jobs/ingest_color_votes/main.py @@ -1,5 +1,7 @@ +"""Main entrypoint for the ingest_color_votes job for use with bazel-defined graph.""" import sys +import os from databuild.proto import PartitionRef from databuild.test.app.jobs.ingest_color_votes.config import configure from databuild.test.app.jobs.ingest_color_votes.execute import execute @@ -11,6 +13,6 @@ if __name__ == "__main__": for raw_ref in sys.argv[2:] ]) elif sys.argv[1] == "execute": - execute(sys.argv[2], sys.argv[3]) + execute(os.environ["DATA_DATE"], os.environ["COLOR"]) else: raise Exception(f"Invalid command `{sys.argv[1]}`") diff --git a/databuild/test/app/jobs/ingest_color_votes/test.py b/databuild/test/app/jobs/ingest_color_votes/test.py index 460c037..fc3433f 100644 --- a/databuild/test/app/jobs/ingest_color_votes/test.py +++ b/databuild/test/app/jobs/ingest_color_votes/test.py @@ -1,9 +1,34 @@ - +from databuild.test.app.jobs.ingest_color_votes.config import configure from databuild.test.app.jobs.ingest_color_votes.execute import execute from databuild.test.app import dal from databuild.proto import PartitionRef +def test_ingest_color_votes_configure(): + refs_single = [PartitionRef(str="daily_color_votes/2025-01-01/red")] + config_single = configure(refs_single) + assert len(config_single.configs) == 1 + assert config_single.configs[0].outputs[0].str == "daily_color_votes/2025-01-01/red" + assert config_single.configs[0].env["COLOR"] == "red" + assert config_single.configs[0].env["DATA_DATE"] == "2025-01-01" + + refs_multiple = [ + PartitionRef(str="daily_color_votes/2025-01-02/red"), + PartitionRef(str="daily_color_votes/2025-01-02/blue"), + ] + + config_multiple = configure(refs_multiple) + assert len(config_multiple.configs) == 2 + assert len(config_multiple.configs[0].outputs) == 1 + assert config_multiple.configs[0].outputs[0].str == "daily_color_votes/2025-01-02/red" + assert config_multiple.configs[0].env["COLOR"] == "red" + assert config_multiple.configs[0].env["DATA_DATE"] == "2025-01-02" + assert len(config_multiple.configs[1].outputs) == 1 + assert config_multiple.configs[1].outputs[0].str == "daily_color_votes/2025-01-02/blue" + assert config_multiple.configs[1].env["COLOR"] == "blue" + assert config_multiple.configs[1].env["DATA_DATE"] == "2025-01-02" + + def test_ingest_color_votes(): execute("2025-01-01", "red") results = dal.read(PartitionRef(str="daily_color_votes/2025-01-01/red")) diff --git a/databuild/test/app/jobs/trailing_color_votes/README.md b/databuild/test/app/jobs/trailing_color_votes/README.md new file mode 100644 index 0000000..9b7479c --- /dev/null +++ b/databuild/test/app/jobs/trailing_color_votes/README.md @@ -0,0 +1,11 @@ + +# Trailing Color Votes +This job adds up votes from trailing days. For week granularity, it's 7 days, for month granularity its 28 days. + +## Configure +Produces a job config for every color and date. Uses "WEEKLY" and "MONTHLY" env vars to signal when those aggregates +should be calculated and written, e.g. when both weekly and monthly are requested for the same color + date, a single +job config is produced that configures the job to produce both. + +## Execute +Just reads trailing data for the specified color and date and adds it up, writing `votes` with the sum.