databuild/examples/podcast_reviews/extract_podcasts_job.py

#!/usr/bin/env python3
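"""Extract podcast metadata for the podcast_reviews example.

Two subcommands, dispatched in main():

    config <partition_ref>  -- print a JSON job config for the partition
    exec                    -- read the SQLite ingest database and write
                               podcasts.parquet plus a manifest.json

This job produces exactly one partition, "podcasts/all".
"""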
import sys
import json
import os
from datetime import datetime
from pathlib import Path

from duckdb_utils import create_duckdb_connection, save_dataframe_with_fallback


def main():
    # Write a debug breadcrumb at the very start so we can tell whether
    # main() was invoked at all; stdout may not be captured by the runner.
    debug_file = "/tmp/databuild_test/podcasts_main_debug.log"
    try:
        with open(debug_file, "w") as f:
            f.write(f"main() called with sys.argv: {sys.argv}\n")
    except OSError:
        pass

    if len(sys.argv) < 2:
        print("Usage: extract_podcasts_job.py {config|exec} [args...]", file=sys.stderr)
        sys.exit(1)

    command = sys.argv[1]
    try:
        with open(debug_file, "a") as f:
            f.write(f"command: {command}\n")
    except OSError:
        pass

    if command == "config":
        handle_config(sys.argv[2:])
    elif command == "exec":
        handle_exec(sys.argv[2:])
    else:
        print(f"Unknown command: {command}", file=sys.stderr)
        sys.exit(1)


def handle_config(args):
    if len(args) < 1:
        print("Config mode requires a partition ref", file=sys.stderr)
        sys.exit(1)

    partition_ref = args[0]
    # This job produces a single partition with all podcast metadata.
    if partition_ref != "podcasts/all":
        print(f"Invalid partition ref: {partition_ref}. Expected 'podcasts/all'", file=sys.stderr)
        sys.exit(1)

    config = {
        "configs": [{
            "outputs": [{"str": partition_ref}],
            "inputs": [],
            "args": ["all"],
            "env": {
                "PARTITION_REF": partition_ref
            }
        }]
    }
    print(json.dumps(config))
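

# Example: `extract_podcasts_job.py config podcasts/all` prints a single
# line of JSON shaped like:
#   {"configs": [{"outputs": [{"str": "podcasts/all"}], "inputs": [],
#                 "args": ["all"], "env": {"PARTITION_REF": "podcasts/all"}}]}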


def handle_exec(args):
    # Optional jitter so concurrent example jobs don't stampede; controlled
    # by the EXEC_SLEEP environment variable (seconds).
    import time
    import random
    time.sleep(float(os.getenv("EXEC_SLEEP", "0")) * random.random())

    # Write debug info to a file since stdout might not be captured.
    debug_file = "/tmp/databuild_test/podcasts_debug.log"
    os.makedirs(os.path.dirname(debug_file), exist_ok=True)
    with open(debug_file, "w") as f:
        f.write(f"Starting extract_podcasts_job.exec with args: {args}\n")
    print(f"Starting extract_podcasts_job.exec with args: {args}")

    partition_ref = os.getenv("PARTITION_REF", "podcasts/all")
    print(f"Partition ref: {partition_ref}")
    with open(debug_file, "a") as f:
        f.write(f"Partition ref: {partition_ref}\n")

    # Database path: prefer the test location, then fall back to a
    # relative path for development.
    db_path = "/tmp/databuild_test/examples/podcast_reviews/data/ingest/database.sqlite"
    if not os.path.exists(db_path):
        db_path = "data/ingest/database.sqlite"
    print(f"Looking for database at: {db_path}")
    print(f"Database exists: {os.path.exists(db_path)}")

    # Output path
    output_dir = Path("/tmp/databuild_test/examples/podcast_reviews/podcasts")
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / "podcasts.parquet"
    print(f"Output directory: {output_dir}")
    print(f"Output file: {output_file}")

    try:
        # Extract all podcasts with their categories.
        print("Calling extract_podcasts_with_categories...")
        extract_podcasts_with_categories(db_path, str(output_file))
        print("Successfully extracted podcast metadata")
        print(f"Output written to: {output_file}")
        print(f"Output file exists: {output_file.exists()}")

        # Create the manifest describing what this run produced.
        manifest = {
            "outputs": [{"str": partition_ref}],
            "inputs": [],
            "start_time": datetime.now().isoformat(),
            "end_time": datetime.now().isoformat(),
            "task": {
                "job": {"label": "//examples/podcast_reviews:extract_podcasts_job"},
                "config": {
                    "outputs": [{"str": partition_ref}],
                    "inputs": [],
                    "args": [],
                    "env": {"PARTITION_REF": partition_ref}
                }
            }
        }
        manifest_file = output_dir / "manifest.json"
        with open(manifest_file, "w") as f:
            json.dump(manifest, f, indent=2)
        print(f"Manifest written to: {manifest_file}")
    except Exception as e:
        print(f"Error extracting podcasts: {e}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        sys.exit(1)
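

# For reference, the manifest written above looks roughly like this
# (timestamps elided):
#   {"outputs": [{"str": "podcasts/all"}], "inputs": [],
#    "start_time": "...", "end_time": "...",
#    "task": {"job": {"label": "//examples/podcast_reviews:extract_podcasts_job"},
#             "config": {...}}}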


def extract_podcasts_with_categories(db_path: str, output_file: str):
    """Extract all podcasts with their categories and save as parquet."""
    print(f"extract_podcasts_with_categories called with db_path={db_path}, output_file={output_file}")

    # Connect to DuckDB with extension handling.
    print("Creating DuckDB connection...")
    duckdb_conn = create_duckdb_connection()
    print("DuckDB connection created")
    try:
        try:
            # Try the full DuckDB query first: rank each podcast's categories
            # alphabetically, take the first as the primary category, and
            # pipe-join the full set into all_categories.
            query = """
                WITH podcast_categories AS (
                    SELECT
                        p.podcast_id,
                        p.itunes_id,
                        p.slug,
                        p.itunes_url,
                        p.title,
                        c.category,
                        ROW_NUMBER() OVER (PARTITION BY p.podcast_id ORDER BY c.category) as category_rank
                    FROM sqlite_scan(?, 'podcasts') p
                    LEFT JOIN sqlite_scan(?, 'categories') c ON p.podcast_id = c.podcast_id
                ),
                primary_categories AS (
                    SELECT
                        podcast_id,
                        itunes_id,
                        slug,
                        itunes_url,
                        title,
                        category as primary_category
                    FROM podcast_categories
                    WHERE category_rank = 1
                ),
                all_categories AS (
                    SELECT
                        podcast_id,
                        STRING_AGG(category, '|' ORDER BY category) as all_categories
                    FROM podcast_categories
                    WHERE category IS NOT NULL
                    GROUP BY podcast_id
                )
                SELECT
                    pc.podcast_id,
                    pc.itunes_id,
                    pc.slug,
                    pc.itunes_url,
                    pc.title,
                    pc.primary_category,
                    COALESCE(ac.all_categories, pc.primary_category) as all_categories
                FROM primary_categories pc
                LEFT JOIN all_categories ac ON pc.podcast_id = ac.podcast_id
                ORDER BY pc.title
            """
            df = duckdb_conn.execute(query, [db_path, db_path]).df()
        except Exception as e:
            print(f"DuckDB complex query failed: {e}, using pandas fallback")
            # Fallback: compute the same result with pandas over plain sqlite3.
            import pandas as pd
            import sqlite3

            sqlite_conn = sqlite3.connect(db_path)
            try:
                # Read podcasts and categories separately.
                podcasts_df = pd.read_sql_query("SELECT * FROM podcasts", sqlite_conn)
                categories_df = pd.read_sql_query("SELECT * FROM categories", sqlite_conn)

                # Group categories by podcast_id into a pipe-joined string.
                categories_grouped = categories_df.groupby('podcast_id')['category'].apply(
                    lambda x: '|'.join(sorted(x))
                ).reset_index()
                categories_grouped.columns = ['podcast_id', 'all_categories']

                # Primary category is the first alphabetically.
                primary_categories = categories_df.sort_values('category').groupby('podcast_id').first().reset_index()
                primary_categories = primary_categories[['podcast_id', 'category']].rename(columns={'category': 'primary_category'})

                # Join everything together.
                df = podcasts_df.merge(primary_categories, on='podcast_id', how='left')
                df = df.merge(categories_grouped, on='podcast_id', how='left')

                # Fill missing values.
                df['primary_category'] = df['primary_category'].fillna('unknown')
                df['all_categories'] = df['all_categories'].fillna(df['primary_category'])

                # Sort by title.
                df = df.sort_values('title')
            finally:
                sqlite_conn.close()

        # Save to parquet with fallback.
        save_dataframe_with_fallback(df, output_file, duckdb_conn, "parquet")
        print(f"Extracted {len(df)} podcasts with category information")
    finally:
        duckdb_conn.close()
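

# Hypothetical sanity check (not part of the job itself): read the parquet
# back and eyeball the columns produced by the query above.
#   import pandas as pd
#   df = pd.read_parquet(
#       "/tmp/databuild_test/examples/podcast_reviews/podcasts/podcasts.parquet")
#   print(df[["podcast_id", "title", "primary_category", "all_categories"]].head())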


if __name__ == "__main__":
    main()