#!/usr/bin/env python3
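"""Databuild job that extracts podcast metadata from the ingest SQLite database.

Supports two subcommands: ``config`` prints the job configuration for the
single ``podcasts/all`` partition as JSON, and ``exec`` extracts all podcasts
with their categories to a parquet file and writes a manifest alongside it.
"""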
import json
import os
import sys
from datetime import datetime
from pathlib import Path

from duckdb_utils import create_duckdb_connection, save_dataframe_with_fallback


def main():
    # Write debug output at the very start to confirm main() is called
    debug_file = "/tmp/databuild_test/podcasts_main_debug.log"
    try:
        with open(debug_file, "w") as f:
            f.write(f"main() called with sys.argv: {sys.argv}\n")
            f.flush()
    except OSError:
        # Debug logging is best-effort; never fail the job over it
        pass

    if len(sys.argv) < 2:
        print("Usage: extract_podcasts_job.py {config|exec} [args...]", file=sys.stderr)
        sys.exit(1)

    command = sys.argv[1]

    try:
        with open(debug_file, "a") as f:
            f.write(f"command: {command}\n")
            f.flush()
    except OSError:
        pass

    if command == "config":
        handle_config(sys.argv[2:])
    elif command == "exec":
        handle_exec(sys.argv[2:])
    else:
        print(f"Unknown command: {command}", file=sys.stderr)
        sys.exit(1)


def handle_config(args):
    if len(args) < 1:
        print("Config mode requires partition ref", file=sys.stderr)
        sys.exit(1)

    partition_ref = args[0]

    # This job produces a single partition with all podcast metadata
    if partition_ref != "podcasts/all":
        print(f"Invalid partition ref: {partition_ref}. Expected 'podcasts/all'", file=sys.stderr)
        sys.exit(1)

    config = {
        "configs": [{
            "outputs": [{"str": partition_ref}],
            "inputs": [],
            "args": ["all"],
            "env": {
                "PARTITION_REF": partition_ref
            }
        }]
    }

    print(json.dumps(config))


def handle_exec(args):
    # Optional startup jitter for testing concurrent executions:
    # sleep up to EXEC_SLEEP seconds, scaled by a random factor
    import random
    import time
    time.sleep(float(os.getenv('EXEC_SLEEP', '0')) * random.random())

    # Write debug info to a file since stdout might not be captured
    debug_file = "/tmp/databuild_test/podcasts_debug.log"
    os.makedirs(os.path.dirname(debug_file), exist_ok=True)
    with open(debug_file, "w") as f:
        f.write(f"Starting extract_podcasts_job.exec with args: {args}\n")
        f.flush()

    print(f"Starting extract_podcasts_job.exec with args: {args}")
    partition_ref = os.getenv('PARTITION_REF', 'podcasts/all')
    print(f"Partition ref: {partition_ref}")

    with open(debug_file, "a") as f:
        f.write(f"Partition ref: {partition_ref}\n")
        f.flush()

    # Database paths
    db_path = "/tmp/databuild_test/examples/podcast_reviews/data/ingest/database.sqlite"
    if not os.path.exists(db_path):
        # Fall back to a relative path for development
        db_path = "data/ingest/database.sqlite"

    print(f"Looking for database at: {db_path}")
    print(f"Database exists: {os.path.exists(db_path)}")

    # Output path
    output_dir = Path("/tmp/databuild_test/examples/podcast_reviews/podcasts")
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / "podcasts.parquet"

    print(f"Output directory: {output_dir}")
    print(f"Output file: {output_file}")

    try:
        # Extract all podcasts with their categories
        print("Calling extract_podcasts_with_categories...")
        result = extract_podcasts_with_categories(db_path, str(output_file))
        print(f"extract_podcasts_with_categories returned: {result}")

        print("Successfully extracted podcast metadata")
        print(f"Output written to: {output_file}")
        print(f"Output file exists: {output_file.exists()}")

        # Create manifest
        manifest = {
            "outputs": [{"str": partition_ref}],
            "inputs": [],
            "start_time": datetime.now().isoformat(),
            "end_time": datetime.now().isoformat(),
            "task": {
                "job": {"label": "//examples/podcast_reviews:extract_podcasts_job"},
                "config": {
                    "outputs": [{"str": partition_ref}],
                    "inputs": [],
                    "args": [],
                    "env": {"PARTITION_REF": partition_ref}
                }
            }
        }

        manifest_file = output_dir / "manifest.json"
        with open(manifest_file, 'w') as f:
            json.dump(manifest, f, indent=2)

        print(f"Manifest written to: {manifest_file}")

    except Exception as e:
        print(f"Error extracting podcasts: {e}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        sys.exit(1)


def extract_podcasts_with_categories(db_path: str, output_file: str):
    """Extract all podcasts with their categories and save as parquet."""

    print(f"extract_podcasts_with_categories called with db_path={db_path}, output_file={output_file}")

    # Connect to DuckDB with extension handling
    print("Creating DuckDB connection...")
    duckdb_conn = create_duckdb_connection()
    print("DuckDB connection created")

    try:
        # Use a simpler approach that works with the SQLite fallback
        try:
            # Try the complex DuckDB query first. Note: some DuckDB versions
            # reject prepared-statement parameters inside table functions such
            # as sqlite_scan; if so, this raises and we take the pandas
            # fallback below.
            query = """
                WITH podcast_categories AS (
                    SELECT
                        p.podcast_id,
                        p.itunes_id,
                        p.slug,
                        p.itunes_url,
                        p.title,
                        c.category,
                        ROW_NUMBER() OVER (PARTITION BY p.podcast_id ORDER BY c.category) as category_rank
                    FROM sqlite_scan(?, 'podcasts') p
                    LEFT JOIN sqlite_scan(?, 'categories') c ON p.podcast_id = c.podcast_id
                ),
                primary_categories AS (
                    SELECT
                        podcast_id,
                        itunes_id,
                        slug,
                        itunes_url,
                        title,
                        category as primary_category
                    FROM podcast_categories
                    WHERE category_rank = 1
                ),
                all_categories AS (
                    SELECT
                        podcast_id,
                        STRING_AGG(category, '|' ORDER BY category) as all_categories
                    FROM podcast_categories
                    WHERE category IS NOT NULL
                    GROUP BY podcast_id
                )
                SELECT
                    pc.podcast_id,
                    pc.itunes_id,
                    pc.slug,
                    pc.itunes_url,
                    pc.title,
                    pc.primary_category,
                    COALESCE(ac.all_categories, pc.primary_category) as all_categories
                FROM primary_categories pc
                LEFT JOIN all_categories ac ON pc.podcast_id = ac.podcast_id
                ORDER BY pc.title
            """

            df = duckdb_conn.execute(query, [db_path, db_path]).df()

        except Exception as e:
            print(f"DuckDB complex query failed: {e}, using pandas fallback")

            # Fallback: use pandas to process the data
            import pandas as pd
            import sqlite3

            sqlite_conn = sqlite3.connect(db_path)
            try:
                # Read podcasts and categories separately
                podcasts_df = pd.read_sql_query("SELECT * FROM podcasts", sqlite_conn)
                categories_df = pd.read_sql_query("SELECT * FROM categories", sqlite_conn)

                # Group categories by podcast_id into a sorted, '|'-separated list
                categories_grouped = categories_df.groupby('podcast_id')['category'].apply(
                    lambda x: '|'.join(sorted(x))
                ).reset_index()
                categories_grouped.columns = ['podcast_id', 'all_categories']

                # Get the primary category (first alphabetically)
                primary_categories = categories_df.sort_values('category').groupby('podcast_id').first().reset_index()
                primary_categories = primary_categories[['podcast_id', 'category']].rename(columns={'category': 'primary_category'})

                # Join everything together
                df = podcasts_df.merge(primary_categories, on='podcast_id', how='left')
                df = df.merge(categories_grouped, on='podcast_id', how='left')

                # Fill missing values
                df['primary_category'] = df['primary_category'].fillna('unknown')
                df['all_categories'] = df['all_categories'].fillna(df['primary_category'])

                # Sort by title
                df = df.sort_values('title')

            finally:
                sqlite_conn.close()

        # Save to parquet with fallback
        save_dataframe_with_fallback(df, output_file, duckdb_conn, "parquet")

        row_count = len(df)
        print(f"Extracted {row_count} podcasts with category information")
        return row_count

    finally:
        duckdb_conn.close()


if __name__ == "__main__":
    main()
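
# Example invocations (inferred from the usage string and the config output;
# exact paths depend on how the job is wired up):
#   extract_podcasts_job.py config podcasts/all
#   PARTITION_REF=podcasts/all extract_podcasts_job.py exec all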