databuild/examples/podcast_reviews/job_lookup.py
2025-06-30 22:15:48 -07:00

60 lines
No EOL
2.1 KiB
Python

#!/usr/bin/env python3
import sys
import json
import re
from collections import defaultdict
def main():
    """Command-line entry point: group partition refs by the job that builds them.

    Reads one or more partition references from ``sys.argv[1:]`` and resolves
    each with ``lookup_job_for_partition``. It then prints, on stdout, a JSON
    object mapping each job label to the list of refs it produces. Jobs appear
    in first-seen order, and refs keep the order in which they were given.

    Exits with status 1 and a message on stderr when no refs are supplied, or
    when resolving or serializing any ref raises.
    """
    refs = sys.argv[1:]
    if not refs:
        print("Usage: job_lookup.py partition_ref [partition_ref...]", file=sys.stderr)
        sys.exit(1)

    try:
        refs_by_job = defaultdict(list)
        for ref in refs:
            refs_by_job[lookup_job_for_partition(ref)].append(ref)
        # Output in the format expected by DataBuild. Every group was created
        # by an append, so there are no empty lists to filter out.
        print(json.dumps(dict(refs_by_job)))
    except Exception as err:
        # Top-level CLI boundary: turn any failure into a one-line message
        # and a non-zero exit status instead of a traceback.
        print(f"Error in job lookup: {err}", file=sys.stderr)
        sys.exit(1)
# Date component shared by the date-partitioned refs: YYYY-MM-DD.
# [0-9] is used instead of \d because \d also matches non-ASCII Unicode digits.
_DATE_PATTERN = r"[0-9]{4}-[0-9]{2}-[0-9]{2}"

# Ordered routing table of (compiled pattern, job label). Each pattern is
# applied with fullmatch, so it must describe the ENTIRE partition reference.
# The first matching rule wins.
_PARTITION_JOB_RULES = (
    # Extract reviews by date: reviews/date=YYYY-MM-DD
    (re.compile(rf"reviews/date={_DATE_PATTERN}"), "//:extract_reviews_job"),
    # Extract all podcasts: podcasts/all
    (re.compile(r"podcasts/all"), "//:extract_podcasts_job"),
    # Categorized reviews: categorized_reviews/category=CATEGORY/date=YYYY-MM-DD
    (
        re.compile(rf"categorized_reviews/category=[^/]+/date={_DATE_PATTERN}"),
        "//:categorize_reviews_job",
    ),
    # Phrase models: phrase_models/category=CATEGORY/date=YYYY-MM-DD
    (
        re.compile(rf"phrase_models/category=[^/]+/date={_DATE_PATTERN}"),
        "//:phrase_modeling_job",
    ),
    # Phrase statistics: phrase_stats/category=CATEGORY/date=YYYY-MM-DD
    (
        re.compile(rf"phrase_stats/category=[^/]+/date={_DATE_PATTERN}"),
        "//:phrase_stats_job",
    ),
    # Daily summaries: daily_summaries/category=CATEGORY/date=YYYY-MM-DD
    (
        re.compile(rf"daily_summaries/category=[^/]+/date={_DATE_PATTERN}"),
        "//:daily_summary_job",
    ),
)


def lookup_job_for_partition(partition_ref: str) -> str:
    """Return the label of the job that produces ``partition_ref``.

    The rules in ``_PARTITION_JOB_RULES`` are tried in order, and a rule must
    match the whole reference. References with trailing segments or junk,
    such as ``reviews/date=2024-01-01/extra``, are therefore rejected instead
    of being routed to a job that cannot produce them.

    Args:
        partition_ref: A partition reference, e.g.
            ``categorized_reviews/category=comedy/date=2024-01-01``.

    Returns:
        The job label string, e.g. ``"//:extract_reviews_job"``.

    Raises:
        ValueError: If no rule matches ``partition_ref``.
    """
    for pattern, job_label in _PARTITION_JOB_RULES:
        if pattern.fullmatch(partition_ref):
            return job_label
    # If no match found, raise an error
    raise ValueError(f"No job found for partition reference: {partition_ref}")
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()