#!/usr/bin/env python3
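"""Daily summary job for the podcast_reviews example pipeline.

Runs in two modes:

  config  For each daily_summaries partition ref passed on the command line,
          print a JSON job config declaring its inputs (the matching
          phrase_stats and categorized_reviews partitions), args, and env.

  exec    For a given category and date, join phrase stats with categorized
          reviews in DuckDB and write daily_summary.parquet plus a
          manifest.json under the partition's output directory.
"""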
import sys
import json
import os
import duckdb
from datetime import datetime
from pathlib import Path
from typing import Dict
import re

def main():
    if len(sys.argv) < 2:
        print("Usage: daily_summary_job.py {config|exec} [args...]", file=sys.stderr)
        sys.exit(1)

    command = sys.argv[1]

    if command == "config":
        handle_config(sys.argv[2:])
    elif command == "exec":
        handle_exec(sys.argv[2:])
    else:
        print(f"Unknown command: {command}", file=sys.stderr)
        sys.exit(1)


def parse_partition_ref(partition_ref: str) -> Dict[str, str]:
    """Parse partition ref like 'daily_summaries/category=comedy/date=2020-01-01' into components."""
    match = re.match(r'daily_summaries/category=([^/]+)/date=(\d{4}-\d{2}-\d{2})', partition_ref)
    if not match:
        raise ValueError(f"Invalid partition ref format: {partition_ref}")
    return {"category": match.group(1), "date": match.group(2)}
def handle_config(args):
    if len(args) < 1:
        print("Config mode requires a partition ref", file=sys.stderr)
        sys.exit(1)

    configs = []

    # Process each partition reference
    for partition_ref in args:
        try:
            parsed = parse_partition_ref(partition_ref)
            category = parsed["category"]
            date_str = parsed["date"]
        except ValueError as e:
            print(f"Error parsing partition ref: {e}", file=sys.stderr)
            sys.exit(1)

        # Dependencies: phrase stats and categorized reviews for the category and date
        phrase_stats_ref = f"phrase_stats/category={category}/date={date_str}"
        categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}"

        configs.append({
            "outputs": [{"str": partition_ref}],
            "inputs": [
                {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": phrase_stats_ref}},
                {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": categorized_reviews_ref}}
            ],
            "args": [category, date_str],
            "env": {
                "PARTITION_REF": partition_ref,
                "TARGET_CATEGORY": category,
                "TARGET_DATE": date_str
            }
        })

    config = {"configs": configs}
    print(json.dumps(config))
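
# For example, `daily_summary_job.py config daily_summaries/category=comedy/date=2020-01-01`
# prints (abbreviated) a single config whose inputs are the matching upstream partitions:
#   {"configs": [{"outputs": [{"str": "daily_summaries/category=comedy/date=2020-01-01"}],
#                 "inputs": [{"dep_type_code": 1, "dep_type_name": "materialize",
#                             "partition_ref": {"str": "phrase_stats/category=comedy/date=2020-01-01"}},
#                            ...],
#                 "args": ["comedy", "2020-01-01"],
#                 "env": {...}}]}
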
def handle_exec(args):
    # Optional test jitter: sleep up to EXEC_SLEEP seconds (default 0)
    import time
    import random
    time.sleep(float(os.getenv('EXEC_SLEEP', '0')) * random.random())

    if len(args) < 2:
        print("Exec mode requires category and date arguments", file=sys.stderr)
        sys.exit(1)

    target_category = args[0]
    target_date = args[1]
    partition_ref = os.getenv('PARTITION_REF', f'daily_summaries/category={target_category}/date={target_date}')

    # Input paths
    phrase_stats_file = f"/tmp/databuild_test/examples/podcast_reviews/phrase_stats/category={target_category}/date={target_date}/phrase_stats.parquet"
    categorized_reviews_file = f"/tmp/databuild_test/examples/podcast_reviews/categorized_reviews/category={target_category}/date={target_date}/categorized_reviews.parquet"

    # Check input files exist
    if not os.path.exists(phrase_stats_file):
        print(f"Phrase stats file not found: {phrase_stats_file}", file=sys.stderr)
        sys.exit(1)

    if not os.path.exists(categorized_reviews_file):
        print(f"Categorized reviews file not found: {categorized_reviews_file}", file=sys.stderr)
        sys.exit(1)

    # Output path
    output_dir = Path(f"/tmp/databuild_test/examples/podcast_reviews/daily_summaries/category={target_category}/date={target_date}")
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / "daily_summary.parquet"

    # Record the start timestamp before doing the work so start_time != end_time
    start_time = datetime.now().isoformat()

    try:
        # Generate daily summary combining phrase stats and recent reviews
        generate_daily_summary_for_category_date(
            phrase_stats_file,
            categorized_reviews_file,
            target_category,
            target_date,
            str(output_file)
        )

        print(f"Successfully generated daily summary for category {target_category} on {target_date}")
        print(f"Output written to: {output_file}")

        # Create manifest
        manifest = {
            "outputs": [{"str": partition_ref}],
            "inputs": [
                {"str": f"phrase_stats/category={target_category}/date={target_date}"},
                {"str": f"categorized_reviews/category={target_category}/date={target_date}"}
            ],
            "start_time": start_time,
            "end_time": datetime.now().isoformat(),
            "task": {
                "job": {"label": "//examples/podcast_reviews:daily_summary_job"},
                "config": {
                    "outputs": [{"str": partition_ref}],
                    "inputs": [
                        {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": f"phrase_stats/category={target_category}/date={target_date}"}},
                        {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": f"categorized_reviews/category={target_category}/date={target_date}"}}
                    ],
                    "args": [target_category, target_date],
                    "env": {"PARTITION_REF": partition_ref, "TARGET_CATEGORY": target_category, "TARGET_DATE": target_date}
                }
            }
        }

        manifest_file = output_dir / "manifest.json"
        with open(manifest_file, 'w') as f:
            json.dump(manifest, f, indent=2)

    except Exception as e:
        print(f"Error generating daily summary: {e}", file=sys.stderr)
        sys.exit(1)
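
# Example invocation (values illustrative; PARTITION_REF is optional and
# defaults to the ref derived from the positional args):
#   PARTITION_REF=daily_summaries/category=comedy/date=2020-01-01 \
#       daily_summary_job.py exec comedy 2020-01-01
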
def generate_daily_summary_for_category_date(
    phrase_stats_file: str,
    categorized_reviews_file: str,
    target_category: str,
    target_date: str,
    output_file: str
):
    """Generate daily summary combining top phrases and recent reviews."""

    # Connect to an in-memory DuckDB database for processing
    duckdb_conn = duckdb.connect()

    try:
        # Try to install the parquet extension, but don't fail if it's already installed
        try:
            duckdb_conn.execute("INSTALL parquet")
        except Exception:
            pass  # Extension might already be installed

        duckdb_conn.execute("LOAD parquet")

        # Check if we have data
        phrase_count = duckdb_conn.execute(f"SELECT COUNT(*) FROM parquet_scan('{phrase_stats_file}')").fetchone()[0]
        review_count = duckdb_conn.execute(f"SELECT COUNT(*) FROM parquet_scan('{categorized_reviews_file}')").fetchone()[0]

        if phrase_count == 0 and review_count == 0:
            print("No data found, creating empty daily summary")
            create_empty_daily_summary(target_category, target_date, output_file, duckdb_conn)
            return

        # Query to generate comprehensive daily summary
query = f"""
|
|
WITH top_phrases_per_podcast AS (
|
|
SELECT
|
|
podcast_id,
|
|
podcast_title,
|
|
ngram,
|
|
count as phrase_count,
|
|
avg_rating as phrase_avg_rating,
|
|
weighted_score,
|
|
ROW_NUMBER() OVER (PARTITION BY podcast_id ORDER BY weighted_score DESC) as phrase_rank
|
|
FROM parquet_scan('{phrase_stats_file}')
|
|
WHERE ngram IS NOT NULL
|
|
),
|
|
podcast_phrase_summary AS (
|
|
SELECT
|
|
podcast_id,
|
|
podcast_title,
|
|
STRING_AGG(ngram, '; ' ORDER BY weighted_score DESC) as top_phrases,
|
|
COUNT(*) as total_phrases,
|
|
AVG(phrase_avg_rating) as avg_phrase_rating,
|
|
SUM(weighted_score) as total_phrase_score
|
|
FROM top_phrases_per_podcast
|
|
WHERE phrase_rank <= 5 -- Top 5 phrases per podcast
|
|
GROUP BY podcast_id, podcast_title
|
|
),
|
|
podcast_review_summary AS (
|
|
SELECT
|
|
podcast_id,
|
|
podcast_title,
|
|
COUNT(*) as review_count,
|
|
AVG(rating::FLOAT) as avg_rating,
|
|
MIN(rating) as min_rating,
|
|
MAX(rating) as max_rating,
|
|
COUNT(CASE WHEN rating >= 4 THEN 1 END) as positive_reviews,
|
|
COUNT(CASE WHEN rating <= 2 THEN 1 END) as negative_reviews,
|
|
STRING_AGG(
|
|
CASE WHEN rating <= 2 AND length(content) > 20
|
|
THEN substring(content, 1, 200) || '...'
|
|
ELSE NULL
|
|
END,
|
|
' | '
|
|
ORDER BY rating ASC, length(content) DESC
|
|
) as sample_negative_reviews
|
|
FROM parquet_scan('{categorized_reviews_file}')
|
|
WHERE podcast_title IS NOT NULL
|
|
GROUP BY podcast_id, podcast_title
|
|
),
|
|
daily_summary AS (
|
|
SELECT
|
|
'{target_date}' as date,
|
|
'{target_category}' as category,
|
|
COALESCE(pps.podcast_id, prs.podcast_id) as podcast_id,
|
|
COALESCE(pps.podcast_title, prs.podcast_title) as podcast_title,
|
|
COALESCE(prs.review_count, 0) as review_count,
|
|
COALESCE(prs.avg_rating, 0.0) as avg_rating,
|
|
COALESCE(prs.positive_reviews, 0) as positive_reviews,
|
|
COALESCE(prs.negative_reviews, 0) as negative_reviews,
|
|
COALESCE(pps.top_phrases, 'No significant phrases') as top_phrases,
|
|
COALESCE(pps.total_phrases, 0) as total_phrases,
|
|
COALESCE(pps.avg_phrase_rating, 0.0) as avg_phrase_rating,
|
|
COALESCE(pps.total_phrase_score, 0.0) as total_phrase_score,
|
|
prs.sample_negative_reviews,
|
|
CASE
|
|
WHEN prs.avg_rating >= 4.0 AND pps.avg_phrase_rating >= 4.0 THEN 'Highly Positive'
|
|
WHEN prs.avg_rating >= 3.5 THEN 'Positive'
|
|
WHEN prs.avg_rating >= 2.5 THEN 'Mixed'
|
|
WHEN prs.avg_rating >= 1.5 THEN 'Negative'
|
|
ELSE 'Highly Negative'
|
|
END as sentiment_category,
|
|
(prs.review_count * prs.avg_rating * 0.6 + pps.total_phrase_score * 0.4) as overall_score
|
|
FROM podcast_phrase_summary pps
|
|
FULL OUTER JOIN podcast_review_summary prs
|
|
ON pps.podcast_id = prs.podcast_id
|
|
WHERE COALESCE(prs.review_count, 0) > 0 OR COALESCE(pps.total_phrases, 0) > 0
|
|
)
|
|
SELECT
|
|
date,
|
|
category,
|
|
podcast_id,
|
|
podcast_title,
|
|
review_count,
|
|
avg_rating,
|
|
positive_reviews,
|
|
negative_reviews,
|
|
top_phrases,
|
|
total_phrases,
|
|
avg_phrase_rating,
|
|
total_phrase_score,
|
|
sample_negative_reviews,
|
|
sentiment_category,
|
|
overall_score
|
|
FROM daily_summary
|
|
ORDER BY overall_score DESC, review_count DESC
|
|
"""
        # Execute query and save to parquet
        duckdb_conn.execute(f"COPY ({query}) TO '{output_file}' (FORMAT PARQUET)")

        # Get row count for logging
        count_result = duckdb_conn.execute(f"SELECT COUNT(*) FROM ({query})").fetchone()
        row_count = count_result[0] if count_result else 0

        print(f"Generated daily summary for {row_count} podcasts")

        if row_count == 0:
            print(f"Warning: No summary data generated for category '{target_category}' on date '{target_date}'")
            create_empty_daily_summary(target_category, target_date, output_file, duckdb_conn)

    finally:
        duckdb_conn.close()

def create_empty_daily_summary(category: str, date: str, output_file: str, duckdb_conn):
    """Create empty daily summary parquet file with correct schema."""

    duckdb_conn.execute("DROP TABLE IF EXISTS empty_daily_summary")
    duckdb_conn.execute("""
        CREATE TABLE empty_daily_summary (
            date VARCHAR,
            category VARCHAR,
            podcast_id VARCHAR,
            podcast_title VARCHAR,
            review_count BIGINT,
            avg_rating DOUBLE,
            positive_reviews BIGINT,
            negative_reviews BIGINT,
            top_phrases VARCHAR,
            total_phrases BIGINT,
            avg_phrase_rating DOUBLE,
            total_phrase_score DOUBLE,
            sample_negative_reviews VARCHAR,
            sentiment_category VARCHAR,
            overall_score DOUBLE
        )
    """)

    duckdb_conn.execute(f"COPY empty_daily_summary TO '{output_file}' (FORMAT PARQUET)")
    print("Created empty daily summary file")


if __name__ == "__main__":
    main()