# App Specification

AKA the different ways databuild applications can be described.

## Correctness Strategy

- Examples are implemented for each graph specification strategy and tested in CI/CD (a hedged test sketch follows this list).
- Graph specification strategies provide
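
A minimal sketch of what such a CI test could look like. The module path and the `analyze()` method (named after the graph target's analyze phase in the Bazel section below) are assumptions, not established API:

```python
# Hypothetical CI test; the module path and `analyze()` API are assumptions.
from examples.podcast_reviews import graph


def test_graph_analyzes():
    # Analyze should validate partition patterns and job wiring without running jobs
    report = graph.analyze()
    assert not report.errors
```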

## Bazel

- Purpose: compilation/build target that fulfills the promise of the project (like bytecode for JVM languages)
- Job binaries (config and exec)
- Graph lookup binary (lookup)
- Job target (config and exec)
- Graph target (build and analyze)
- See [core build](./core-build.md) for details; a hedged `BUILD` sketch follows this list
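
A sketch of how these targets might be wired together in a `BUILD` file. The rule and attribute names here are assumptions; the actual definitions live in [core build](./core-build.md):

```starlark
# Hypothetical rule and attribute names; see core-build.md for the real ones.
load("//databuild:defs.bzl", "databuild_graph", "databuild_job")

databuild_job(
    name = "phrase_modeling",         # job target (config and exec)
    binary = ":phrase_modeling_bin",  # job binary
)

databuild_graph(
    name = "podcast_reviews_graph",   # graph target (build and analyze)
    jobs = [":phrase_modeling"],
    lookup = ":graph_lookup_bin",     # graph lookup binary
)
```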

## Python

- Wrapper functions enable graph registry
- Partition object increases ergonomics and enables explicit data coupling

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

from databuild import (
    DataBuildGraph, DataBuildJob, Partition, JobConfig, PyJobConfig, BazelJobConfig, PartitionManifest, Want
)
from helpers import ingest_reviews, categorize_reviews, sla_failure_notify

graph = DataBuildGraph("//:podcast_reviews_graph")

ALL_CATEGORIES = {"comedy", ...}

# Partition definitions, used by the graph to resolve jobs by introspecting their
# config signatures (see the matching sketch after this block)
ExtractedReviews = Partition[r"reviews/date=(?P<date>\d{4}-\d{2}-\d{2})"]
CategorizedReviews = Partition[r"categorized_reviews/category=(?P<category>[^/]+)/date=(?P<date>\d{4}-\d{2}-\d{2})"]
PhraseModel = Partition[r"phrase_models/category=(?P<category>[^/]+)/date=(?P<date>\d{4}-\d{2}-\d{2})"]
PhraseStats = Partition[r"phrase_stats/category=(?P<category>[^/]+)/date=(?P<date>\d{4}-\d{2}-\d{2})"]


@graph.job
class ExtractReviews(DataBuildJob):
    def config(self, outputs: list[ExtractedReviews]) -> list[JobConfig]:
        # One job run can output multiple partitions
        args = [p.date for p in outputs]
        return [JobConfig(outputs=outputs, inputs=[], args=args)]

    def exec(self, config: JobConfig) -> PartitionManifest:
        for date, output in zip(config.args, config.outputs):
            ingest_reviews(date).write(output)
        # Start and end time inferred by wrapper (but could be overridden)
        return config.partitionManifest(job=self)


@dataclass
class CategorizeReviewsArgs:
    date: str
    category: str


@graph.job
class CategorizeReviews(DataBuildJob):
    def config(self, outputs: list[CategorizedReviews]) -> list[JobConfig]:
        # This job only outputs one partition per run
        return [
            # PyJobConfig allows you to pass objects in config, rather than just `args` and `env`
            PyJobConfig[CategorizeReviewsArgs](
                outputs=[p],
                inputs=[ExtractedReviews.dep.materialize(date=p.date)],
                params=CategorizeReviewsArgs(date=p.date, category=p.category),
            )
            for p in outputs
        ]

    def exec(self, config: PyJobConfig[CategorizeReviewsArgs]) -> None:
        categorize_reviews(config.params.date, config.params.category)
        # Partition manifest automatically constructed from config


@graph.job
class PhraseModeling(DataBuildJob):
    def config(self, outputs: list[PhraseModel]) -> list[JobConfig]:
        # This job relies on a bazel executable target to run the actual job
        return [
            BazelJobConfig(
                outputs=[p],
                inputs=[CategorizedReviews.dep.materialize(date=p.date, category=p.category)],
                exec_target="//jobs:phrase_modeling",
                env={"CATEGORY": p.category, "DATA_DATE": p.date},
            )
            for p in outputs
        ]


# This job is fully defined in bazel
graph.bazel_job(target="//jobs:phrase_stats_job", outputs=list[PhraseStats])


@graph.want(cron='0 0 * * *')
def phrase_stats_want() -> list[Want[PhraseStats]]:
    # Creates a new want every midnight that times out in 3 days
    wanted = [PhraseStats(date=datetime.now().date().isoformat(), category=cat) for cat in ALL_CATEGORIES]
    on_fail = lambda p: f"Failed to calculate partition `{p}`"
    return [graph.want(partitions=wanted, ttl=timedelta(days=3), on_fail=on_fail)]
```
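
To make the partition-pattern mechanics concrete, here is a minimal sketch of the matching step using only the standard `re` module. How databuild dispatches from the match to the owning job's `config` is internal to the graph, so that part is only described in comments:

```python
import re

# A requested partition ref is matched against each registered partition pattern;
# the named groups become the fields of the Partition object.
ref = "categorized_reviews/category=comedy/date=2025-01-01"
pattern = r"categorized_reviews/category=(?P<category>[^/]+)/date=(?P<date>\d{4}-\d{2}-\d{2})"

m = re.fullmatch(pattern, ref)
assert m is not None
print(m.groupdict())  # {'category': 'comedy', 'date': '2025-01-01'}
# The graph would then construct the matching CategorizedReviews partition and
# call CategorizeReviews.config([...]) to plan the run.
```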

- TODO - do we need an escape hatch for "after 2025 use this job, before use that job" functionality? (one possible shape is sketched below)
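
One possible shape, sketched against the API above: because `config` is plain Python, a job can already route between implementations by partition date. The `//jobs:phrase_modeling_v2` target and the cutover date below are made-up placeholders, not part of the example graph:

```python
@graph.job
class PhraseModelingRouted(DataBuildJob):
    def config(self, outputs: list[PhraseModel]) -> list[JobConfig]:
        return [
            BazelJobConfig(
                outputs=[p],
                inputs=[CategorizedReviews.dep.materialize(date=p.date, category=p.category)],
                # Hypothetical cutover: new implementation for 2025+ partitions.
                # ISO dates compare correctly as strings.
                exec_target="//jobs:phrase_modeling_v2" if p.date >= "2025-01-01" else "//jobs:phrase_modeling",
                env={"CATEGORY": p.category, "DATA_DATE": p.date},
            )
            for p in outputs
        ]
```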

## Rust?

## Scala?

```scala
import databuild._
import helpers._
import scala.concurrent.duration._
import java.time.LocalDate

object PodcastReviewsGraph extends DataBuildGraph("//:podcast_reviews_graph") {

  val AllCategories = Set("comedy", ???)

  case class DatePartition(date: String)
  case class CategoryDatePartition(category: String, date: String)

  // Partition definitions using extractors
  // Note: JVM regexes spell named groups (?<name>...), not Python's (?P<name>...)
  object ExtractedReviews extends Partition[DatePartition](
    """reviews/date=(?<date>\d{4}-\d{2}-\d{2})""".r
  )

  object CategorizedReviews extends Partition[CategoryDatePartition](
    """categorized_reviews/category=(?<category>[^/]+)/date=(?<date>\d{4}-\d{2}-\d{2})""".r
  )

  object PhraseModel extends Partition[CategoryDatePartition](
    """phrase_models/category=(?<category>[^/]+)/date=(?<date>\d{4}-\d{2}-\d{2})""".r
  )

  object PhraseStats extends Partition[CategoryDatePartition](
    """phrase_stats/category=(?<category>[^/]+)/date=(?<date>\d{4}-\d{2}-\d{2})""".r
  )

  // Job definitions
  @job
  object ExtractReviewsJob extends DataBuildJob[ExtractedReviews] {
    def config(outputs: List[ExtractedReviews]): List[JobConfig] = {
      val args = outputs.map(_.date)
      List(JobConfig(
        outputs = outputs,
        inputs = Nil,
        args = args
      ))
    }

    def exec(config: JobConfig): PartitionManifest = {
      config.args.zip(config.outputs).foreach { case (date, output) =>
        ingestReviews(date).writeTo(output)
      }
      config.toPartitionManifest(this)
    }
  }

  @job
  object CategorizeReviewsJob extends DataBuildJob[CategorizedReviews] {
    case class Args(date: String, category: String)

    def config(outputs: List[CategorizedReviews]): List[JobConfig] = {
      outputs.map { p =>
        ScalaJobConfig[Args](
          outputs = List(p),
          inputs = List(ExtractedReviews.dep.materialize(date = p.date)),
          params = Args(p.date, p.category)
        )
      }
    }

    def exec(config: ScalaJobConfig[Args]): Unit = {
      categorizeReviews(config.params.date, config.params.category)
      // Partition manifest auto-constructed
    }
  }

  @job
  object PhraseModelingJob extends DataBuildJob[PhraseModel] {
    def config(outputs: List[PhraseModel]): List[JobConfig] = {
      outputs.map { p =>
        BazelJobConfig(
          outputs = List(p),
          inputs = List(CategorizedReviews.dep.materialize(
            category = p.category,
            date = p.date
          )),
          execTarget = "//jobs:phrase_modeling",
          env = Map("CATEGORY" -> p.category, "DATA_DATE" -> p.date)
        )
      }
    }
  }

  // External bazel job
  bazelJob("//jobs:phrase_stats_job", outputType = classOf[PhraseStats])

  // Want definition
  @want(cron = "0 0 * * *")
  def phraseStatsWant(): List[Want[PhraseStats]] = {
    val today = LocalDate.now().toString
    val wanted = AllCategories.map(cat => PhraseStats(cat, today)).toList

    List(want(
      partitions = wanted,
      ttl = 3.days,
      onFail = p => s"Failed to calculate partition `$p`"
    ))
  }
}
```