#!/usr/bin/env python3
import sys
import json
import os
from datetime import datetime
from pathlib import Path

from duckdb_utils import create_duckdb_connection, save_dataframe_with_fallback


def main():
    # Write debug output at the very start to confirm main() is called.
    debug_file = "/tmp/databuild_test/podcasts_main_debug.log"
    try:
        os.makedirs(os.path.dirname(debug_file), exist_ok=True)
        with open(debug_file, "w") as f:
            f.write(f"main() called with sys.argv: {sys.argv}\n")
            f.flush()
    except OSError:
        # Debug logging is best-effort; never let it break the job.
        pass

    if len(sys.argv) < 2:
        print("Usage: extract_podcasts_job.py {config|exec} [args...]", file=sys.stderr)
        sys.exit(1)

    command = sys.argv[1]
    try:
        with open(debug_file, "a") as f:
            f.write(f"command: {command}\n")
            f.flush()
    except OSError:
        pass

    if command == "config":
        handle_config(sys.argv[2:])
    elif command == "exec":
        handle_exec(sys.argv[2:])
    else:
        print(f"Unknown command: {command}", file=sys.stderr)
        sys.exit(1)


def handle_config(args):
    if len(args) < 1:
        print("Config mode requires a partition ref", file=sys.stderr)
        sys.exit(1)

    partition_ref = args[0]

    # This job produces a single partition containing all podcast metadata.
    if partition_ref != "podcasts/all":
        print(f"Invalid partition ref: {partition_ref}. Expected 'podcasts/all'", file=sys.stderr)
        sys.exit(1)

    config = {
        "configs": [{
            "outputs": [{"str": partition_ref}],
            "inputs": [],
            "args": ["all"],
            "env": {"PARTITION_REF": partition_ref},
        }]
    }
    print(json.dumps(config))


def handle_exec(args):
    # Write debug info to a file since stdout might not be captured.
    debug_file = "/tmp/databuild_test/podcasts_debug.log"
    os.makedirs(os.path.dirname(debug_file), exist_ok=True)
    with open(debug_file, "w") as f:
        f.write(f"Starting extract_podcasts_job.exec with args: {args}\n")
        f.flush()

    print(f"Starting extract_podcasts_job.exec with args: {args}")

    partition_ref = os.getenv("PARTITION_REF", "podcasts/all")
    print(f"Partition ref: {partition_ref}")
    with open(debug_file, "a") as f:
        f.write(f"Partition ref: {partition_ref}\n")
        f.flush()

    # Database path: prefer the test location, fall back to the relative
    # path used in development.
    db_path = "/tmp/databuild_test/examples/podcast_reviews/data/ingest/database.sqlite"
    if not os.path.exists(db_path):
        db_path = "data/ingest/database.sqlite"

    print(f"Looking for database at: {db_path}")
    print(f"Database exists: {os.path.exists(db_path)}")

    # Output path
    output_dir = Path("/tmp/databuild_test/examples/podcast_reviews/podcasts")
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / "podcasts.parquet"

    print(f"Output directory: {output_dir}")
    print(f"Output file: {output_file}")

    try:
        # Extract all podcasts with their categories.
        start_time = datetime.now().isoformat()
        print("Calling extract_podcasts_with_categories...")
        result = extract_podcasts_with_categories(db_path, str(output_file))
        print(f"extract_podcasts_with_categories returned: {result}")

        print("Successfully extracted podcast metadata")
        print(f"Output written to: {output_file}")
        print(f"Output file exists: {output_file.exists()}")

        # Create a manifest describing this run; the embedded config mirrors
        # what handle_config emits for this partition.
        manifest = {
            "outputs": [{"str": partition_ref}],
            "inputs": [],
            "start_time": start_time,
            "end_time": datetime.now().isoformat(),
            "task": {
                "job": {"label": "//examples/podcast_reviews:extract_podcasts_job"},
                "config": {
                    "outputs": [{"str": partition_ref}],
                    "inputs": [],
                    "args": ["all"],
                    "env": {"PARTITION_REF": partition_ref},
                },
            },
        }

        manifest_file = output_dir / "manifest.json"
        with open(manifest_file, "w") as f:
            json.dump(manifest, f, indent=2)
        print(f"Manifest written to: {manifest_file}")
    except Exception as e:
        print(f"Error extracting podcasts: {e}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        sys.exit(1)
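# The duckdb_utils helpers used above are not shown in this file. As a rough
# sketch of the assumed behavior (an assumption, not the actual
# implementation): create_duckdb_connection() presumably opens a DuckDB
# connection and installs/loads the sqlite extension so sqlite_scan() below
# works, roughly:
#
#     import duckdb
#     conn = duckdb.connect()
#     conn.install_extension("sqlite")
#     conn.load_extension("sqlite")
#
# and save_dataframe_with_fallback(df, path, conn, "parquet") presumably
# writes df as parquet via DuckDB, falling back to df.to_parquet(path) when
# the DuckDB path fails.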
def extract_podcasts_with_categories(db_path: str, output_file: str):
    """Extract all podcasts with their categories and save as parquet."""
    print(f"extract_podcasts_with_categories called with db_path={db_path}, output_file={output_file}")

    # Connect to DuckDB with extension handling.
    print("Creating DuckDB connection...")
    duckdb_conn = create_duckdb_connection()
    print("DuckDB connection created")

    try:
        try:
            # Try the full DuckDB query first: rank each podcast's categories
            # alphabetically, take rank 1 as the primary category, and
            # pipe-join the full sorted list into all_categories.
            query = """
                WITH podcast_categories AS (
                    SELECT
                        p.podcast_id, p.itunes_id, p.slug, p.itunes_url, p.title,
                        c.category,
                        ROW_NUMBER() OVER (
                            PARTITION BY p.podcast_id ORDER BY c.category
                        ) AS category_rank
                    FROM sqlite_scan(?, 'podcasts') p
                    LEFT JOIN sqlite_scan(?, 'categories') c
                        ON p.podcast_id = c.podcast_id
                ),
                primary_categories AS (
                    SELECT podcast_id, itunes_id, slug, itunes_url, title,
                           category AS primary_category
                    FROM podcast_categories
                    WHERE category_rank = 1
                ),
                all_categories AS (
                    SELECT podcast_id,
                           STRING_AGG(category, '|' ORDER BY category) AS all_categories
                    FROM podcast_categories
                    WHERE category IS NOT NULL
                    GROUP BY podcast_id
                )
                SELECT
                    pc.podcast_id, pc.itunes_id, pc.slug, pc.itunes_url, pc.title,
                    pc.primary_category,
                    COALESCE(ac.all_categories, pc.primary_category) AS all_categories
                FROM primary_categories pc
                LEFT JOIN all_categories ac ON pc.podcast_id = ac.podcast_id
                ORDER BY pc.title
            """
            df = duckdb_conn.execute(query, [db_path, db_path]).df()
        except Exception as e:
            print(f"DuckDB complex query failed: {e}, using pandas fallback")

            # Fallback: rebuild the same result with pandas and sqlite3.
            import pandas as pd
            import sqlite3

            sqlite_conn = sqlite3.connect(db_path)
            try:
                # Read podcasts and categories separately.
                podcasts_df = pd.read_sql_query("SELECT * FROM podcasts", sqlite_conn)
                categories_df = pd.read_sql_query("SELECT * FROM categories", sqlite_conn)

                # Pipe-join each podcast's categories, sorted alphabetically.
                categories_grouped = (
                    categories_df.groupby("podcast_id")["category"]
                    .apply(lambda x: "|".join(sorted(x)))
                    .reset_index()
                )
                categories_grouped.columns = ["podcast_id", "all_categories"]

                # Primary category: first alphabetically per podcast.
                primary_categories = (
                    categories_df.sort_values("category")
                    .groupby("podcast_id")
                    .first()
                    .reset_index()
                )
                primary_categories = primary_categories[["podcast_id", "category"]].rename(
                    columns={"category": "primary_category"}
                )

                # Join everything together.
                df = podcasts_df.merge(primary_categories, on="podcast_id", how="left")
                df = df.merge(categories_grouped, on="podcast_id", how="left")

                # Fill missing values.
                df["primary_category"] = df["primary_category"].fillna("unknown")
                df["all_categories"] = df["all_categories"].fillna(df["primary_category"])

                # Sort by title to match the DuckDB query's ordering.
                df = df.sort_values("title")
            finally:
                sqlite_conn.close()

        # Save to parquet (with the shared fallback helper).
        save_dataframe_with_fallback(df, output_file, duckdb_conn, "parquet")

        row_count = len(df)
        print(f"Extracted {row_count} podcasts with category information")
        return row_count
    finally:
        duckdb_conn.close()


if __name__ == "__main__":
    main()
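# Example invocations (a sketch; the paths involved are the defaults assumed
# above and may differ in your environment):
#
#   Config mode -- print the JSON build config for the single partition:
#       ./extract_podcasts_job.py config podcasts/all
#   which emits:
#       {"configs": [{"outputs": [{"str": "podcasts/all"}], "inputs": [],
#        "args": ["all"], "env": {"PARTITION_REF": "podcasts/all"}}]}
#
#   Exec mode -- read the SQLite ingest database and write podcasts.parquet
#   plus manifest.json under the output directory:
#       PARTITION_REF=podcasts/all ./extract_podcasts_job.py exec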