Add start of test application

This commit is contained in:
Stuart Axelbrooke 2025-07-30 21:22:19 -07:00
parent e1200eda46
commit e4db350833
9 changed files with 140 additions and 1 deletions

View file

@ -188,6 +188,7 @@ py_library(
visibility = ["//visibility:public"],
deps = [
"@pypi//betterproto2_compiler",
"@pypi//grpcio",
"@pypi//pytest",
],
)

View file

@ -57,6 +57,5 @@ py_test(
srcs = ["py_proto_test.py"],
deps = [
"//databuild:py_proto",
"@pypi//grpcio",
],
)

View file

@ -0,0 +1,25 @@
load("//databuild:rules.bzl", "databuild_graph", "databuild_job")
py_library(
name = "job_src",
srcs = glob(["**/*.py"]),
visibility = ["//visibility:public"],
deps = ["//databuild:py_proto"],
)
# Tests
py_test(
name = "test",
srcs = glob(["**/test.py"]),
deps = [":job_src"],
)
# Bazel-defined
#databuild_job(
# name = "ingest_color_votes",
#)
# Python-DSL-defined
# TODO

View file

@ -0,0 +1,29 @@
# Test DataBuild App
This directory contains common job components for testing databuild apps described via different methods, e.g. the core bazel targets, the python DSL, etc.
## Structure
The fictitious use case is "daily color votes". The underlying input data is votes per color per day, which we combine and aggregate in ways that help us test different aspects of databuild. Job exec contents should be trivial, as the purpose is to test composition. Types of partition relationships:
- Time-range: 1 day depending on N prior days
- Multi-partition-output jobs
- Both for "always output multiple" or "consume different inputs based on desired output"
```mermaid
flowchart TD
daily_color_votes[(daily_color_votes/$date/$color)]
color_votes_1w[(color_votes_1w/$date/$color)]
color_votes_1m[(color_votes_1m/$date/$color)]
daily_votes[(daily_votes/$date)]
votes_1w[(votes_1w/$date)]
votes_1m[(votes_1m/$date)]
color_vote_report[(color_vote_report/$date/$color)]
ingest_color_votes --> daily_color_votes
daily_color_votes --> trailing_color_votes --> color_votes_1w & color_votes_1m
daily_color_votes --> aggregate_color_votes --> daily_votes
color_votes_1w --> aggregate_color_votes --> votes_1w
color_votes_1m --> aggregate_color_votes --> votes_1m
daily_votes & votes_1w & votes_1m & color_votes_1w & color_votes_1m --> color_vote_report_calc --> color_vote_report
```

30
databuild/test/app/dal.py Normal file
View file

@ -0,0 +1,30 @@
from databuild.proto import PartitionRef
import json
from pathlib import Path
def ref_path(ref: PartitionRef) -> str:
assert isinstance(ref, PartitionRef), f"Wanted PartitionRef, got `{type(ref)}`"
return "data/" + ref.str.lstrip("/") + "/data.json"
def read(*refs: PartitionRef, empty_ok: bool=True) -> list[dict]:
results = []
for ref in refs:
try:
with open(ref_path(ref)) as infile:
results.extend(json.load(infile))
except FileNotFoundError:
if not empty_ok:
raise
return []
return results
def write(ref: PartitionRef, data: list[dict]) -> None:
# mkdirs before writing in case path doesn't exist
path = ref_path(ref)
Path(path.rsplit("/", 1)[0]).mkdir(parents=True, exist_ok=True)
with open(path, "w") as outfile:
json.dump(data, outfile)

View file

@ -0,0 +1,11 @@
from databuild.proto import PartitionRef, JobConfigureResponse, JobConfig
from datetime import date
def configure(outputs: list[PartitionRef]) -> JobConfigureResponse:
prefix, data_date, color = outputs[0].str.split("/")
date.fromisoformat(data_date) # Should be able to parse date
assert prefix == "daily_color_votes"
config = JobConfig(outputs = outputs, inputs=[], args=[], env={"data_date": data_date, "color": color})
return JobConfigureResponse(configs=[config])

View file

@ -0,0 +1,10 @@
from databuild.test.app import dal
from databuild.proto import PartitionRef
import random
def execute(data_date: str, color: str):
random.seed(hash((data_date, color)))
ref = PartitionRef(str=f"daily_color_votes/{data_date}/{color}")
dal.write(ref, [{"color": color, "data_date": data_date, "votes": random.randint(0, 1000)}])

View file

@ -0,0 +1,16 @@
import sys
from databuild.proto import PartitionRef
from databuild.test.app.jobs.ingest_color_votes.config import configure
from databuild.test.app.jobs.ingest_color_votes.execute import execute
if __name__ == "__main__":
if sys.argv[1] == "config":
configure([
PartitionRef(str=raw_ref)
for raw_ref in sys.argv[2:]
])
elif sys.argv[1] == "execute":
execute(sys.argv[2], sys.argv[3])
else:
raise Exception(f"Invalid command `{sys.argv[1]}`")

View file

@ -0,0 +1,18 @@
from databuild.test.app.jobs.ingest_color_votes.execute import execute
from databuild.test.app import dal
from databuild.proto import PartitionRef
def test_ingest_color_votes():
execute("2025-01-01", "red")
results = dal.read(PartitionRef(str="daily_color_votes/2025-01-01/red"))
assert len(results) == 1
assert results[0]["color"] == "red"
assert results[0]["data_date"] == "2025-01-01"
assert isinstance(results[0]["votes"], int)
if __name__ == '__main__':
import pytest
raise SystemExit(pytest.main([__file__]))