#!/usr/bin/env python3
"""Daily summary job: joins phrase stats with categorized reviews to produce
one per-podcast daily summary partition per (category, date)."""

import json
import os
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict

import duckdb


def main():
    if len(sys.argv) < 2:
        print("Usage: daily_summary_job.py {config|exec} [args...]", file=sys.stderr)
        sys.exit(1)

    command = sys.argv[1]
    if command == "config":
        handle_config(sys.argv[2:])
    elif command == "exec":
        handle_exec(sys.argv[2:])
    else:
        print(f"Unknown command: {command}", file=sys.stderr)
        sys.exit(1)


def parse_partition_ref(partition_ref: str) -> Dict[str, str]:
    """Parse a partition ref like 'daily_summaries/category=comedy/date=2020-01-01'
    into its components."""
    match = re.match(r'daily_summaries/category=([^/]+)/date=(\d{4}-\d{2}-\d{2})', partition_ref)
    if not match:
        raise ValueError(f"Invalid partition ref format: {partition_ref}")
    return {"category": match.group(1), "date": match.group(2)}


def handle_config(args):
    if len(args) < 1:
        print("Config mode requires at least one partition ref", file=sys.stderr)
        sys.exit(1)

    configs = []
    for partition_ref in args:
        try:
            parsed = parse_partition_ref(partition_ref)
            category = parsed["category"]
            date_str = parsed["date"]
        except ValueError as e:
            print(f"Error parsing partition ref: {e}", file=sys.stderr)
            sys.exit(1)

        # Dependencies: phrase stats and categorized reviews for the same category and date.
        phrase_stats_ref = f"phrase_stats/category={category}/date={date_str}"
        categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}"

        configs.append({
            "outputs": [{"str": partition_ref}],
            "inputs": [
                {"dep_type": 1, "partition_ref": {"str": phrase_stats_ref}},
                {"dep_type": 1, "partition_ref": {"str": categorized_reviews_ref}},
            ],
            "args": [category, date_str],
            "env": {
                "PARTITION_REF": partition_ref,
                "TARGET_CATEGORY": category,
                "TARGET_DATE": date_str,
            },
        })

    print(json.dumps({"configs": configs}))


def handle_exec(args):
    if len(args) < 2:
        print("Exec mode requires category and date arguments", file=sys.stderr)
        sys.exit(1)

    target_category = args[0]
    target_date = args[1]
    partition_ref = os.getenv(
        'PARTITION_REF',
        f'daily_summaries/category={target_category}/date={target_date}',
    )

    # Input paths.
    base_dir = "/tmp/databuild_test/examples/podcast_reviews"
    phrase_stats_file = (
        f"{base_dir}/phrase_stats/category={target_category}/date={target_date}"
        f"/phrase_stats.parquet"
    )
    categorized_reviews_file = (
        f"{base_dir}/categorized_reviews/category={target_category}/date={target_date}"
        f"/categorized_reviews.parquet"
    )

    # Both inputs must exist before the summary can be built.
    if not os.path.exists(phrase_stats_file):
        print(f"Phrase stats file not found: {phrase_stats_file}", file=sys.stderr)
        sys.exit(1)
    if not os.path.exists(categorized_reviews_file):
        print(f"Categorized reviews file not found: {categorized_reviews_file}", file=sys.stderr)
        sys.exit(1)

    # Output path.
    output_dir = Path(f"{base_dir}/daily_summaries/category={target_category}/date={target_date}")
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / "daily_summary.parquet"

    # Capture the start time before doing the work so the manifest records a
    # real duration rather than two near-identical end-of-run timestamps.
    start_time = datetime.now().isoformat()
    try:
        # Generate the daily summary combining phrase stats and categorized reviews.
        generate_daily_summary_for_category_date(
            phrase_stats_file,
            categorized_reviews_file,
            target_category,
            target_date,
            str(output_file),
        )

        print(f"Successfully generated daily summary for category {target_category} on {target_date}")
        print(f"Output written to: {output_file}")

        # Write the build manifest next to the output.
        phrase_stats_ref = f"phrase_stats/category={target_category}/date={target_date}"
        categorized_reviews_ref = f"categorized_reviews/category={target_category}/date={target_date}"
        manifest = {
            "outputs": [{"str": partition_ref}],
            "inputs": [
                {"str": phrase_stats_ref},
                {"str": categorized_reviews_ref},
            ],
            "start_time": start_time,
            "end_time": datetime.now().isoformat(),
            "task": {
                "job": {"label": "//examples/podcast_reviews:daily_summary_job"},
                "config": {
                    "outputs": [{"str": partition_ref}],
                    "inputs": [
                        {"dep_type": 1, "partition_ref": {"str": phrase_stats_ref}},
                        {"dep_type": 1, "partition_ref": {"str": categorized_reviews_ref}},
                    ],
                    "args": [target_category, target_date],
                    "env": {
                        "PARTITION_REF": partition_ref,
                        "TARGET_CATEGORY": target_category,
                        "TARGET_DATE": target_date,
                    },
                },
            },
        }

        manifest_file = output_dir / "manifest.json"
        with open(manifest_file, 'w') as f:
            json.dump(manifest, f, indent=2)
    except Exception as e:
        print(f"Error generating daily summary: {e}", file=sys.stderr)
        sys.exit(1)


def generate_daily_summary_for_category_date(
    phrase_stats_file: str,
    categorized_reviews_file: str,
    target_category: str,
    target_date: str,
    output_file: str,
):
    """Generate a daily summary combining top phrases and review aggregates."""
    duckdb_conn = duckdb.connect()
    try:
        # Install the parquet extension if needed; installation can fail when
        # it is already present, but the LOAD must succeed either way.
        try:
            duckdb_conn.execute("INSTALL parquet")
        except Exception:
            pass  # Extension might already be installed.
        duckdb_conn.execute("LOAD parquet")

        # Short-circuit to an empty (but correctly-schemed) output when both inputs are empty.
        phrase_count = duckdb_conn.execute(
            f"SELECT COUNT(*) FROM parquet_scan('{phrase_stats_file}')"
        ).fetchone()[0]
        review_count = duckdb_conn.execute(
            f"SELECT COUNT(*) FROM parquet_scan('{categorized_reviews_file}')"
        ).fetchone()[0]
        if phrase_count == 0 and review_count == 0:
            print("No data found, creating empty daily summary")
            create_empty_daily_summary(target_category, target_date, output_file, duckdb_conn)
            return

        # Rank phrases per podcast, aggregate reviews per podcast, then
        # full-outer-join the two so podcasts with only one side of data
        # still appear in the summary.
        query = f"""
            WITH top_phrases_per_podcast AS (
                SELECT
                    podcast_id,
                    podcast_title,
                    ngram,
                    count AS phrase_count,
                    avg_rating AS phrase_avg_rating,
                    weighted_score,
                    ROW_NUMBER() OVER (
                        PARTITION BY podcast_id ORDER BY weighted_score DESC
                    ) AS phrase_rank
                FROM parquet_scan('{phrase_stats_file}')
                WHERE ngram IS NOT NULL
            ),
            podcast_phrase_summary AS (
                SELECT
                    podcast_id,
                    podcast_title,
                    STRING_AGG(ngram, '; ' ORDER BY weighted_score DESC) AS top_phrases,
                    COUNT(*) AS total_phrases,
                    AVG(phrase_avg_rating) AS avg_phrase_rating,
                    SUM(weighted_score) AS total_phrase_score
                FROM top_phrases_per_podcast
                WHERE phrase_rank <= 5  -- Top 5 phrases per podcast
                GROUP BY podcast_id, podcast_title
            ),
            podcast_review_summary AS (
                SELECT
                    podcast_id,
                    podcast_title,
                    COUNT(*) AS review_count,
                    AVG(rating::FLOAT) AS avg_rating,
                    MIN(rating) AS min_rating,
                    MAX(rating) AS max_rating,
                    COUNT(CASE WHEN rating >= 4 THEN 1 END) AS positive_reviews,
                    COUNT(CASE WHEN rating <= 2 THEN 1 END) AS negative_reviews,
                    STRING_AGG(
                        CASE
                            WHEN rating <= 2 AND length(content) > 20
                                THEN substring(content, 1, 200) || '...'
                            ELSE NULL
                        END,
                        ' | '
                        ORDER BY rating ASC, length(content) DESC
                    ) AS sample_negative_reviews
                FROM parquet_scan('{categorized_reviews_file}')
                WHERE podcast_title IS NOT NULL
                GROUP BY podcast_id, podcast_title
            ),
            daily_summary AS (
                SELECT
                    '{target_date}' AS date,
                    '{target_category}' AS category,
                    COALESCE(pps.podcast_id, prs.podcast_id) AS podcast_id,
                    COALESCE(pps.podcast_title, prs.podcast_title) AS podcast_title,
                    COALESCE(prs.review_count, 0) AS review_count,
                    COALESCE(prs.avg_rating, 0.0) AS avg_rating,
                    COALESCE(prs.positive_reviews, 0) AS positive_reviews,
                    COALESCE(prs.negative_reviews, 0) AS negative_reviews,
                    COALESCE(pps.top_phrases, 'No significant phrases') AS top_phrases,
                    COALESCE(pps.total_phrases, 0) AS total_phrases,
                    COALESCE(pps.avg_phrase_rating, 0.0) AS avg_phrase_rating,
                    COALESCE(pps.total_phrase_score, 0.0) AS total_phrase_score,
                    prs.sample_negative_reviews,
                    CASE
                        WHEN prs.avg_rating >= 4.0 AND pps.avg_phrase_rating >= 4.0
                            THEN 'Highly Positive'
                        WHEN prs.avg_rating >= 3.5 THEN 'Positive'
                        WHEN prs.avg_rating >= 2.5 THEN 'Mixed'
                        WHEN prs.avg_rating >= 1.5 THEN 'Negative'
                        ELSE 'Highly Negative'
                    END AS sentiment_category,
                    (prs.review_count * prs.avg_rating * 0.6
                        + pps.total_phrase_score * 0.4) AS overall_score
                FROM podcast_phrase_summary pps
                FULL OUTER JOIN podcast_review_summary prs
                    ON pps.podcast_id = prs.podcast_id
                WHERE COALESCE(prs.review_count, 0) > 0
                   OR COALESCE(pps.total_phrases, 0) > 0
            )
            SELECT
                date, category, podcast_id, podcast_title,
                review_count, avg_rating, positive_reviews, negative_reviews,
                top_phrases, total_phrases, avg_phrase_rating, total_phrase_score,
                sample_negative_reviews, sentiment_category, overall_score
            FROM daily_summary
            ORDER BY overall_score DESC, review_count DESC
        """

        # Execute the query and write the result to parquet.
        duckdb_conn.execute(f"COPY ({query}) TO '{output_file}' (FORMAT PARQUET)")

        # Re-run the query for a row count to log.
        count_result = duckdb_conn.execute(f"SELECT COUNT(*) FROM ({query})").fetchone()
        row_count = count_result[0] if count_result else 0
        print(f"Generated daily summary for {row_count} podcasts")

        if row_count == 0:
            print(
                f"Warning: No summary data generated for category "
                f"'{target_category}' on date '{target_date}'"
            )
            create_empty_daily_summary(target_category, target_date, output_file, duckdb_conn)
    finally:
        duckdb_conn.close()


def create_empty_daily_summary(category: str, date: str, output_file: str, duckdb_conn):
    """Create an empty daily summary parquet file with the correct schema."""
    duckdb_conn.execute("DROP TABLE IF EXISTS empty_daily_summary")
    duckdb_conn.execute("""
        CREATE TABLE empty_daily_summary (
            date VARCHAR,
            category VARCHAR,
            podcast_id VARCHAR,
            podcast_title VARCHAR,
            review_count BIGINT,
            avg_rating DOUBLE,
            positive_reviews BIGINT,
            negative_reviews BIGINT,
            top_phrases VARCHAR,
            total_phrases BIGINT,
            avg_phrase_rating DOUBLE,
            total_phrase_score DOUBLE,
            sample_negative_reviews VARCHAR,
            sentiment_category VARCHAR,
            overall_score DOUBLE
        )
    """)
    duckdb_conn.execute(f"COPY empty_daily_summary TO '{output_file}' (FORMAT PARQUET)")
    print("Created empty daily summary file")


if __name__ == "__main__":
    main()
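
# A minimal local smoke test, assuming the upstream phrase_stats and
# categorized_reviews partitions have already been built under the hard-coded
# /tmp/databuild_test/examples/podcast_reviews root used above. The 'comedy'
# category and 2020-01-01 date are illustrative values borrowed from the
# parse_partition_ref docstring, not required fixtures:
#
#   # Prints a JSON {"configs": [...]} document describing inputs/outputs:
#   ./daily_summary_job.py config 'daily_summaries/category=comedy/date=2020-01-01'
#
#   # Builds the partition and writes daily_summary.parquet plus manifest.json:
#   PARTITION_REF='daily_summaries/category=comedy/date=2020-01-01' \
#       ./daily_summary_job.py exec comedy 2020-01-01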