60 lines
No EOL
2.1 KiB
Python
60 lines
No EOL
2.1 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import sys
|
|
import json
|
|
import re
|
|
from collections import defaultdict
|
|
|
|
def main():
    """Command-line entry point: group partition refs by the job that builds them.

    Every positional argument is treated as a partition reference. Each one is
    resolved with ``lookup_job_for_partition`` and the result is written to
    stdout as a single JSON object mapping job label -> list of partition refs,
    preserving the order in which the refs were given on the command line.

    If no partition refs are supplied, or if any lookup (or the JSON encoding)
    raises, a message is written to stderr and the process exits with status 1.
    """
    if len(sys.argv) <= 1:
        print("Usage: job_lookup.py partition_ref [partition_ref...]", file=sys.stderr)
        sys.exit(1)

    try:
        refs_by_job = defaultdict(list)
        for ref in sys.argv[1:]:
            job = lookup_job_for_partition(ref)
            refs_by_job[job].append(ref)

        # Output in the format expected by DataBuild: {job_label: [partition_ref, ...]}.
        # The `if refs` filter is defensive only; every group here has >= 1 ref.
        output = {job: refs for job, refs in refs_by_job.items() if refs}
        print(json.dumps(output))
    except Exception as err:  # top-level boundary: report any failure, exit non-zero
        print(f"Error in job lookup: {err}", file=sys.stderr)
        sys.exit(1)
|
|
|
|
# Date component shared by every dated partition layout: YYYY-MM-DD.
# ASCII digits only -- ``\d`` would also accept other Unicode decimal digits.
_DATE_PATTERN = r"[0-9]{4}-[0-9]{2}-[0-9]{2}"

# Ordered routing table of (compiled pattern, producing job label).
# Each pattern must match the *entire* partition reference (``fullmatch``),
# so refs with extra trailing path segments or junk are rejected with
# ValueError instead of being silently routed to a job.
_PARTITION_JOB_ROUTES = (
    # Extract reviews by date: reviews/date=YYYY-MM-DD
    (re.compile(r"reviews/date=" + _DATE_PATTERN), "//:extract_reviews_job"),
    # Extract all podcasts: podcasts/all
    (re.compile(r"podcasts/all"), "//:extract_podcasts_job"),
    # Categorized reviews: categorized_reviews/category=CATEGORY/date=YYYY-MM-DD
    (
        re.compile(r"categorized_reviews/category=[^/]+/date=" + _DATE_PATTERN),
        "//:categorize_reviews_job",
    ),
    # Phrase models: phrase_models/category=CATEGORY/date=YYYY-MM-DD
    (
        re.compile(r"phrase_models/category=[^/]+/date=" + _DATE_PATTERN),
        "//:phrase_modeling_job",
    ),
    # Phrase statistics: phrase_stats/category=CATEGORY/date=YYYY-MM-DD
    (
        re.compile(r"phrase_stats/category=[^/]+/date=" + _DATE_PATTERN),
        "//:phrase_stats_job",
    ),
    # Daily summaries: daily_summaries/category=CATEGORY/date=YYYY-MM-DD
    (
        re.compile(r"daily_summaries/category=[^/]+/date=" + _DATE_PATTERN),
        "//:daily_summary_job",
    ),
)


def lookup_job_for_partition(partition_ref: str) -> str:
    """Determine which job produces a given partition reference.

    Args:
        partition_ref: A partition reference such as
            ``"reviews/date=2024-01-15"`` or
            ``"daily_summaries/category=comedy/date=2024-01-15"``.
            The whole string must match one known layout exactly.
            CATEGORY may be any non-empty text without ``/``. The date is
            checked only for the YYYY-MM-DD digit shape, not calendar validity.

    Returns:
        The label of the producing job, e.g. ``"//:extract_reviews_job"``.

    Raises:
        ValueError: If the reference matches none of the known layouts.
    """
    for pattern, job_label in _PARTITION_JOB_ROUTES:
        if pattern.fullmatch(partition_ref):
            return job_label

    # If no match found, raise an error
    raise ValueError(f"No job found for partition reference: {partition_ref}")
|
|
|
|
if __name__ == "__main__":
    # Run the command-line entry point only when executed as a script,
    # not when this module is imported.
    main()