This commit is contained in:
parent
111e6d9987
commit
e32fea0d58
1 changed files with 103 additions and 0 deletions
|
|
@ -113,4 +113,107 @@ def phrase_stats_want() -> list[Want[PhraseStats]]:
|
||||||
## Rust?
|
## Rust?
|
||||||
|
|
||||||
## Scala?
|
## Scala?
|
||||||
|
```scala
|
||||||
|
import databuild._
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
import java.time.LocalDate
|
||||||
|
|
||||||
|
object PodcastReviewsGraph extends DataBuildGraph("//:podcast_reviews_graph") {
|
||||||
|
|
||||||
|
val AllCategories = Set("comedy", ???)
|
||||||
|
|
||||||
|
case class DatePartition(date: String)
|
||||||
|
case class CategoryDatePartition(category: String, date: String)
|
||||||
|
|
||||||
|
|
||||||
|
// Partition definitions using extractors
|
||||||
|
object ExtractedReviews extends Partition[DatePartition](
|
||||||
|
"""reviews/date=(?P<date>\d{4}-\d{2}-\d{2})""".r
|
||||||
|
)
|
||||||
|
|
||||||
|
object CategorizedReviews extends Partition[CategoryDatePartition](
|
||||||
|
"""categorized_reviews/category=(?P<category>[^/]+)/date=(?P<date>\d{4}-\d{2}-\d{2})""".r
|
||||||
|
)
|
||||||
|
|
||||||
|
object PhraseModel extends Partition[CategoryDatePartition](
|
||||||
|
"""phrase_models/category=(?P<category>[^/]+)/date=(?P<date>\d{4}-\d{2}-\d{2})""".r
|
||||||
|
)
|
||||||
|
|
||||||
|
object PhraseStats extends Partition[CategoryDatePartition](
|
||||||
|
"""phrase_stats/category=(?P<category>[^/]+)/date=(?P<date>\d{4}-\d{2}-\d{2})""".r
|
||||||
|
)
|
||||||
|
|
||||||
|
// Job definitions
|
||||||
|
@job
|
||||||
|
object ExtractReviewsJob extends DataBuildJob[ExtractedReviews] {
|
||||||
|
def config(outputs: List[ExtractedReviews]): List[JobConfig] = {
|
||||||
|
val args = outputs.map(_.date)
|
||||||
|
List(JobConfig(
|
||||||
|
outputs = outputs,
|
||||||
|
inputs = Nil,
|
||||||
|
args = args
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
def exec(config: JobConfig): PartitionManifest = {
|
||||||
|
config.args.zip(config.outputs).foreach { case (date, output) =>
|
||||||
|
ingestReviews(date).writeTo(output)
|
||||||
|
}
|
||||||
|
config.toPartitionManifest(this)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@job
|
||||||
|
object CategorizeReviewsJob extends DataBuildJob[CategorizedReviews] {
|
||||||
|
case class Args(date: String, category: String)
|
||||||
|
|
||||||
|
def config(outputs: List[CategorizedReviews]): List[JobConfig] = {
|
||||||
|
outputs.map { p =>
|
||||||
|
ScalaJobConfig[Args](
|
||||||
|
outputs = List(p),
|
||||||
|
inputs = ExtractedReviews.dep.materialize(date = p.date),
|
||||||
|
params = Args(p.date, p.category)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def exec(config: ScalaJobConfig[Args]): Unit = {
|
||||||
|
categorizeReviews(config.params.date, config.params.category)
|
||||||
|
// Partition manifest auto-constructed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@job
|
||||||
|
object PhraseModelingJob extends DataBuildJob[PhraseModel] {
|
||||||
|
def config(outputs: List[PhraseModel]): List[JobConfig] = {
|
||||||
|
outputs.map { p =>
|
||||||
|
BazelJobConfig(
|
||||||
|
outputs = List(p),
|
||||||
|
inputs = List(CategorizedReviews.dep.materialize(
|
||||||
|
category = p.category,
|
||||||
|
date = p.date
|
||||||
|
)),
|
||||||
|
execTarget = "//jobs:phrase_modeling",
|
||||||
|
env = Map("CATEGORY" -> p.category, "DATA_DATE" -> p.date)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// External bazel job
|
||||||
|
bazelJob("//jobs:phrase_stats_job", outputType = classOf[PhraseStats])
|
||||||
|
|
||||||
|
// Want definition
|
||||||
|
@want(cron = "0 0 * * *")
|
||||||
|
def phraseStatsWant(): List[Want[PhraseStats]] = {
|
||||||
|
val today = LocalDate.now().toString
|
||||||
|
val wanted = AllCategories.map(cat => PhraseStats(cat, today)).toList
|
||||||
|
|
||||||
|
List(want(
|
||||||
|
partitions = wanted,
|
||||||
|
ttl = 3.days,
|
||||||
|
onFail = p => s"Failed to calculate partition `$p`"
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue