# App Specification

AKA the different ways databuild applications can be described.

## Correctness Strategy

- Each graph specification strategy has an example implementation that is tested in CI/CD.
- Graph specification strategies provide … _(TODO: this point is unfinished — complete it)_

## Bazel

- Purpose: the compilation/build target that fulfills the promise of the project (analogous to bytecode for JVM languages)
- Job binaries (config and exec)
- Graph lookup binary (lookup)
- Job target (config and exec)
- Graph target (build and analyze)
- See [core build](./core-build.md) for details

## Python

- Wrapper functions (e.g. `@graph.job`, `@graph.want`) register jobs and wants with the graph
- Partition objects improve ergonomics and make the data coupling between jobs explicit

```python
from dataclasses import dataclass
from databuild import (
    DataBuildGraph, DataBuildJob, Partition, JobConfig, PyJobConfig, BazelJobConfig, PartitionManifest, Want
)
from helpers import ingest_reviews, categorize_reviews, sla_failure_notify
from datetime import datetime, timedelta

graph = DataBuildGraph("//:podcast_reviews_graph")

ALL_CATEGORIES = {"comedy", ...}

# Partition definitions, used by the graph to resolve jobs by introspecting their config signatures.
# The named groups (`date`, `category`) correspond to the partition attributes the jobs read (e.g. `p.date`).
ExtractedReviews = Partition[r"reviews/date=(?P<date>\d{4}-\d{2}-\d{2})"]
CategorizedReviews = Partition[r"categorized_reviews/category=(?P<category>[^/]+)/date=(?P<date>\d{4}-\d{2}-\d{2})"]
PhraseModel = Partition[r"phrase_models/category=(?P<category>[^/]+)/date=(?P<date>\d{4}-\d{2}-\d{2})"]
PhraseStats = Partition[r"phrase_stats/category=(?P<category>[^/]+)/date=(?P<date>\d{4}-\d{2}-\d{2})"]


@graph.job
class ExtractReviews(DataBuildJob):
    def config(self, outputs: list[ExtractedReviews]) -> list[JobConfig]:
        # One job run can output multiple partitions
        args = [p.date for p in outputs]
        return [JobConfig(outputs=outputs, inputs=[], args=args)]

    def exec(self, config: JobConfig) -> PartitionManifest:
        # `args` and `outputs` are positionally paired (see `config` above)
        for date, output in zip(config.args, config.outputs):
            ingest_reviews(date).write(output)
        # Start and end time inferred by wrapper (but could be overridden)
        return config.partitionManifest(job=self)


@dataclass
class CategorizeReviewsArgs:
    date: str
    category: str


@graph.job
class CategorizeReviews(DataBuildJob):
    def config(self, outputs: list[CategorizedReviews]) -> list[JobConfig]:
        # This job only outputs one partition per run
        return [
            # The PyJobConfig allows you to pass objects in config, rather than just `args` and `env`
            PyJobConfig[CategorizeReviewsArgs](
                outputs=[p],
                # `inputs` is a list of dependencies, as in PhraseModeling below
                inputs=[ExtractedReviews.dep.materialize(date=p.date)],
                params=CategorizeReviewsArgs(date=p.date, category=p.category),
            )
            for p in outputs
        ]

    def exec(self, config: PyJobConfig[CategorizeReviewsArgs]) -> None:
        categorize_reviews(config.params.date, config.params.category)
        # Partition manifest automatically constructed from config


@graph.job
class PhraseModeling(DataBuildJob):
    def config(self, outputs: list[PhraseModel]) -> list[JobConfig]:
        # This job relies on a bazel executable target to run the actual job
        return [
            BazelJobConfig(
                outputs=[p],
                inputs=[CategorizedReviews.dep.materialize(date=p.date, category=p.category)],
                exec_target="//jobs:phrase_modeling",
                env={"CATEGORY": p.category, "DATA_DATE": p.date},
            )
            for p in outputs
        ]


# This job is fully defined in bazel
graph.bazel_job(target="//jobs:phrase_stats_job", outputs=list[PhraseStats])


@graph.want(cron='0 0 * * *')
def phrase_stats_want() -> list[Want[PhraseStats]]:
    # Creates a new want every midnight that times out in 3 days.
    # Compute the date once so every category's partition gets the same date,
    # even if building the list happens to straddle midnight.
    today = datetime.now().date().isoformat()
    wanted = [PhraseStats(date=today, category=cat) for cat in ALL_CATEGORIES]
    on_fail = lambda p: f"Failed to calculate partition `{p}`"
    # NOTE(review): `graph.want` is used both as the decorator above and to build each Want — confirm intended API.
    return [graph.want(partitions=wanted, ttl=timedelta(days=3), on_fail=on_fail)]
```

- TODO: do we need an escape hatch for "after 2025 use this job, before that use that job" functionality?

## Rust?

## Scala?

```scala
import databuild._
import scala.concurrent.duration._
import java.time.LocalDate

object PodcastReviewsGraph extends DataBuildGraph("//:podcast_reviews_graph") {
// `???` stands in for the remaining categories (like `...` in the Python example);
// note that in Scala it throws NotImplementedError when this object is initialized.
val AllCategories = Set("comedy", ???)
// Partition definitions.
//
// Each partition family is a case class (the typed partition value, whose fields
// mirror the path regex's named groups) paired with a companion object that carries
// the path pattern. The pairing lets one name serve both as a type
// (`List[PhraseStats]`, `DataBuildJob[PhraseStats]`, `classOf[PhraseStats]`) and as a
// value (`PhraseStats(category, date)`, `ExtractedReviews.dep`); a bare `object`
// can only be the latter.
//
// Named groups use Java/Scala regex syntax `(?<name>...)`: the Python-only
// `(?P<name>...)` form is rejected by java.util.regex.Pattern.
final case class ExtractedReviews(date: String)
object ExtractedReviews extends Partition[ExtractedReviews](
  """reviews/date=(?<date>\d{4}-\d{2}-\d{2})""".r
)

final case class CategorizedReviews(category: String, date: String)
object CategorizedReviews extends Partition[CategorizedReviews](
  """categorized_reviews/category=(?<category>[^/]+)/date=(?<date>\d{4}-\d{2}-\d{2})""".r
)

final case class PhraseModel(category: String, date: String)
object PhraseModel extends Partition[PhraseModel](
  """phrase_models/category=(?<category>[^/]+)/date=(?<date>\d{4}-\d{2}-\d{2})""".r
)

final case class PhraseStats(category: String, date: String)
object PhraseStats extends Partition[PhraseStats](
  """phrase_stats/category=(?<category>[^/]+)/date=(?<date>\d{4}-\d{2}-\d{2})""".r
)

// Job definitions

/** Ingests reviews; a single run may produce several date partitions. */
@job
object ExtractReviewsJob extends DataBuildJob[ExtractedReviews] {
  def config(outputs: List[ExtractedReviews]): List[JobConfig] = {
    // Dates are passed positionally and re-paired with `outputs` in `exec`.
    val args = outputs.map(_.date)
    List(JobConfig(outputs = outputs, inputs = Nil, args = args))
  }

  def exec(config: JobConfig): PartitionManifest = {
    // `ingestReviews` is assumed to be provided elsewhere (the Scala counterpart of
    // Python's `helpers.ingest_reviews`); it is not defined in this example.
    config.args.zip(config.outputs).foreach { case (date, output) =>
      ingestReviews(date).writeTo(output)
    }
    config.toPartitionManifest(this)
  }
}

/** Categorizes one (category, date) partition per run from that date's extracted reviews. */
@job
object CategorizeReviewsJob extends DataBuildJob[CategorizedReviews] {
  /** Typed job parameters, carried via `ScalaJobConfig` instead of raw args/env. */
  final case class Args(date: String, category: String)

  def config(outputs: List[CategorizedReviews]): List[JobConfig] =
    outputs.map { p =>
      ScalaJobConfig[Args](
        outputs = List(p),
        // `inputs` is a list of dependencies, matching PhraseModelingJob.
        inputs = List(ExtractedReviews.dep.materialize(date = p.date)),
        params = Args(p.date, p.category)
      )
    }

  // The partition manifest is auto-constructed from the config, so this returns Unit.
  // `categorizeReviews` is assumed to be provided elsewhere; it is not defined here.
  def exec(config: ScalaJobConfig[Args]): Unit =
    categorizeReviews(config.params.date, config.params.category)
}

/** Phrase modeling runs as a Bazel executable target; this object only supplies its config. */
@job
object PhraseModelingJob extends DataBuildJob[PhraseModel] {
  def config(outputs: List[PhraseModel]): List[JobConfig] =
    outputs.map { p =>
      BazelJobConfig(
        outputs = List(p),
        inputs = List(CategorizedReviews.dep.materialize(category = p.category, date = p.date)),
        execTarget = "//jobs:phrase_modeling",
        env = Map("CATEGORY" -> p.category, "DATA_DATE" -> p.date)
      )
    }
}

// External bazel job
bazelJob("//jobs:phrase_stats_job", outputType = classOf[PhraseStats])

// Want definition: on the midnight cron below, request today's phrase stats for
// every category; the want expires after three days and reports failures via `onFail`.
@want(cron = "0 0 * * *")
def phraseStatsWant(): List[Want[PhraseStats]] = {
  val runDate: String = LocalDate.now().toString
  val requested = (for (category <- AllCategories) yield PhraseStats(category, runDate)).toList
  List(
    want(
      partitions = requested,
      ttl = 3.days,
      onFail = p => s"Failed to calculate partition `$p`"
    )
  )
}
}
```