#!/usr/bin/env python3
"""Databuild job that categorizes podcast reviews for a (category, date) partition.

Two subcommands:
  config  - emit a JSON build-config (outputs, input deps, args, env) for each
            requested partition ref on stdout.
  exec    - perform the actual work: join reviews with podcast metadata, filter
            to the target category, and write a parquet partition plus a
            manifest.json describing what was built.
"""
import sys
import json
import os
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any
import re

from duckdb_utils import create_duckdb_connection, read_dataframe_with_fallback, save_dataframe_with_fallback


def main():
    """Dispatch to config/exec mode based on the first CLI argument."""
    if len(sys.argv) < 2:
        print("Usage: categorize_reviews_job.py {config|exec} [args...]", file=sys.stderr)
        sys.exit(1)

    command = sys.argv[1]
    if command == "config":
        handle_config(sys.argv[2:])
    elif command == "exec":
        handle_exec(sys.argv[2:])
    else:
        print(f"Unknown command: {command}", file=sys.stderr)
        sys.exit(1)


def parse_partition_ref(partition_ref: str) -> Dict[str, str]:
    """Parse partition ref like 'categorized_reviews/category=comedy/date=2020-01-01' into components.

    Returns a dict with 'category' and 'date' keys.
    Raises ValueError if the ref does not match the expected format.
    """
    match = re.match(r'categorized_reviews/category=([^/]+)/date=(\d{4}-\d{2}-\d{2})', partition_ref)
    if not match:
        raise ValueError(f"Invalid partition ref format: {partition_ref}")
    return {"category": match.group(1), "date": match.group(2)}


def handle_config(args):
    """Emit a JSON config document (on stdout) for each partition ref in args.

    Each config declares the output partition, its two materialize-type input
    dependencies (the day's reviews and the podcast metadata), the exec-mode
    CLI args, and the environment variables exec mode expects.
    """
    if not args:
        print("Config mode requires partition ref", file=sys.stderr)
        sys.exit(1)

    configs = []
    # Process each partition reference
    for partition_ref in args:
        try:
            parsed = parse_partition_ref(partition_ref)
            category = parsed["category"]
            date_str = parsed["date"]
        except ValueError as e:
            print(f"Error parsing partition ref: {e}", file=sys.stderr)
            sys.exit(1)

        # Dependencies: reviews for the date and podcast metadata
        reviews_ref = f"reviews/date={date_str}"
        podcasts_ref = "podcasts/all"

        configs.append({
            "outputs": [{"str": partition_ref}],
            "inputs": [
                {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": reviews_ref}},
                {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": podcasts_ref}}
            ],
            "args": [category, date_str],
            "env": {
                "PARTITION_REF": partition_ref,
                "TARGET_CATEGORY": category,
                "TARGET_DATE": date_str
            }
        })

    config = {"configs": configs}
    print(json.dumps(config))


def handle_exec(args):
    """Exec mode: categorize reviews for the (category, date) given in args.

    Locates the reviews/podcasts input files (parquet with CSV fallback),
    runs the categorization join, writes the output parquet partition, and
    writes a manifest.json alongside it.
    """
    # Optional startup jitter for testing concurrent execution: sleep a random
    # fraction of EXEC_SLEEP seconds (default 0, i.e. no sleep).
    import time
    import random
    time.sleep(float(os.getenv('EXEC_SLEEP', '0')) * random.random())

    if len(args) < 2:
        print("Exec mode requires category and date arguments", file=sys.stderr)
        sys.exit(1)

    target_category = args[0]
    target_date = args[1]
    partition_ref = os.getenv('PARTITION_REF', f'categorized_reviews/category={target_category}/date={target_date}')

    # Input paths - check for both parquet and CSV fallbacks
    reviews_base = f"/tmp/databuild_test/examples/podcast_reviews/reviews/date={target_date}/reviews"
    podcasts_base = "/tmp/databuild_test/examples/podcast_reviews/podcasts/podcasts"

    reviews_file = None
    podcasts_file = None

    # Find reviews file (parquet or CSV)
    for ext in ['.parquet', '.csv']:
        candidate = reviews_base + ext
        if os.path.exists(candidate):
            reviews_file = candidate
            break

    # Find podcasts file (parquet or CSV)
    for ext in ['.parquet', '.csv']:
        candidate = podcasts_base + ext
        if os.path.exists(candidate):
            podcasts_file = candidate
            break

    if not reviews_file:
        print(f"Reviews file not found: {reviews_base}.(parquet|csv)", file=sys.stderr)
        sys.exit(1)
    if not podcasts_file:
        print(f"Podcasts file not found: {podcasts_base}.(parquet|csv)", file=sys.stderr)
        sys.exit(1)

    # Output path
    output_dir = Path(f"/tmp/databuild_test/examples/podcast_reviews/categorized_reviews/category={target_category}/date={target_date}")
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / "categorized_reviews.parquet"

    try:
        # Capture start time BEFORE doing the work so the manifest's
        # start/end interval actually reflects the job's duration.
        start_time = datetime.now().isoformat()

        # Categorize reviews by joining with podcast metadata
        categorize_reviews_for_category_date(reviews_file, podcasts_file, target_category, str(output_file))

        print(f"Successfully categorized reviews for category {target_category} on {target_date}")
        print(f"Output written to: {output_file}")

        # Create manifest
        manifest = {
            "outputs": [{"str": partition_ref}],
            "inputs": [
                {"str": f"reviews/date={target_date}"},
                {"str": "podcasts/all"}
            ],
            "start_time": start_time,
            "end_time": datetime.now().isoformat(),
            "task": {
                "job": {"label": "//examples/podcast_reviews:categorize_reviews_job"},
                "config": {
                    "outputs": [{"str": partition_ref}],
                    "inputs": [
                        {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": f"reviews/date={target_date}"}},
                        {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": "podcasts/all"}}
                    ],
                    "args": [target_category, target_date],
                    "env": {"PARTITION_REF": partition_ref, "TARGET_CATEGORY": target_category, "TARGET_DATE": target_date}
                }
            }
        }

        manifest_file = output_dir / "manifest.json"
        with open(manifest_file, 'w') as f:
            json.dump(manifest, f, indent=2)

    except Exception as e:
        print(f"Error categorizing reviews: {e}", file=sys.stderr)
        sys.exit(1)


def categorize_reviews_for_category_date(reviews_file: str, podcasts_file: str, target_category: str, output_file: str):
    """Join reviews with podcast categories and filter for target category.

    A review matches when the podcast's primary_category equals the target,
    or the target appears as a substring of all_categories. The result is
    sorted by created_at and written to output_file as parquet (with the
    duckdb_utils fallback handling).
    """
    # Connect to DuckDB with extension handling
    duckdb_conn = create_duckdb_connection()
    try:
        # Read input files with fallback handling
        reviews_df = read_dataframe_with_fallback(reviews_file, duckdb_conn)
        podcasts_df = read_dataframe_with_fallback(podcasts_file, duckdb_conn)

        # Join reviews with podcasts
        joined_df = reviews_df.merge(podcasts_df, on='podcast_id', how='inner')

        # Filter by category: exact primary-category match OR substring hit in
        # the all_categories list (na=False so missing values never match).
        filtered_df = joined_df[
            (joined_df['primary_category'] == target_category) |
            (joined_df['all_categories'].str.contains(target_category, na=False))
        ].copy()

        # Add target category column
        filtered_df['target_category'] = target_category

        # Select and rename columns to match expected output
        result_df = filtered_df[[
            'podcast_id', 'review_title', 'content', 'rating', 'author_id',
            'created_at', 'review_date', 'title', 'primary_category',
            'all_categories', 'target_category'
        ]].rename(columns={'title': 'podcast_title'})

        # Sort by created_at
        result_df = result_df.sort_values('created_at')

        # Save to parquet with fallback
        save_dataframe_with_fallback(result_df, output_file, duckdb_conn, "parquet")

        row_count = len(result_df)
        print(f"Categorized {row_count} reviews for category '{target_category}'")
        if row_count == 0:
            print(f"Warning: No reviews found for category '{target_category}'")
    finally:
        duckdb_conn.close()


if __name__ == "__main__":
    main()