#!/usr/bin/env python3
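"""DataBuild job that computes per-podcast phrase statistics.

Supports two modes: `config` emits a JSON build configuration (dependencies,
args, env) for each requested partition ref, and `exec` materializes a
partition by joining phrase models with categorized reviews in DuckDB.
"""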
import json
import os
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict

import duckdb


def main():
    if len(sys.argv) < 2:
        print("Usage: phrase_stats_job.py {config|exec} [args...]", file=sys.stderr)
        sys.exit(1)
    command = sys.argv[1]
    if command == "config":
        handle_config(sys.argv[2:])
    elif command == "exec":
        handle_exec(sys.argv[2:])
    else:
        print(f"Unknown command: {command}", file=sys.stderr)
        sys.exit(1)
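
# Example invocations (illustrative; in practice the build system drives these):
#   phrase_stats_job.py config phrase_stats/category=comedy/date=2020-01-01
#   phrase_stats_job.py exec comedy 2020-01-01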


def parse_partition_ref(partition_ref: str) -> Dict[str, str]:
    """Parse a partition ref like 'phrase_stats/category=comedy/date=2020-01-01' into its components."""
    # Anchor the pattern so refs with trailing garbage are rejected
    match = re.match(r'phrase_stats/category=([^/]+)/date=(\d{4}-\d{2}-\d{2})$', partition_ref)
    if not match:
        raise ValueError(f"Invalid partition ref format: {partition_ref}")
    return {"category": match.group(1), "date": match.group(2)}


def handle_config(args):
    if len(args) < 1:
        print("Config mode requires at least one partition ref", file=sys.stderr)
        sys.exit(1)
    configs = []
    # Process each requested partition reference
    for partition_ref in args:
        try:
            parsed = parse_partition_ref(partition_ref)
            category = parsed["category"]
            date_str = parsed["date"]
        except ValueError as e:
            print(f"Error parsing partition ref: {e}", file=sys.stderr)
            sys.exit(1)
        # Dependencies: phrase models and categorized reviews for the same category and date
        phrase_models_ref = f"phrase_models/category={category}/date={date_str}"
        categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}"
        configs.append({
            "outputs": [{"str": partition_ref}],
            "inputs": [
                {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": phrase_models_ref}},
                {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": categorized_reviews_ref}}
            ],
            "args": [category, date_str],
            "env": {
                "PARTITION_REF": partition_ref,
                "TARGET_CATEGORY": category,
                "TARGET_DATE": date_str
            }
        })
    config = {"configs": configs}
    print(json.dumps(config))
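
# The emitted JSON for a single partition ref looks roughly like:
#   {"configs": [{
#       "outputs": [{"str": "phrase_stats/category=comedy/date=2020-01-01"}],
#       "inputs": [{"dep_type_code": 1, "dep_type_name": "materialize",
#                   "partition_ref": {"str": "phrase_models/category=comedy/date=2020-01-01"}}, ...],
#       "args": ["comedy", "2020-01-01"],
#       "env": {"PARTITION_REF": ..., "TARGET_CATEGORY": "comedy", "TARGET_DATE": "2020-01-01"}}]}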


def handle_exec(args):
    # Optional jitter for testing concurrent execution; EXEC_SLEEP is an upper bound in seconds
    import random
    import time
    time.sleep(float(os.getenv('EXEC_SLEEP', '0')) * random.random())
    if len(args) < 2:
        print("Exec mode requires category and date arguments", file=sys.stderr)
        sys.exit(1)
    target_category = args[0]
    target_date = args[1]
    partition_ref = os.getenv('PARTITION_REF', f'phrase_stats/category={target_category}/date={target_date}')
    # Input paths
    phrase_models_file = f"/tmp/databuild_test/examples/podcast_reviews/phrase_models/category={target_category}/date={target_date}/phrase_models.parquet"
    categorized_reviews_file = f"/tmp/databuild_test/examples/podcast_reviews/categorized_reviews/category={target_category}/date={target_date}/categorized_reviews.parquet"
    # Check that the input files exist
    if not os.path.exists(phrase_models_file):
        print(f"Phrase models file not found: {phrase_models_file}", file=sys.stderr)
        sys.exit(1)
    if not os.path.exists(categorized_reviews_file):
        print(f"Categorized reviews file not found: {categorized_reviews_file}", file=sys.stderr)
        sys.exit(1)
    # Output path
    output_dir = Path(f"/tmp/databuild_test/examples/podcast_reviews/phrase_stats/category={target_category}/date={target_date}")
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / "phrase_stats.parquet"
    # Record the start time before doing the work so the manifest reflects the actual run window
    start_time = datetime.now()
    try:
        # Calculate phrase statistics per podcast
        calculate_phrase_stats_for_category_date(
            phrase_models_file,
            categorized_reviews_file,
            target_category,
            target_date,
            str(output_file)
        )
        print(f"Successfully calculated phrase stats for category {target_category} on {target_date}")
        print(f"Output written to: {output_file}")
        # Create the manifest recording what was built and from which inputs
        manifest = {
            "outputs": [{"str": partition_ref}],
            "inputs": [
                {"str": f"phrase_models/category={target_category}/date={target_date}"},
                {"str": f"categorized_reviews/category={target_category}/date={target_date}"}
            ],
            "start_time": start_time.isoformat(),
            "end_time": datetime.now().isoformat(),
            "task": {
                "job": {"label": "//examples/podcast_reviews:phrase_stats_job"},
                "config": {
                    "outputs": [{"str": partition_ref}],
                    "inputs": [
                        {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": f"phrase_models/category={target_category}/date={target_date}"}},
                        {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": f"categorized_reviews/category={target_category}/date={target_date}"}}
                    ],
                    "args": [target_category, target_date],
                    "env": {"PARTITION_REF": partition_ref, "TARGET_CATEGORY": target_category, "TARGET_DATE": target_date}
                }
            }
        }
        manifest_file = output_dir / "manifest.json"
        with open(manifest_file, 'w') as f:
            json.dump(manifest, f, indent=2)
    except Exception as e:
        print(f"Error calculating phrase stats: {e}", file=sys.stderr)
        sys.exit(1)
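
# A successful exec run leaves two files under the partition directory:
#   .../phrase_stats/category=<category>/date=<date>/phrase_stats.parquet
#   .../phrase_stats/category=<category>/date=<date>/manifest.json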


def calculate_phrase_stats_for_category_date(
    phrase_models_file: str,
    categorized_reviews_file: str,
    target_category: str,
    target_date: str,
    output_file: str
):
    """Calculate phrase statistics per podcast by joining phrase models with reviews."""
    # Connect to an in-memory DuckDB instance for processing
    duckdb_conn = duckdb.connect()
    try:
        # Try to install the parquet extension, but don't fail if it's already installed
        try:
            duckdb_conn.execute("INSTALL parquet")
        except Exception:
            pass  # Extension might already be installed
        duckdb_conn.execute("LOAD parquet")
        # Check whether we have any phrase models for this partition
        phrase_count = duckdb_conn.execute(
            f"SELECT COUNT(*) FROM parquet_scan('{phrase_models_file}')"
        ).fetchone()[0]
        if phrase_count == 0:
            print("No phrase models found, creating empty phrase stats")
            create_empty_phrase_stats(target_category, target_date, output_file, duckdb_conn)
            return
        # Query to calculate phrase statistics per podcast
        query = f"""
            WITH phrase_matches AS (
                SELECT
                    r.podcast_id,
                    r.podcast_title,
                    r.rating,
                    r.content,
                    p.ngram,
                    p.score AS phrase_score
                FROM parquet_scan('{categorized_reviews_file}') r
                JOIN parquet_scan('{phrase_models_file}') p
                    ON lower(r.content) LIKE '%' || lower(p.ngram) || '%'
                WHERE r.content IS NOT NULL
                    AND r.content != ''
                    AND p.ngram IS NOT NULL
                    AND p.ngram != ''
            ),
            podcast_phrase_stats AS (
                SELECT
                    '{target_date}' AS date,
                    '{target_category}' AS category,
                    podcast_id,
                    podcast_title,
                    ngram,
                    COUNT(*) AS count,
                    AVG(rating::FLOAT) AS avg_rating,
                    MIN(rating) AS min_rating,
                    MAX(rating) AS max_rating,
                    AVG(phrase_score) AS avg_phrase_score,
                    COUNT(*) * AVG(phrase_score) * AVG(rating::FLOAT) / 5.0 AS weighted_score
                FROM phrase_matches
                GROUP BY podcast_id, podcast_title, ngram
                HAVING COUNT(*) >= 2  -- Only include phrases that appear at least twice per podcast
            )
            SELECT
                date,
                category,
                podcast_id,
                podcast_title,
                ngram,
                count,
                avg_rating,
                min_rating,
                max_rating,
                avg_phrase_score,
                weighted_score
            FROM podcast_phrase_stats
            ORDER BY weighted_score DESC, count DESC
        """
        # Execute the query and save the result to parquet
        duckdb_conn.execute(f"COPY ({query}) TO '{output_file}' (FORMAT PARQUET)")
        # Get the row count for logging
        count_result = duckdb_conn.execute(f"SELECT COUNT(*) FROM ({query})").fetchone()
        row_count = count_result[0] if count_result else 0
        print(f"Calculated phrase statistics for {row_count} podcast-phrase combinations")
        if row_count == 0:
            print(f"Warning: No phrase matches found for category '{target_category}' on date '{target_date}'")
            create_empty_phrase_stats(target_category, target_date, output_file, duckdb_conn)
    finally:
        duckdb_conn.close()
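
# Note: the LIKE-based join compares every review against every ngram, so cost
# grows with reviews x phrases per partition. That is fine for example-sized
# data; a tokenized join or full-text index would scale better.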


def create_empty_phrase_stats(category: str, date: str, output_file: str, duckdb_conn):
    """Create an empty phrase stats parquet file with the correct schema."""
    duckdb_conn.execute("DROP TABLE IF EXISTS empty_phrase_stats")
    duckdb_conn.execute("""
        CREATE TABLE empty_phrase_stats (
            date VARCHAR,
            category VARCHAR,
            podcast_id VARCHAR,
            podcast_title VARCHAR,
            ngram VARCHAR,
            count BIGINT,
            avg_rating DOUBLE,
            min_rating INTEGER,
            max_rating INTEGER,
            avg_phrase_score DOUBLE,
            weighted_score DOUBLE
        )
    """)
    duckdb_conn.execute(f"COPY empty_phrase_stats TO '{output_file}' (FORMAT PARQUET)")
    print("Created empty phrase stats file")


if __name__ == "__main__":
    main()