4.1 KiB
App Specification
This section covers the different ways a databuild application can be described.
Correctness Strategy
- Each graph specification strategy has an implemented example, and those examples are tested in CI/CD.
- Graph specification strategies provide … (TODO: this item is unfinished — complete the statement.)
Bazel
- Purpose: the compilation/build target that fulfills the project's promise (analogous to bytecode for JVM languages)
- Job binaries (config and exec)
- Graph lookup binary (lookup)
- Job target (config and exec)
- Graph target (build and analyze)
- See the core build documentation for details
Python
- Wrapper functions (decorators such as `@graph.job` and `@graph.want`) register jobs and wants with the graph
- The `Partition` object improves ergonomics and makes the data coupling between jobs explicit
from dataclasses import dataclass
from databuild import (
DataBuildGraph, DataBuildJob, Partition, JobConfig, PyJobConfig, BazelJobConfig, PartitionManifest, Want
)
from helpers import ingest_reviews, categorize_reviews, sla_failure_notify
from datetime import datetime, timedelta
# The graph that every job and want in this example registers against.
graph = DataBuildGraph("//:podcast_reviews_graph")

# NOTE(review): `...` here is a literal Ellipsis element standing in for the remaining
# categories; replace it with the real category names before running this for real.
ALL_CATEGORIES = {"comedy", ...}

# Path segments shared by the partition patterns below. Each pattern is the plain
# concatenation of these raw-string fragments, so the resulting regexes are unchanged.
_CATEGORY_SEGMENT = r"category=(?P<category>[^/]+)"
_DATE_SEGMENT = r"date=(?P<date>\d{4}-\d{2}-\d{2})"

# Partition definitions. The graph uses them to resolve jobs by introspecting the
# partition types that appear in each job's config signature.
ExtractedReviews = Partition[r"reviews/" + _DATE_SEGMENT]
CategorizedReviews = Partition[r"categorized_reviews/" + _CATEGORY_SEGMENT + "/" + _DATE_SEGMENT]
PhraseModel = Partition[r"phrase_models/" + _CATEGORY_SEGMENT + "/" + _DATE_SEGMENT]
PhraseStats = Partition[r"phrase_stats/" + _CATEGORY_SEGMENT + "/" + _DATE_SEGMENT]
@graph.job
class ExtractReviews(DataBuildJob):
    """Ingest raw podcast reviews into date-keyed ``ExtractedReviews`` partitions."""

    def config(self, outputs: list[ExtractedReviews]) -> list[JobConfig]:
        """Plan job runs for the requested ``ExtractedReviews`` partitions.

        One job run can output multiple partitions, so every requested output is
        batched into a single ``JobConfig`` whose ``args`` are the partition dates,
        in the same order as ``outputs``.

        Returns an empty list when no outputs are requested, instead of planning a
        run that would produce nothing.
        """
        if not outputs:
            return []
        args = [p.date for p in outputs]
        return [JobConfig(outputs=outputs, inputs=[], args=args)]

    def exec(self, config: JobConfig) -> PartitionManifest:
        """Ingest and write each output partition, then return the partition manifest.

        ``config.args`` and ``config.outputs`` are paired positionally, exactly as
        ``config`` builds them.

        Raises:
            ValueError: if the number of args differs from the number of outputs.
                ``zip`` would otherwise silently skip the unmatched partitions while
                the manifest (presumably built from ``config``) still reports them.
        """
        if len(config.args) != len(config.outputs):
            raise ValueError(
                f"ExtractReviews expects one date arg per output partition, got "
                f"{len(config.args)} args for {len(config.outputs)} outputs"
            )
        for date, output in zip(config.args, config.outputs):
            ingest_reviews(date).write(output)
        # Start and end time are inferred by the wrapper (but could be overridden).
        return config.partitionManifest(job=self)
@dataclass()
class CategorizeReviewsArgs:
    """Typed parameters that ``CategorizeReviews`` passes from ``config`` to ``exec``."""

    # Date string of the partition being produced (taken from the partition's ``date``).
    date: str
    # Review category of the partition being produced (taken from its ``category``).
    category: str
@graph.job
class CategorizeReviews(DataBuildJob):
    """Categorize one day's extracted reviews into a ``CategorizedReviews`` partition."""

    def config(self, outputs: list[CategorizedReviews]) -> list[JobConfig]:
        """Plan one job run per requested ``CategorizedReviews`` partition.

        Each run depends on the ``ExtractedReviews`` partition with the same date.
        ``PyJobConfig`` lets the run carry a typed ``CategorizeReviewsArgs`` object in
        ``params``, rather than just ``args`` and ``env``.
        """
        return [
            PyJobConfig[CategorizeReviewsArgs](
                outputs=[p],
                # ``inputs`` takes a list of dependencies, like the ``inputs=[...]``
                # passed to the other job configs in this graph.
                inputs=[ExtractedReviews.dep.materialize(date=p.date)],
                params=CategorizeReviewsArgs(date=p.date, category=p.category),
            )
            for p in outputs
        ]

    def exec(self, config: PyJobConfig[CategorizeReviewsArgs]) -> None:
        """Categorize reviews for the date and category carried in ``config.params``.

        Returns nothing: the partition manifest is constructed automatically from
        ``config``.
        """
        categorize_reviews(config.params.date, config.params.category)
@graph.job
class PhraseModeling(DataBuildJob):
    """Produce ``PhraseModel`` partitions by delegating execution to a Bazel target."""

    def config(self, outputs: list[PhraseModel]) -> list[JobConfig]:
        """Plan one Bazel-executed run per requested ``PhraseModel`` partition.

        Each run depends on the ``CategorizedReviews`` partition with the same date and
        category. It executes ``//jobs:phrase_modeling`` with those two values supplied
        through the ``CATEGORY`` and ``DATA_DATE`` environment variables.
        """
        planned = []
        for partition in outputs:
            upstream = CategorizedReviews.dep.materialize(
                date=partition.date, category=partition.category
            )
            planned.append(
                BazelJobConfig(
                    outputs=[partition],
                    inputs=[upstream],
                    exec_target="//jobs:phrase_modeling",
                    env={"CATEGORY": partition.category, "DATA_DATE": partition.date},
                )
            )
        return planned
# This job is fully defined in Bazel: no Python config/exec methods are written for it.
# It presumably registers `//jobs:phrase_stats_job` as the producer of `PhraseStats`
# partitions — TODO confirm against the `graph.bazel_job` API.
# NOTE(review): `outputs` receives the type expression `list[PhraseStats]` rather than a
# partition value; confirm this is the form `graph.bazel_job` expects.
graph.bazel_job(target="//jobs:phrase_stats_job", outputs=list[PhraseStats])
@graph.want(cron='0 0 * * *')
def phrase_stats_want() -> list[Want[PhraseStats]]:
    """Create a want for today's ``PhraseStats`` partition in every category.

    Scheduled by the cron expression ``0 0 * * *``, so a new want is created every
    midnight. The want times out after 3 days (``ttl``). ``on_fail`` builds a message
    naming the partition that failed.
    """
    # Resolve the date once, so every category's partition gets the same date even if
    # building the list happens to straddle midnight.
    # NOTE(review): `datetime.now()` is local time; confirm it matches the timezone the
    # cron schedule fires in.
    today = datetime.now().date().isoformat()
    wanted = [PhraseStats(date=today, category=cat) for cat in ALL_CATEGORIES]

    def on_fail(p):
        return f"Failed to calculate partition `{p}`"

    # NOTE(review): the annotation promises `Want` objects, but this calls `graph.want`,
    # which is also used above as the scheduling decorator; confirm it doubles as the
    # `Want` constructor. `sla_failure_notify` is imported in this file but unused — it
    # may have been intended for `on_fail`.
    return [graph.want(partitions=wanted, ttl=timedelta(days=3), on_fail=on_fail)]
- TODO: Do we need an escape hatch for time-based job selection (e.g. "use this job for dates after 2025, and that job for earlier dates")?