#!/usr/bin/env python3

import sys
import json
import os
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any
import re

from duckdb_utils import create_duckdb_connection, read_dataframe_with_fallback, save_dataframe_with_fallback

def main():
|
|
if len(sys.argv) < 2:
|
|
print("Usage: categorize_reviews_job.py {config|exec} [args...]", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
command = sys.argv[1]
|
|
|
|
if command == "config":
|
|
handle_config(sys.argv[2:])
|
|
elif command == "exec":
|
|
handle_exec(sys.argv[2:])
|
|
else:
|
|
print(f"Unknown command: {command}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def parse_partition_ref(partition_ref: str) -> Dict[str, str]:
    """Parse a partition ref like 'categorized_reviews/category=comedy/date=2020-01-01'.

    Returns:
        Dict with keys "category" and "date" (date as 'YYYY-MM-DD').

    Raises:
        ValueError: if the ref does not exactly match the expected format.
    """
    # fullmatch (rather than the unanchored re.match) rejects refs with
    # trailing garbage, e.g. '.../date=2020-01-01/extra', which the old
    # prefix match silently accepted.
    match = re.fullmatch(r'categorized_reviews/category=([^/]+)/date=(\d{4}-\d{2}-\d{2})', partition_ref)
    if not match:
        raise ValueError(f"Invalid partition ref format: {partition_ref}")
    return {"category": match.group(1), "date": match.group(2)}
def handle_config(args):
    """Config mode: print the job-config JSON for one partition ref to stdout.

    Exits with status 1 on a missing argument or an unparseable ref.
    """
    if len(args) < 1:
        print("Config mode requires partition ref", file=sys.stderr)
        sys.exit(1)

    partition_ref = args[0]

    try:
        parsed = parse_partition_ref(partition_ref)
    except ValueError as e:
        print(f"Error parsing partition ref: {e}", file=sys.stderr)
        sys.exit(1)

    category = parsed["category"]
    date_str = parsed["date"]

    # This partition depends on the day's raw reviews plus the full podcast catalog.
    upstream_refs = [f"reviews/date={date_str}", "podcasts/all"]

    config = {
        "configs": [{
            "outputs": [{"str": partition_ref}],
            "inputs": [
                {"dep_type": 1, "partition_ref": {"str": ref}}
                for ref in upstream_refs
            ],
            "args": [category, date_str],
            "env": {
                "PARTITION_REF": partition_ref,
                "TARGET_CATEGORY": category,
                "TARGET_DATE": date_str,
            },
        }]
    }

    print(json.dumps(config))
def _find_input_file(base_path):
    """Return the first existing file for base_path, trying .parquet then .csv.

    Returns None when neither variant exists on disk.
    """
    for ext in ('.parquet', '.csv'):
        candidate = base_path + ext
        if os.path.exists(candidate):
            return candidate
    return None


def handle_exec(args):
    """Exec mode: categorize one (category, date) partition and write its manifest.

    Args (positional): target category, target date ('YYYY-MM-DD').
    Exits with status 1 on missing arguments, missing inputs, or job failure.
    """
    if len(args) < 2:
        print("Exec mode requires category and date arguments", file=sys.stderr)
        sys.exit(1)

    target_category = args[0]
    target_date = args[1]
    # The orchestrator passes PARTITION_REF via env; fall back to reconstructing it.
    partition_ref = os.getenv('PARTITION_REF', f'categorized_reviews/category={target_category}/date={target_date}')

    # Input paths - check for both parquet and CSV fallbacks
    reviews_base = f"/tmp/databuild_test/examples/podcast_reviews/reviews/date={target_date}/reviews"
    podcasts_base = "/tmp/databuild_test/examples/podcast_reviews/podcasts/podcasts"

    reviews_file = _find_input_file(reviews_base)
    podcasts_file = _find_input_file(podcasts_base)

    if not reviews_file:
        print(f"Reviews file not found: {reviews_base}.(parquet|csv)", file=sys.stderr)
        sys.exit(1)

    if not podcasts_file:
        print(f"Podcasts file not found: {podcasts_base}.(parquet|csv)", file=sys.stderr)
        sys.exit(1)

    # Output path
    output_dir = Path(f"/tmp/databuild_test/examples/podcast_reviews/categorized_reviews/category={target_category}/date={target_date}")
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / "categorized_reviews.parquet"

    # Capture the start timestamp BEFORE doing the work; the old code stamped
    # both start_time and end_time after completion, so the manifest always
    # showed a zero-length run.
    start_time = datetime.now().isoformat()

    try:
        # Categorize reviews by joining with podcast metadata
        categorize_reviews_for_category_date(reviews_file, podcasts_file, target_category, str(output_file))

        print(f"Successfully categorized reviews for category {target_category} on {target_date}")
        print(f"Output written to: {output_file}")

        # Manifest mirrors the config emitted by handle_config for this partition.
        manifest = {
            "outputs": [{"str": partition_ref}],
            "inputs": [
                {"str": f"reviews/date={target_date}"},
                {"str": "podcasts/all"}
            ],
            "start_time": start_time,
            "end_time": datetime.now().isoformat(),
            "task": {
                "job": {"label": "//examples/podcast_reviews:categorize_reviews_job"},
                "config": {
                    "outputs": [{"str": partition_ref}],
                    "inputs": [
                        {"dep_type": 1, "partition_ref": {"str": f"reviews/date={target_date}"}},
                        {"dep_type": 1, "partition_ref": {"str": "podcasts/all"}}
                    ],
                    "args": [target_category, target_date],
                    "env": {"PARTITION_REF": partition_ref, "TARGET_CATEGORY": target_category, "TARGET_DATE": target_date}
                }
            }
        }

        manifest_file = output_dir / "manifest.json"
        with open(manifest_file, 'w') as f:
            json.dump(manifest, f, indent=2)

    except Exception as e:
        print(f"Error categorizing reviews: {e}", file=sys.stderr)
        sys.exit(1)
def categorize_reviews_for_category_date(reviews_file: str, podcasts_file: str, target_category: str, output_file: str):
    """Join reviews with podcast metadata, keep rows matching target_category, save as parquet.

    A review matches when the podcast's primary_category equals the target, or
    the target appears in the podcast's all_categories string.

    Args:
        reviews_file: path to the reviews input (.parquet or .csv).
        podcasts_file: path to the podcast metadata input (.parquet or .csv).
        target_category: category name to filter on.
        output_file: destination path for the categorized parquet output.
    """
    # Connect to DuckDB with extension handling
    duckdb_conn = create_duckdb_connection()

    try:
        # Read input files with fallback handling
        reviews_df = read_dataframe_with_fallback(reviews_file, duckdb_conn)
        podcasts_df = read_dataframe_with_fallback(podcasts_file, duckdb_conn)

        # NOTE: the old `import pandas as pd` here was unused (all work goes
        # through DataFrame methods), so it has been removed.

        # Inner join: reviews whose podcast_id has no metadata row are dropped.
        joined_df = reviews_df.merge(podcasts_df, on='podcast_id', how='inner')

        # Match primary category exactly, or as a substring of all_categories;
        # na=False treats a missing all_categories as a non-match.
        # NOTE(review): str.contains interprets target_category as a regex —
        # fine for plain category names, but verify if categories may contain
        # regex metacharacters.
        filtered_df = joined_df[
            (joined_df['primary_category'] == target_category) |
            (joined_df['all_categories'].str.contains(target_category, na=False))
        ].copy()

        # Record which category this partition was built for.
        filtered_df['target_category'] = target_category

        # Select and rename columns to match the expected output schema.
        result_df = filtered_df[[
            'podcast_id', 'review_title', 'content', 'rating', 'author_id',
            'created_at', 'review_date', 'title', 'primary_category',
            'all_categories', 'target_category'
        ]].rename(columns={'title': 'podcast_title'})

        # Deterministic ordering for downstream consumers.
        result_df = result_df.sort_values('created_at')

        # Save to parquet with fallback
        save_dataframe_with_fallback(result_df, output_file, duckdb_conn, "parquet")

        row_count = len(result_df)
        print(f"Categorized {row_count} reviews for category '{target_category}'")

        if row_count == 0:
            print(f"Warning: No reviews found for category '{target_category}'")

    finally:
        duckdb_conn.close()
# Script entry point: dispatch on the CLI subcommand.
if __name__ == "__main__":
    main()