This commit is contained in:
parent
e4db350833
commit
30f1d9addb
9 changed files with 92 additions and 8 deletions
|
|
@ -9,7 +9,9 @@ The fictitious use case is "daily color votes". The underlying input data is vot
|
|||
|
||||
- Time-range: 1 day depending on N prior days
|
||||
- Multi-partition-output jobs
|
||||
- Both for "always output multiple" or "consume different inputs based on desired output"
|
||||
- Always output multiple, e.g. producing per type
|
||||
- Consume different inputs based on desired output
|
||||
- Produce multiple of the same type depending on input
|
||||
|
||||
```mermaid
|
||||
flowchart TD
|
||||
|
|
@ -27,3 +29,6 @@ flowchart TD
|
|||
color_votes_1m --> aggregate_color_votes --> votes_1m
|
||||
daily_votes & votes_1w & votes_1m & color_votes_1w & color_votes_1m --> color_vote_report_calc --> color_vote_report
|
||||
```
|
||||
|
||||
## Data Access
|
||||
Data access is implemented in [`dal.py`](./dal.py), with data written as lists of dicts in JSON. Partition fields are stored as values in those dicts.
|
||||
|
|
|
|||
2
databuild/test/app/colors.py
Normal file
2
databuild/test/app/colors.py
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
|
||||
COLORS = ["red", "blue", "green", "yellow", "cerulean", "cucumber", "sage", "forest"]
|
||||
10
databuild/test/app/jobs/aggregate_color_votes/README.md
Normal file
10
databuild/test/app/jobs/aggregate_color_votes/README.md
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
|
||||
# Aggregate Color Votes
|
||||
This job adds up votes across colors for the same date. It uses an arbitrary list of colors from [`colors.py`](../../colors.py).
|
||||
|
||||
## Configure
|
||||
When requested for a given date, it creates a job config for each date and type of aggregation, handling daily, weekly,
|
||||
and monthly aggregates. It declares data dependencies based on the date and the colors in [`colors.py`](../../colors.py).
|
||||
|
||||
## Execute
|
||||
Simply sums the `votes` from the referenced partitions and writes them.
|
||||
18
databuild/test/app/jobs/color_vote_report_calc/README.md
Normal file
18
databuild/test/app/jobs/color_vote_report_calc/README.md
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
|
||||
# Color Vote Report Calc
|
||||
Calculates some metrics based on data calculated by other aggregates:
|
||||
- Total votes
|
||||
- On this day
|
||||
- In last week
|
||||
- In last month
|
||||
- Percent of total votes going to this color
|
||||
- On this day
|
||||
- In last week
|
||||
- In last month
|
||||
|
||||
## Configure
|
||||
This job tests the "produce multiple partitions based on requested inputs in one run" mode. It only ever produces a
|
||||
single job config, which produces all requested outputs.
|
||||
|
||||
## Execute
|
||||
Iterates over requested partitions and performs calculations described above.
|
||||
9
databuild/test/app/jobs/ingest_color_votes/README.md
Normal file
9
databuild/test/app/jobs/ingest_color_votes/README.md
Normal file
|
|
@ -0,0 +1,9 @@
|
|||
|
||||
# Ingest Color Votes
|
||||
This job simply generates a random number between 0 and 1000 and writes it to the output.
|
||||
|
||||
## Configure
|
||||
The job has no inputs, and communicates params via environment variables (`DATA_DATE` and `COLOR`). It generates a single job config per requested color + date.
|
||||
|
||||
## Execute
|
||||
Generates data with `votes` being a number between 0 and 1000.
|
||||
|
|
@ -4,8 +4,10 @@ from datetime import date
|
|||
|
||||
|
||||
def configure(outputs: list[PartitionRef]) -> JobConfigureResponse:
|
||||
prefix, data_date, color = outputs[0].str.split("/")
|
||||
date.fromisoformat(data_date) # Should be able to parse date
|
||||
assert prefix == "daily_color_votes"
|
||||
config = JobConfig(outputs = outputs, inputs=[], args=[], env={"data_date": data_date, "color": color})
|
||||
return JobConfigureResponse(configs=[config])
|
||||
configs = []
|
||||
for output in outputs:
|
||||
prefix, data_date, color = output.str.split("/")
|
||||
date.fromisoformat(data_date) # Should be able to parse date
|
||||
assert prefix == "daily_color_votes"
|
||||
configs.append(JobConfig(outputs = [output], inputs=[], args=[], env={"DATA_DATE": data_date, "COLOR": color}))
|
||||
return JobConfigureResponse(configs=configs)
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
"""Main entrypoint for the ingest_color_votes job for use with bazel-defined graph."""
|
||||
|
||||
import sys
|
||||
import os
|
||||
from databuild.proto import PartitionRef
|
||||
from databuild.test.app.jobs.ingest_color_votes.config import configure
|
||||
from databuild.test.app.jobs.ingest_color_votes.execute import execute
|
||||
|
|
@ -11,6 +13,6 @@ if __name__ == "__main__":
|
|||
for raw_ref in sys.argv[2:]
|
||||
])
|
||||
elif sys.argv[1] == "execute":
|
||||
execute(sys.argv[2], sys.argv[3])
|
||||
execute(os.environ["DATA_DATE"], os.environ["COLOR"])
|
||||
else:
|
||||
raise Exception(f"Invalid command `{sys.argv[1]}`")
|
||||
|
|
|
|||
|
|
@ -1,9 +1,34 @@
|
|||
|
||||
from databuild.test.app.jobs.ingest_color_votes.config import configure
|
||||
from databuild.test.app.jobs.ingest_color_votes.execute import execute
|
||||
from databuild.test.app import dal
|
||||
from databuild.proto import PartitionRef
|
||||
|
||||
|
||||
def test_ingest_color_votes_configure():
|
||||
refs_single = [PartitionRef(str="daily_color_votes/2025-01-01/red")]
|
||||
config_single = configure(refs_single)
|
||||
assert len(config_single.configs) == 1
|
||||
assert config_single.configs[0].outputs[0].str == "daily_color_votes/2025-01-01/red"
|
||||
assert config_single.configs[0].env["COLOR"] == "red"
|
||||
assert config_single.configs[0].env["DATA_DATE"] == "2025-01-01"
|
||||
|
||||
refs_multiple = [
|
||||
PartitionRef(str="daily_color_votes/2025-01-02/red"),
|
||||
PartitionRef(str="daily_color_votes/2025-01-02/blue"),
|
||||
]
|
||||
|
||||
config_multiple = configure(refs_multiple)
|
||||
assert len(config_multiple.configs) == 2
|
||||
assert len(config_multiple.configs[0].outputs) == 1
|
||||
assert config_multiple.configs[0].outputs[0].str == "daily_color_votes/2025-01-02/red"
|
||||
assert config_multiple.configs[0].env["COLOR"] == "red"
|
||||
assert config_multiple.configs[0].env["DATA_DATE"] == "2025-01-02"
|
||||
assert len(config_multiple.configs[1].outputs) == 1
|
||||
assert config_multiple.configs[1].outputs[0].str == "daily_color_votes/2025-01-02/blue"
|
||||
assert config_multiple.configs[1].env["COLOR"] == "blue"
|
||||
assert config_multiple.configs[1].env["DATA_DATE"] == "2025-01-02"
|
||||
|
||||
|
||||
def test_ingest_color_votes():
|
||||
execute("2025-01-01", "red")
|
||||
results = dal.read(PartitionRef(str="daily_color_votes/2025-01-01/red"))
|
||||
|
|
|
|||
11
databuild/test/app/jobs/trailing_color_votes/README.md
Normal file
11
databuild/test/app/jobs/trailing_color_votes/README.md
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
|
||||
# Trailing Color Votes
|
||||
This job adds up votes from trailing days. For week granularity it's 7 days; for month granularity it's 28 days.
|
||||
|
||||
## Configure
|
||||
Produces a job config for every color and date. Uses "WEEKLY" and "MONTHLY" env vars to signal when those aggregates
|
||||
should be calculated and written. For example, when both weekly and monthly are requested for the same color + date, a single
|
||||
job config is produced that configures the job to produce both.
|
||||
|
||||
## Execute
|
||||
Just reads trailing data for the specified color and date and adds it up, writing `votes` with the sum.
|
||||
Loading…
Reference in a new issue