# App Specification

AKA the different ways databuild applications can be described.

## Correctness Strategy

- Examples that exercise each graph specification strategy are implemented and tested in CI/CD (a minimal sketch of such a test follows this list).
- Graph specification strategies provide …
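
A minimal sketch of such a test, assuming the Python example below is importable as `podcast_reviews` and that the graph exposes a `resolve` method returning planned job runs (both are assumptions, not settled API):

```python
# Hypothetical CI test; `podcast_reviews`, `graph.resolve`, and `job_name`
# are illustrative assumptions.
from podcast_reviews import PhraseStats, graph


def test_phrase_stats_resolves_full_chain():
    # Wanting one phrase_stats partition should plan the whole upstream chain.
    plan = graph.resolve([PhraseStats(category="comedy", date="2025-07-26")])
    assert [step.job_name for step in plan] == [
        "ExtractReviews",      # reviews/date=2025-07-26
        "CategorizeReviews",   # categorized_reviews/category=comedy/date=...
        "PhraseModeling",      # phrase_models/category=comedy/date=...
        "phrase_stats_job",    # phrase_stats/category=comedy/date=...
    ]
```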

## Bazel

- Purpose: a compilation/build target that fulfills the promise of the project (analogous to bytecode for JVM languages)
- Job binaries (config and exec)
- Graph lookup binary (lookup)
- Job target (config and exec)
- Graph target (build and analyze)
- See core build for details (a hypothetical `BUILD` sketch follows this list)
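
A hypothetical `BUILD` sketch of those targets; the `databuild_job` and `databuild_graph` rule names, load path, and attributes are all assumptions:

```starlark
# BUILD.bazel (hypothetical ruleset; names and attributes are assumptions)
load("@databuild//:defs.bzl", "databuild_graph", "databuild_job")

databuild_job(
    name = "phrase_modeling",
    binary = ":phrase_modeling_bin",  # supplies the config and exec entry points
)

databuild_graph(
    name = "podcast_reviews_graph",
    jobs = [":phrase_modeling"],
    lookup = ":graph_lookup_bin",  # the graph lookup binary
)
```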

## Python

- Wrapper functions register jobs and wants with the graph
- The `Partition` object improves ergonomics and makes data coupling explicit

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

from databuild import (
    DataBuildGraph, DataBuildJob, Partition, JobConfig, PyJobConfig,
    BazelJobConfig, PartitionManifest, Want,
)
from helpers import categorize_reviews, ingest_reviews, sla_failure_notify

graph = DataBuildGraph("//:podcast_reviews_graph")

ALL_CATEGORIES = {"comedy", ...}

# Partition definitions, used by the graph to resolve jobs by introspecting their config signatures
ExtractedReviews = Partition[r"reviews/date=(?P<date>\d{4}-\d{2}-\d{2})"]
CategorizedReviews = Partition[r"categorized_reviews/category=(?P<category>[^/]+)/date=(?P<date>\d{4}-\d{2}-\d{2})"]
PhraseModel = Partition[r"phrase_models/category=(?P<category>[^/]+)/date=(?P<date>\d{4}-\d{2}-\d{2})"]
PhraseStats = Partition[r"phrase_stats/category=(?P<category>[^/]+)/date=(?P<date>\d{4}-\d{2}-\d{2})"]
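# For example, the path "reviews/date=2025-07-26" would match ExtractedReviews
# and expose `date == "2025-07-26"` on the partition object (attribute access
# backed by the named groups is the assumed Partition behavior here).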


@graph.job
class ExtractReviews(DataBuildJob):
    def config(self, outputs: list[ExtractedReviews]) -> list[JobConfig]:
        # One job run can output multiple partitions
        args = [p.date for p in outputs]
        return [JobConfig(outputs=outputs, inputs=[], args=args)]

    def exec(self, config: JobConfig) -> PartitionManifest:
        for date, output in zip(config.args, config.outputs):
            ingest_reviews(date).write(output)
        # Start and end time inferred by wrapper (but could be overridden)
        return config.partitionManifest(job=self)


@dataclass
class CategorizeReviewsArgs:
    date: str
    category: str


@graph.job
class CategorizeReviews(DataBuildJob):
    def config(self, outputs: list[CategorizedReviews]) -> list[JobConfig]:
        # This job only outputs one partition per run
        return [
            # The PyJobConfig allows you to pass objects in config, rather than just `args` and `env`
            PyJobConfig[CategorizeReviewsArgs](
                outputs=[p],
                inputs=[ExtractedReviews.dep.materialize(date=p.date)],
                params=CategorizeReviewsArgs(date=p.date, category=p.category),
            )
            for p in outputs
        ]

    def exec(self, config: PyJobConfig[CategorizeReviewsArgs]) -> None:
        categorize_reviews(config.params.date, config.params.category)
        # Partition manifest automatically constructed from config


@graph.job
class PhraseModeling(DataBuildJob):
    def config(self, outputs: list[PhraseModel]) -> list[JobConfig]:
        # This job relies on a bazel executable target to run the actual job
        return [
            BazelJobConfig(
                outputs=[p],
                inputs=[CategorizedReviews.dep.materialize(date=p.date, category=p.category)],
                exec_target="//jobs:phrase_modeling",
                env={"CATEGORY": p.category, "DATA_DATE": p.date},
            )
            for p in outputs
        ]


# This job is fully defined in bazel
graph.bazel_job(target="//jobs:phrase_stats_job", outputs=list[PhraseStats])


@graph.want(cron='0 0 * * *')
def phrase_stats_want() -> list[Want[PhraseStats]]:
    # Creates a new want every midnight that expires after 3 days
    wanted = [PhraseStats(date=datetime.now().date().isoformat(), category=cat) for cat in ALL_CATEGORIES]
    on_fail = lambda p: f"Failed to calculate partition `{p}`"
    return [graph.want(partitions=wanted, ttl=timedelta(days=3), on_fail=on_fail)]
```

- TODO: do we need an escape hatch for "after 2025 use this job, before use that job" functionality? (One possible shape is sketched below.)
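
For illustration only, under the same hypothetical Python API as above, the escape hatch could live inside a single job's `config`, dispatching on the partition date (nothing here is committed design):

```python
@graph.job
class ExtractReviewsVersioned(DataBuildJob):
    # Hypothetical: route partitions to legacy vs. current logic by date.
    def config(self, outputs: list[ExtractedReviews]) -> list[JobConfig]:
        # ISO date strings compare correctly as plain strings.
        legacy = [p for p in outputs if p.date < "2025-01-01"]
        current = [p for p in outputs if p.date >= "2025-01-01"]
        configs = []
        if legacy:
            configs.append(JobConfig(outputs=legacy, inputs=[], args=["--legacy"]))
        if current:
            configs.append(JobConfig(outputs=current, inputs=[], args=[]))
        return configs
        # exec(...) elided; it would branch on the same flag.
```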

## Rust?

## Scala?

```scala
import databuild._
import scala.concurrent.duration._
import java.time.LocalDate

object PodcastReviewsGraph extends DataBuildGraph("//:podcast_reviews_graph") {

  val AllCategories = Set("comedy", ???)

  case class DatePartition(date: String)
  case class CategoryDatePartition(category: String, date: String)

  // Partition definitions using extractors
  object ExtractedReviews extends Partition[DatePartition](
    """reviews/date=(?P<date>\d{4}-\d{2}-\d{2})""".r
  )

  object CategorizedReviews extends Partition[CategoryDatePartition](
    """categorized_reviews/category=(?P<category>[^/]+)/date=(?P<date>\d{4}-\d{2}-\d{2})""".r
  )

  object PhraseModel extends Partition[CategoryDatePartition](
    """phrase_models/category=(?P<category>[^/]+)/date=(?P<date>\d{4}-\d{2}-\d{2})""".r
  )

  object PhraseStats extends Partition[CategoryDatePartition](
    """phrase_stats/category=(?P<category>[^/]+)/date=(?P<date>\d{4}-\d{2}-\d{2})""".r
  )

  // Job definitions
  @job
  object ExtractReviewsJob extends DataBuildJob[ExtractedReviews] {
    def config(outputs: List[ExtractedReviews]): List[JobConfig] = {
      val args = outputs.map(_.date)
      List(JobConfig(
        outputs = outputs,
        inputs = Nil,
        args = args
      ))
    }

    def exec(config: JobConfig): PartitionManifest = {
      config.args.zip(config.outputs).foreach { case (date, output) =>
        ingestReviews(date).writeTo(output)
      }
      config.toPartitionManifest(this)
    }
  }

  @job
  object CategorizeReviewsJob extends DataBuildJob[CategorizedReviews] {
    case class Args(date: String, category: String)

    def config(outputs: List[CategorizedReviews]): List[JobConfig] = {
      outputs.map { p =>
        ScalaJobConfig[Args](
          outputs = List(p),
          inputs = List(ExtractedReviews.dep.materialize(date = p.date)),
          params = Args(p.date, p.category)
        )
      }
    }

    def exec(config: ScalaJobConfig[Args]): Unit = {
      categorizeReviews(config.params.date, config.params.category)
      // Partition manifest auto-constructed
    }
  }

  @job
  object PhraseModelingJob extends DataBuildJob[PhraseModel] {
    def config(outputs: List[PhraseModel]): List[JobConfig] = {
      outputs.map { p =>
        BazelJobConfig(
          outputs = List(p),
          inputs = List(CategorizedReviews.dep.materialize(
            category = p.category,
            date = p.date
          )),
          execTarget = "//jobs:phrase_modeling",
          env = Map("CATEGORY" -> p.category, "DATA_DATE" -> p.date)
        )
      }
    }
  }

  // External bazel job
  bazelJob("//jobs:phrase_stats_job", outputType = classOf[PhraseStats])

  // Want definition
  @want(cron = "0 0 * * *")
  def phraseStatsWant(): List[Want[PhraseStats]] = {
    val today = LocalDate.now().toString
    val wanted = AllCategories.map(cat => PhraseStats(cat, today)).toList

    List(want(
      partitions = wanted,
      ttl = 3.days,
      onFail = p => s"Failed to calculate partition `$p`"
    ))
  }
}
```