databuild/examples/podcast_reviews/extract_reviews_job.py

172 lines
No EOL
5.3 KiB
Python

#!/usr/bin/env python3
import sys
import json
import os
from datetime import datetime, date
from pathlib import Path
from typing import List, Dict, Any
import re
from duckdb_utils import create_duckdb_connection, execute_query_with_fallback, save_dataframe_with_fallback
def _write_debug_line(path: str, message: str, mode: str = "a") -> None:
    """Best-effort write of *message* to the debug log at *path*.

    Failures to open or write the file are ignored on purpose: the debug
    trace is diagnostic only and must never stop the job from running.
    """
    try:
        with open(path, mode) as f:
            f.write(message)
    except (OSError, UnicodeError):
        # Missing directory, permission problems or un-encodable argv text:
        # skip the trace. Unlike a bare ``except``, this does not swallow
        # KeyboardInterrupt or SystemExit.
        pass


def main():
    """Dispatch the command line to the ``config`` or ``exec`` handler.

    Reads ``sys.argv``: the first argument selects the mode and the
    remaining arguments are passed to the handler unchanged. Prints a
    message to stderr and exits with status 1 when the mode is missing
    or unknown.
    """
    # Write debug info to understand what's being called.
    debug_file = "/tmp/databuild_test/reviews_main_debug.log"
    # The first write truncates the log so each invocation starts fresh.
    _write_debug_line(
        debug_file, f"main() called with sys.argv: {sys.argv}\n", mode="w"
    )

    if len(sys.argv) < 2:
        print("Usage: extract_reviews_job.py {config|exec} [args...]", file=sys.stderr)
        sys.exit(1)

    command = sys.argv[1]
    _write_debug_line(debug_file, f"command: {command}\n")

    if command == "config":
        handle_config(sys.argv[2:])
    elif command == "exec":
        handle_exec(sys.argv[2:])
    else:
        print(f"Unknown command: {command}", file=sys.stderr)
        sys.exit(1)
def parse_partition_ref(partition_ref: str) -> Dict[str, str]:
    """Parse a partition ref like 'reviews/date=2020-01-01' into components.

    Args:
        partition_ref: Reference of the exact form ``reviews/date=YYYY-MM-DD``.

    Returns:
        A dict with a single ``"date"`` key holding the ``YYYY-MM-DD`` text.

    Raises:
        ValueError: If the ref does not match the expected form exactly.
    """
    # fullmatch (not match) so trailing text such as
    # 'reviews/date=2020-01-01junk' is rejected instead of being silently
    # truncated to a date that no longer agrees with the ref itself.
    match = re.fullmatch(r'reviews/date=(\d{4}-\d{2}-\d{2})', partition_ref)
    if not match:
        raise ValueError(f"Invalid partition ref format: {partition_ref}")
    return {"date": match.group(1)}
def handle_config(args):
    """Print the job configuration JSON for the requested partition refs.

    Args:
        args: Partition refs taken from the command line; at least one is
            required.

    One config entry is built per ref, in the order given, and the whole
    set is printed to stdout as a single JSON object under ``"configs"``.
    Exits with status 1 (message on stderr) when no ref is supplied or any
    ref fails to parse; in that case nothing is printed to stdout.
    """
    if not args:
        print("Config mode requires partition ref", file=sys.stderr)
        sys.exit(1)

    job_configs = []
    for ref in args:
        # Resolve the date first; a bad ref aborts the whole run.
        try:
            target = parse_partition_ref(ref)["date"]
        except ValueError as err:
            print(f"Error parsing partition ref: {err}", file=sys.stderr)
            sys.exit(1)

        entry = {
            "outputs": [{"str": ref}],
            "inputs": [],
            "args": [target],
            "env": {
                "PARTITION_REF": ref,
                "TARGET_DATE": target,
            },
        }
        job_configs.append(entry)

    print(json.dumps({"configs": job_configs}))
def handle_exec(args):
    """Extract one date's reviews to parquet and write a manifest beside it.

    Args:
        args: Command-line arguments after ``exec``; ``args[0]`` is the
            target date string used in the output path and the query.

    Environment:
        EXEC_SLEEP: Optional upper bound, in seconds, for a random delay
            applied before any work (default ``0``).
        PARTITION_REF: Partition ref recorded in the manifest; defaults to
            ``reviews/date=<target_date>``.

    Exits with status 1 (message on stderr) when the date argument is
    missing or when extraction or manifest writing raises.
    """
    # Only this start-up jitter needs these modules, so import them locally.
    import random
    import time

    # Sleep a random fraction of EXEC_SLEEP seconds; a no-op when unset.
    # NOTE(review): presumably used to stagger concurrent runs - confirm.
    time.sleep(float(os.getenv('EXEC_SLEEP', '0')) * random.random())

    if len(args) < 1:
        print("Exec mode requires date argument", file=sys.stderr)
        sys.exit(1)

    target_date = args[0]
    partition_ref = os.getenv('PARTITION_REF', f'reviews/date={target_date}')

    # Database paths
    db_path = "/tmp/databuild_test/examples/podcast_reviews/data/ingest/database.sqlite"
    if not os.path.exists(db_path):
        # Fallback to relative path for development
        db_path = "data/ingest/database.sqlite"

    # Output path
    output_dir = Path(f"/tmp/databuild_test/examples/podcast_reviews/reviews/date={target_date}")
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / "reviews.parquet"

    # Capture the start before doing the work so the manifest reflects the
    # real duration instead of two timestamps taken back to back afterwards.
    start_time = datetime.now().isoformat()

    try:
        # Extract reviews for the target date
        extract_reviews_for_date(db_path, target_date, str(output_file))
        end_time = datetime.now().isoformat()

        print(f"Successfully extracted reviews for {target_date}")
        print(f"Output written to: {output_file}")

        # Create manifest
        manifest = {
            "outputs": [{"str": partition_ref}],
            "inputs": [],
            "start_time": start_time,
            "end_time": end_time,
            "task": {
                "job": {"label": "//examples/podcast_reviews:extract_reviews_job"},
                "config": {
                    "outputs": [{"str": partition_ref}],
                    "inputs": [],
                    "args": [target_date],
                    "env": {"PARTITION_REF": partition_ref, "TARGET_DATE": target_date},
                },
            },
        }

        manifest_file = output_dir / "manifest.json"
        with open(manifest_file, 'w') as f:
            json.dump(manifest, f, indent=2)
    except Exception as e:
        # Top-level boundary of the job: report the failure and signal it
        # through the exit status rather than a traceback.
        print(f"Error extracting reviews: {e}", file=sys.stderr)
        sys.exit(1)
def extract_reviews_for_date(db_path: str, target_date: str, output_file: str):
    """Query one day's reviews from the SQLite database and save them as parquet.

    Args:
        db_path: Path to the SQLite database containing the ``reviews`` table.
        target_date: Date the rows' ``DATE(created_at)`` must equal.
        output_file: Destination path handed to the save helper.

    The connection obtained from ``create_duckdb_connection`` is always
    closed, whether or not the query or the save succeeds. The number of
    extracted rows is printed to stdout.
    """
    # Both placeholders are bound from the parameter list passed below:
    # first the database path for sqlite_scan, then the date filter.
    query = """
        SELECT
            podcast_id,
            title as review_title,
            content,
            rating,
            author_id,
            created_at,
            DATE(created_at) as review_date
        FROM sqlite_scan(?, 'reviews')
        WHERE DATE(created_at) = ?
        ORDER BY created_at
    """
    params = [db_path, target_date]

    conn = create_duckdb_connection()
    try:
        # NOTE(review): the fallback behaviour lives in duckdb_utils and is
        # not visible here - confirm there what it falls back to.
        reviews = execute_query_with_fallback(conn, db_path, query, params)
        save_dataframe_with_fallback(reviews, output_file, conn, "parquet")
        print(f"Extracted {len(reviews)} reviews for date {target_date}")
    finally:
        conn.close()
# Run the CLI dispatcher only when this file is executed as a script, so
# importing the module has no side effects.
if __name__ == "__main__":
    main()