From e32fea0d58826edd92638fcc8de47f9d9eec7747 Mon Sep 17 00:00:00 2001 From: Stuart Axelbrooke Date: Sat, 26 Jul 2025 20:19:40 -0700 Subject: [PATCH] Add potential sketch for scala --- design/graph-specification.md | 103 ++++++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) diff --git a/design/graph-specification.md b/design/graph-specification.md index eedfa7e..87a7f42 100644 --- a/design/graph-specification.md +++ b/design/graph-specification.md @@ -113,4 +113,107 @@ def phrase_stats_want() -> list[Want[PhraseStats]]: ## Rust? ## Scala? +```scala +import databuild._ +import scala.concurrent.duration._ +import java.time.LocalDate +object PodcastReviewsGraph extends DataBuildGraph("//:podcast_reviews_graph") { + + val AllCategories = Set("comedy", ???) + + case class DatePartition(date: String) + case class CategoryDatePartition(category: String, date: String) + + + // Partition definitions using extractors + object ExtractedReviews extends Partition[DatePartition]( + """reviews/date=(?P\d{4}-\d{2}-\d{2})""".r + ) + + object CategorizedReviews extends Partition[CategoryDatePartition]( + """categorized_reviews/category=(?P[^/]+)/date=(?P\d{4}-\d{2}-\d{2})""".r + ) + + object PhraseModel extends Partition[CategoryDatePartition]( + """phrase_models/category=(?P[^/]+)/date=(?P\d{4}-\d{2}-\d{2})""".r + ) + + object PhraseStats extends Partition[CategoryDatePartition]( + """phrase_stats/category=(?P[^/]+)/date=(?P\d{4}-\d{2}-\d{2})""".r + ) + + // Job definitions + @job + object ExtractReviewsJob extends DataBuildJob[ExtractedReviews] { + def config(outputs: List[ExtractedReviews]): List[JobConfig] = { + val args = outputs.map(_.date) + List(JobConfig( + outputs = outputs, + inputs = Nil, + args = args + )) + } + + def exec(config: JobConfig): PartitionManifest = { + config.args.zip(config.outputs).foreach { case (date, output) => + ingestReviews(date).writeTo(output) + } + config.toPartitionManifest(this) + } + } + + @job + object CategorizeReviewsJob extends DataBuildJob[CategorizedReviews] { + case class Args(date: String, category: String) + + def config(outputs: List[CategorizedReviews]): List[JobConfig] = { + outputs.map { p => + ScalaJobConfig[Args]( + outputs = List(p), + inputs = ExtractedReviews.dep.materialize(date = p.date), + params = Args(p.date, p.category) + ) + } + } + + def exec(config: ScalaJobConfig[Args]): Unit = { + categorizeReviews(config.params.date, config.params.category) + // Partition manifest auto-constructed + } + } + + @job + object PhraseModelingJob extends DataBuildJob[PhraseModel] { + def config(outputs: List[PhraseModel]): List[JobConfig] = { + outputs.map { p => + BazelJobConfig( + outputs = List(p), + inputs = List(CategorizedReviews.dep.materialize( + category = p.category, + date = p.date + )), + execTarget = "//jobs:phrase_modeling", + env = Map("CATEGORY" -> p.category, "DATA_DATE" -> p.date) + ) + } + } + } + + // External bazel job + bazelJob("//jobs:phrase_stats_job", outputType = classOf[PhraseStats]) + + // Want definition + @want(cron = "0 0 * * *") + def phraseStatsWant(): List[Want[PhraseStats]] = { + val today = LocalDate.now().toString + val wanted = AllCategories.map(cat => PhraseStats(cat, today)).toList + + List(want( + partitions = wanted, + ttl = 3.days, + onFail = p => s"Failed to calculate partition `$p`" + )) + } +} +```