From e4db35083313d5f8e363946329dd3cbda13f9d4a Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Wed, 30 Jul 2025 21:22:19 -0700 Subject: [PATCH] Add start of test application --- databuild/BUILD.bazel | 1 + databuild/test/BUILD.bazel | 1 - databuild/test/app/BUILD.bazel | 25 ++++++++++++++++ databuild/test/app/README.md | 29 ++++++++++++++++++ databuild/test/app/dal.py | 30 +++++++++++++++++++ .../app/jobs/ingest_color_votes/config.py | 11 +++++++ .../app/jobs/ingest_color_votes/execute.py | 10 +++++++ .../test/app/jobs/ingest_color_votes/main.py | 16 ++++++++++ .../test/app/jobs/ingest_color_votes/test.py | 18 +++++++++++ 9 files changed, 140 insertions(+), 1 deletion(-) create mode 100644 databuild/test/app/BUILD.bazel create mode 100644 databuild/test/app/README.md create mode 100644 databuild/test/app/dal.py create mode 100644 databuild/test/app/jobs/ingest_color_votes/config.py create mode 100644 databuild/test/app/jobs/ingest_color_votes/execute.py create mode 100644 databuild/test/app/jobs/ingest_color_votes/main.py create mode 100644 databuild/test/app/jobs/ingest_color_votes/test.py diff --git a/databuild/BUILD.bazel b/databuild/BUILD.bazel index ea68eb1..64ed4a4 100644 --- a/databuild/BUILD.bazel +++ b/databuild/BUILD.bazel @@ -188,6 +188,7 @@ py_library( visibility = ["//visibility:public"], deps = [ "@pypi//betterproto2_compiler", + "@pypi//grpcio", "@pypi//pytest", ], ) diff --git a/databuild/test/BUILD.bazel b/databuild/test/BUILD.bazel index f2b6417..58c6886 100644 --- a/databuild/test/BUILD.bazel +++ b/databuild/test/BUILD.bazel @@ -57,6 +57,5 @@ py_test( srcs = ["py_proto_test.py"], deps = [ "//databuild:py_proto", - "@pypi//grpcio", ], ) diff --git a/databuild/test/app/BUILD.bazel b/databuild/test/app/BUILD.bazel new file mode 100644 index 0000000..7ad5aa5 --- /dev/null +++ b/databuild/test/app/BUILD.bazel @@ -0,0 +1,25 @@ +load("//databuild:rules.bzl", "databuild_graph", "databuild_job") + +py_library( + name = "job_src", + srcs = glob(["**/*.py"]), + visibility = ["//visibility:public"], + deps = ["//databuild:py_proto"], +) + +# Tests +py_test( + name = "test", + srcs = glob(["**/test.py"]), + deps = [":job_src"], +) + +# Bazel-defined + +#databuild_job( +# name = "ingest_color_votes", +#) + +# Python-DSL-defined + +# TODO diff --git a/databuild/test/app/README.md b/databuild/test/app/README.md new file mode 100644 index 0000000..59d7ff5 --- /dev/null +++ b/databuild/test/app/README.md @@ -0,0 +1,29 @@ + +# Test DataBuild App + +This directory contains common job components for testing databuild apps described via different methods, e.g. the core bazel targets, the python DSL, etc. + +## Structure + +The fictitious use case is "daily color votes". The underlying input data is votes per color per day, which we combine and aggregate in ways that help us test different aspects of databuild. Job exec contents should be trivial, as the purpose is to test composition. Types of partition relationships: + +- Time-range: 1 day depending on N prior days +- Multi-partition-output jobs + - Both for "always output multiple" or "consume different inputs based on desired output" + +```mermaid +flowchart TD + daily_color_votes[(daily_color_votes/$date/$color)] + color_votes_1w[(color_votes_1w/$date/$color)] + color_votes_1m[(color_votes_1m/$date/$color)] + daily_votes[(daily_votes/$date)] + votes_1w[(votes_1w/$date)] + votes_1m[(votes_1m/$date)] + color_vote_report[(color_vote_report/$date/$color)] + ingest_color_votes --> daily_color_votes + daily_color_votes --> trailing_color_votes --> color_votes_1w & color_votes_1m + daily_color_votes --> aggregate_color_votes --> daily_votes + color_votes_1w --> aggregate_color_votes --> votes_1w + color_votes_1m --> aggregate_color_votes --> votes_1m + daily_votes & votes_1w & votes_1m & color_votes_1w & color_votes_1m --> color_vote_report_calc --> color_vote_report +``` diff --git a/databuild/test/app/dal.py b/databuild/test/app/dal.py new file mode 100644 index 0000000..3ffeb15 --- /dev/null +++ b/databuild/test/app/dal.py @@ -0,0 +1,30 @@ + +from databuild.proto import PartitionRef +import json +from pathlib import Path + + +def ref_path(ref: PartitionRef) -> str: + assert isinstance(ref, PartitionRef), f"Wanted PartitionRef, got `{type(ref)}`" + return "data/" + ref.str.lstrip("/") + "/data.json" + + +def read(*refs: PartitionRef, empty_ok: bool=True) -> list[dict]: + results = [] + for ref in refs: + try: + with open(ref_path(ref)) as infile: + results.extend(json.load(infile)) + except FileNotFoundError: + if not empty_ok: + raise + return [] + return results + + +def write(ref: PartitionRef, data: list[dict]) -> None: + # mkdirs before writing in case path doesn't exist + path = ref_path(ref) + Path(path.rsplit("/", 1)[0]).mkdir(parents=True, exist_ok=True) + with open(path, "w") as outfile: + json.dump(data, outfile) diff --git a/databuild/test/app/jobs/ingest_color_votes/config.py b/databuild/test/app/jobs/ingest_color_votes/config.py new file mode 100644 index 0000000..961945d --- /dev/null +++ b/databuild/test/app/jobs/ingest_color_votes/config.py @@ -0,0 +1,11 @@ +from databuild.proto import PartitionRef, JobConfigureResponse, JobConfig + +from datetime import date + + +def configure(outputs: list[PartitionRef]) -> JobConfigureResponse: + prefix, data_date, color = outputs[0].str.split("/") + date.fromisoformat(data_date) # Should be able to parse date + assert prefix == "daily_color_votes" + config = JobConfig(outputs = outputs, inputs=[], args=[], env={"data_date": data_date, "color": color}) + return JobConfigureResponse(configs=[config]) diff --git a/databuild/test/app/jobs/ingest_color_votes/execute.py b/databuild/test/app/jobs/ingest_color_votes/execute.py new file mode 100644 index 0000000..9281624 --- /dev/null +++ b/databuild/test/app/jobs/ingest_color_votes/execute.py @@ -0,0 +1,10 @@ + +from databuild.test.app import dal +from databuild.proto import PartitionRef +import random + + +def execute(data_date: str, color: str): + random.seed(hash((data_date, color))) + ref = PartitionRef(str=f"daily_color_votes/{data_date}/{color}") + dal.write(ref, [{"color": color, "data_date": data_date, "votes": random.randint(0, 1000)}]) diff --git a/databuild/test/app/jobs/ingest_color_votes/main.py b/databuild/test/app/jobs/ingest_color_votes/main.py new file mode 100644 index 0000000..f349699 --- /dev/null +++ b/databuild/test/app/jobs/ingest_color_votes/main.py @@ -0,0 +1,16 @@ + +import sys +from databuild.proto import PartitionRef +from databuild.test.app.jobs.ingest_color_votes.config import configure +from databuild.test.app.jobs.ingest_color_votes.execute import execute + +if __name__ == "__main__": + if sys.argv[1] == "config": + configure([ + PartitionRef(str=raw_ref) + for raw_ref in sys.argv[2:] + ]) + elif sys.argv[1] == "execute": + execute(sys.argv[2], sys.argv[3]) + else: + raise Exception(f"Invalid command `{sys.argv[1]}`") diff --git a/databuild/test/app/jobs/ingest_color_votes/test.py b/databuild/test/app/jobs/ingest_color_votes/test.py new file mode 100644 index 0000000..460c037 --- /dev/null +++ b/databuild/test/app/jobs/ingest_color_votes/test.py @@ -0,0 +1,18 @@ + +from databuild.test.app.jobs.ingest_color_votes.execute import execute +from databuild.test.app import dal +from databuild.proto import PartitionRef + + +def test_ingest_color_votes(): + execute("2025-01-01", "red") + results = dal.read(PartitionRef(str="daily_color_votes/2025-01-01/red")) + assert len(results) == 1 + assert results[0]["color"] == "red" + assert results[0]["data_date"] == "2025-01-01" + assert isinstance(results[0]["votes"], int) + + +if __name__ == '__main__': + import pytest + raise SystemExit(pytest.main([__file__]))