databuild/examples/podcast_reviews/duckdb_utils.py
2025-06-30 22:15:48 -07:00

184 lines
No EOL
5.9 KiB
Python

#!/usr/bin/env python3
"""
Centralized DuckDB utilities for handling extension issues in isolated environments.
"""
import duckdb
import sqlite3
import pandas as pd
from pathlib import Path
from typing import Any, Dict, List, Optional
import warnings
def create_duckdb_connection(enable_extensions: bool = True) -> duckdb.DuckDBPyConnection:
    """
    Open a DuckDB connection (``duckdb.connect()`` with default arguments).

    When ``enable_extensions`` is true, the connection is asked to auto-install
    and auto-load known extensions. This is best effort: any failure is ignored
    so that callers in isolated environments can rely on their fallback paths.

    Args:
        enable_extensions: Try to turn on extension auto-install/auto-load.

    Returns:
        The new DuckDB connection object.
    """
    connection = duckdb.connect()
    if not enable_extensions:
        return connection

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        try:
            # Applied in order; the first failing statement aborts the rest.
            for statement in ("SET autoinstall_known_extensions=1", "SET autoload_known_extensions=1"):
                connection.execute(statement)
        except Exception:
            # Extensions unavailable (e.g. no network); callers use fallback methods instead.
            pass
    return connection
def sqlite_to_dataframe(sqlite_path: str, query: str, params: Optional[List[Any]] = None) -> pd.DataFrame:
    """
    Run ``query`` against the SQLite database at ``sqlite_path`` and return the rows as a DataFrame.

    Used as the fallback when DuckDB's ``sqlite_scan`` does not work.

    Args:
        sqlite_path: Path of the SQLite database file to open.
        query: SQL text to execute.
        params: Values to bind to the query's placeholders. ``None`` or an empty
            list means nothing is bound.

    Returns:
        DataFrame holding the query result.
    """
    connection = sqlite3.connect(sqlite_path)
    try:
        # Forward ``params`` only when there is something to bind.
        read_kwargs = {"params": params} if params else {}
        return pd.read_sql_query(query, connection, **read_kwargs)
    finally:
        # Close the connection whether or not the query raised.
        connection.close()
def _rewrite_sqlite_scan_for_sqlite(query: str, params: Optional[List[Any]]):
    """
    Convert a DuckDB ``sqlite_scan(?, 'table')`` query into plain SQLite SQL.

    Every ``sqlite_scan(?, 'table')`` call (any ``\\w+`` table name, with optional
    whitespace) is replaced by the bare table name. The positional parameter that
    fed each call's ``?`` is removed from ``params``, because the fallback connects
    to the database directly. Other parameters keep their relative order.

    Returns:
        ``(rewritten_query, remaining_params)``. ``params`` is returned unchanged
        when it is empty/``None`` or the query contains no ``sqlite_scan(?, ...)``.
    """
    import re

    # Alternation order matters. sqlite_scan calls are matched first. Single-quoted
    # string literals are consumed whole so a literal '?' is not counted as a
    # placeholder. Every remaining bare '?' is a positional placeholder.
    token = re.compile(
        r"sqlite_scan\(\s*\?\s*,\s*'(?P<table>\w+)'\s*\)"
        r"|'(?:[^']|'')*'"
        r"|\?"
    )
    scan_positions = set()
    placeholder_index = 0

    def _substitute(match):
        nonlocal placeholder_index
        table = match.group("table")
        if table is not None:
            # This '?' carried the database path; remember its position so it is dropped.
            scan_positions.add(placeholder_index)
            placeholder_index += 1
            return table
        text = match.group(0)
        if text == "?":
            placeholder_index += 1
        return text

    rewritten = token.sub(_substitute, query)
    if not params or not scan_positions:
        return rewritten, params
    remaining = [value for index, value in enumerate(params) if index not in scan_positions]
    return rewritten, remaining


def execute_query_with_fallback(
    duckdb_conn: duckdb.DuckDBPyConnection,
    sqlite_path: str,
    query: str,
    params: Optional[List[Any]] = None,
    use_sqlite_scan: bool = True
) -> pd.DataFrame:
    """
    Execute a query with DuckDB's sqlite_scan if possible, otherwise query SQLite directly.

    On the fallback path, every ``sqlite_scan(?, 'table')`` in ``query`` becomes
    ``table``, and the parameter bound to each such ``?`` is dropped. Queries that
    join several tables therefore work as well. The fallback always reads from
    ``sqlite_path``.

    Args:
        duckdb_conn: DuckDB connection
        sqlite_path: Path to SQLite database
        query: SQL query (should work with both sqlite_scan and direct SQLite)
        params: Query parameters
        use_sqlite_scan: Whether to try sqlite_scan first

    Returns:
        DataFrame with query results
    """
    if use_sqlite_scan:
        try:
            if params:
                return duckdb_conn.execute(query, params).df()
            return duckdb_conn.execute(query).df()
        except Exception as e:
            # Deliberate best effort: any DuckDB failure triggers the SQLite fallback.
            print(f"sqlite_scan failed: {e}, falling back to direct SQLite access")

    fallback_query, fallback_params = _rewrite_sqlite_scan_for_sqlite(query, params)
    return sqlite_to_dataframe(sqlite_path, fallback_query, fallback_params)
def save_dataframe_with_fallback(
    df: pd.DataFrame,
    output_path: str,
    duckdb_conn: Optional[duckdb.DuckDBPyConnection] = None,
    format: str = "parquet"
) -> None:
    """
    Save a DataFrame to the specified format, with fallback options if DuckDB extensions fail.

    For parquet, the writers are tried in order:

    1. DuckDB ``COPY ... (FORMAT PARQUET)``, only when ``duckdb_conn`` is given.
    2. ``DataFrame.to_parquet``.
    3. If both fail, the data is written as CSV at ``output_path`` with its
       suffix changed to ``.csv``.

    Args:
        df: DataFrame to save
        output_path: Output file path
        duckdb_conn: Optional DuckDB connection (for parquet)
        format: Output format ('parquet' or 'csv', case-insensitive)

    Raises:
        ValueError: If ``format`` is neither 'parquet' nor 'csv' (previously this
            silently wrote nothing).
    """
    output_path = Path(output_path)
    fmt = format.lower()
    if fmt not in ("parquet", "csv"):
        raise ValueError(f"Unsupported output format: {format!r}")

    if fmt == "parquet":
        if duckdb_conn is not None:
            try:
                duckdb_conn.register('temp_df', df)
                try:
                    # Double embedded single quotes so the path stays one valid SQL string literal.
                    quoted_path = str(output_path).replace("'", "''")
                    duckdb_conn.execute(f"COPY temp_df TO '{quoted_path}' (FORMAT PARQUET)")
                finally:
                    # Remove the view so the connection does not keep ``df`` alive.
                    duckdb_conn.unregister('temp_df')
                return
            except Exception as e:
                print(f"DuckDB parquet write failed: {e}, falling back to pandas")
        try:
            # Fallback to pandas parquet (requires a parquet engine such as pyarrow)
            df.to_parquet(output_path, index=False)
            return
        except Exception as e:
            print(f"Pandas parquet write failed: {e}, falling back to CSV")
        # Last resort: write CSV next to the requested path.
        output_path = output_path.with_suffix('.csv')

    df.to_csv(output_path, index=False)
def read_dataframe_with_fallback(
    file_path: str,
    duckdb_conn: Optional[duckdb.DuckDBPyConnection] = None
) -> pd.DataFrame:
    """
    Read a DataFrame from file with fallback options.

    ``.csv`` files are read with pandas. For ``.parquet`` files, the readers are
    tried in order:

    1. DuckDB ``parquet_scan``, only when ``duckdb_conn`` is given.
    2. ``pd.read_parquet``.
    3. A sibling file with the same stem and a ``.csv`` suffix, if it exists.

    Args:
        file_path: Path to input file
        duckdb_conn: Optional DuckDB connection

    Returns:
        DataFrame with file contents

    Raises:
        ValueError: If the suffix is neither ``.parquet`` nor ``.csv``.
        Exception: The ``pd.read_parquet`` error is re-raised when no CSV fallback exists.
    """
    file_path = Path(file_path)
    suffix = file_path.suffix.lower()
    if suffix == '.csv':
        return pd.read_csv(file_path)
    if suffix != '.parquet':
        raise ValueError(f"Unsupported file format: {file_path.suffix}")

    if duckdb_conn is not None:
        try:
            # Double embedded single quotes so the path stays one valid SQL string literal.
            quoted_path = str(file_path).replace("'", "''")
            return duckdb_conn.execute(f"SELECT * FROM parquet_scan('{quoted_path}')").df()
        except Exception:
            # Best effort: fall through to pandas.
            pass

    try:
        return pd.read_parquet(file_path)
    except Exception:
        # The writer may have fallen back to CSV next to the requested parquet path.
        csv_path = file_path.with_suffix('.csv')
        if csv_path.exists():
            return pd.read_csv(csv_path)
        raise