#!/usr/bin/env python3
"""Job script that extracts podcast reviews for one date partition.

The script has two sub-commands:

* ``config <partition_ref>...`` prints a JSON document describing one job
  configuration per partition reference (e.g. ``reviews/date=2020-01-01``).
* ``exec <date>`` queries the ingest SQLite database for reviews created on
  that date, saves the result via ``save_dataframe_with_fallback`` and writes
  a ``manifest.json`` next to the output file.
"""

import sys
import json
import os
import random
import time
from datetime import datetime, date
from pathlib import Path
from typing import List, Dict, Any
import re
from duckdb_utils import create_duckdb_connection, execute_query_with_fallback, save_dataframe_with_fallback

# Best-effort debug log used to understand how the script is being invoked.
DEBUG_LOG_PATH = "/tmp/databuild_test/reviews_main_debug.log"


def _debug_log(message: str, mode: str = "a") -> None:
    """Write ``message`` to the debug log, ignoring I/O and encoding failures.

    Debug logging must never break the job, so file-system and encoding
    errors are swallowed on purpose. Unlike a bare ``except``, this does not
    hide ``KeyboardInterrupt`` or ``SystemExit``.
    """
    try:
        with open(DEBUG_LOG_PATH, mode) as f:
            f.write(message)
            f.flush()
    except (OSError, UnicodeError):
        pass


def _build_job_config(partition_ref: str, date_str: str) -> Dict[str, Any]:
    """Return the job configuration dict for one partition.

    The same structure is emitted by ``config`` mode and embedded in the
    manifest written by ``exec`` mode, so it is built in one place.
    """
    return {
        "outputs": [{"str": partition_ref}],
        "inputs": [],
        "args": [date_str],
        "env": {
            "PARTITION_REF": partition_ref,
            "TARGET_DATE": date_str
        }
    }


def main() -> None:
    """Dispatch to ``config`` or ``exec`` mode based on ``sys.argv[1]``.

    Exits with status 1 when no command is given or the command is unknown.
    """
    # Write debug info to understand what's being called ("w" starts a fresh log).
    _debug_log(f"main() called with sys.argv: {sys.argv}\n", mode="w")

    if len(sys.argv) < 2:
        print("Usage: extract_reviews_job.py {config|exec} [args...]", file=sys.stderr)
        sys.exit(1)

    command = sys.argv[1]
    _debug_log(f"command: {command}\n")

    if command == "config":
        handle_config(sys.argv[2:])
    elif command == "exec":
        handle_exec(sys.argv[2:])
    else:
        print(f"Unknown command: {command}", file=sys.stderr)
        sys.exit(1)


def parse_partition_ref(partition_ref: str) -> Dict[str, str]:
    """Parse partition ref like 'reviews/date=2020-01-01' into components.

    Returns:
        A dict with a single ``"date"`` key holding the ``YYYY-MM-DD`` string.

    Raises:
        ValueError: If ``partition_ref`` does not start with
            ``reviews/date=<YYYY-MM-DD>``.
    """
    match = re.match(r'reviews/date=(\d{4}-\d{2}-\d{2})', partition_ref)
    if not match:
        raise ValueError(f"Invalid partition ref format: {partition_ref}")
    return {"date": match.group(1)}


def handle_config(args: List[str]) -> None:
    """Print a JSON ``{"configs": [...]}`` document for the given partition refs.

    Exits with status 1 if no partition ref is supplied or any ref fails to
    parse; in that case nothing is printed to stdout.
    """
    if len(args) < 1:
        print("Config mode requires partition ref", file=sys.stderr)
        sys.exit(1)

    configs = []

    # Process each partition reference
    for partition_ref in args:
        try:
            date_str = parse_partition_ref(partition_ref)["date"]
        except ValueError as e:
            print(f"Error parsing partition ref: {e}", file=sys.stderr)
            sys.exit(1)

        configs.append(_build_job_config(partition_ref, date_str))

    config = {"configs": configs}
    print(json.dumps(config))


def handle_exec(args: List[str]) -> None:
    """Extract reviews for ``args[0]`` (a date string) and write a manifest.

    Reads two environment variables:

    * ``EXEC_SLEEP``: upper bound, in seconds, of a random delay applied
      before anything else (default ``0``, i.e. no delay).
    * ``PARTITION_REF``: partition reference recorded in the manifest
      (default ``reviews/date=<date>``).

    Exits with status 1 if the date argument is missing or if extraction or
    manifest writing raises.
    """
    # Optional random delay of up to EXEC_SLEEP seconds.
    time.sleep(float(os.getenv('EXEC_SLEEP', '0')) * random.random())

    if len(args) < 1:
        print("Exec mode requires date argument", file=sys.stderr)
        sys.exit(1)

    target_date = args[0]
    partition_ref = os.getenv('PARTITION_REF', f'reviews/date={target_date}')

    # Database paths
    db_path = "/tmp/databuild_test/examples/podcast_reviews/data/ingest/database.sqlite"
    if not os.path.exists(db_path):
        # Fallback to relative path for development
        db_path = "data/ingest/database.sqlite"

    # Output path
    output_dir = Path(f"/tmp/databuild_test/examples/podcast_reviews/reviews/date={target_date}")
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / "reviews.parquet"

    try:
        # Capture the start time BEFORE doing the work so the manifest
        # records the real duration of the extraction.
        start_time = datetime.now()

        # Extract reviews for the target date
        extract_reviews_for_date(db_path, target_date, str(output_file))
        print(f"Successfully extracted reviews for {target_date}")
        print(f"Output written to: {output_file}")

        end_time = datetime.now()

        # Create manifest
        manifest = {
            "outputs": [{"str": partition_ref}],
            "inputs": [],
            "start_time": start_time.isoformat(),
            "end_time": end_time.isoformat(),
            "task": {
                "job": {"label": "//examples/podcast_reviews:extract_reviews_job"},
                "config": _build_job_config(partition_ref, target_date)
            }
        }

        manifest_file = output_dir / "manifest.json"
        with open(manifest_file, 'w', encoding="utf-8") as f:
            json.dump(manifest, f, indent=2)

    except Exception as e:
        # Top-level boundary for exec mode: report the failure and signal it
        # through the exit status.
        print(f"Error extracting reviews: {e}", file=sys.stderr)
        sys.exit(1)


def extract_reviews_for_date(db_path: str, target_date: str, output_file: str) -> None:
    """Extract reviews for a specific date and save as parquet.

    Args:
        db_path: Path to the SQLite database containing a ``reviews`` table.
        target_date: Date string compared against ``DATE(created_at)``.
        output_file: Destination path handed to ``save_dataframe_with_fallback``.

    The DuckDB connection is always closed, even if the query or save fails.
    """
    # Connect to DuckDB with extension handling
    duckdb_conn = create_duckdb_connection()

    try:
        # Query reviews for the target date
        query = """
            SELECT
                podcast_id,
                title as review_title,
                content,
                rating,
                author_id,
                created_at,
                DATE(created_at) as review_date
            FROM sqlite_scan(?, 'reviews')
            WHERE DATE(created_at) = ?
            ORDER BY created_at
        """

        # Execute query with fallback handling
        df = execute_query_with_fallback(
            duckdb_conn,
            db_path,
            query,
            [db_path, target_date]
        )

        # Save to parquet with fallback
        save_dataframe_with_fallback(df, output_file, duckdb_conn, "parquet")

        row_count = len(df)
        print(f"Extracted {row_count} reviews for date {target_date}")

    finally:
        duckdb_conn.close()


if __name__ == "__main__":
    main()