#!/usr/bin/env python3
"""
Centralized DuckDB utilities for handling extension issues in isolated environments.
"""

import re
import sqlite3
import warnings
from pathlib import Path
from typing import Any, Dict, List, Optional

import duckdb
import pandas as pd


def create_duckdb_connection(enable_extensions: bool = True) -> duckdb.DuckDBPyConnection:
    """
    Create a DuckDB connection with proper extension handling for isolated environments.

    The connection is opened with ``duckdb.connect()`` defaults. Extension
    setup is best-effort: a setting that cannot be applied is skipped and the
    connection is still returned.

    Args:
        enable_extensions: Whether to try enabling extensions (may fail in isolated environments)

    Returns:
        DuckDB connection object
    """
    conn = duckdb.connect()

    if enable_extensions:
        extension_settings = (
            "SET autoinstall_known_extensions=1",
            "SET autoload_known_extensions=1",
        )
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            # Apply each setting on its own so that one unsupported option
            # does not prevent the other from being enabled.
            for statement in extension_settings:
                try:
                    conn.execute(statement)
                except Exception:
                    # Deliberately broad: extension setup must never prevent
                    # a connection from being returned; callers rely on the
                    # fallback helpers when extensions are unavailable.
                    continue

    return conn


def sqlite_to_dataframe(sqlite_path: str, query: str, params: Optional[List[Any]] = None) -> pd.DataFrame:
    """
    Execute a SQLite query and return results as a pandas DataFrame.

    This is a fallback when DuckDB's sqlite_scan doesn't work.

    Args:
        sqlite_path: Path to SQLite database
        query: SQL query to execute
        params: Query parameters; ``None`` or an empty list runs the query
            without bound parameters

    Returns:
        DataFrame with query results
    """
    conn = sqlite3.connect(sqlite_path)
    try:
        # An empty parameter list is normalised to None so nothing is bound.
        return pd.read_sql_query(query, conn, params=params or None)
    finally:
        # Always release the database handle, even when the query raises.
        conn.close()


# Matches a parameterised scan such as ``sqlite_scan(?, 'reviews')`` and
# captures the quoted table name.
_SQLITE_SCAN_CALL = re.compile(r"sqlite_scan\(\s*\?\s*,\s*'([^']+)'\s*\)", re.IGNORECASE)


def execute_query_with_fallback(
    duckdb_conn: duckdb.DuckDBPyConnection,
    sqlite_path: str,
    query: str,
    params: Optional[List[Any]] = None,
    use_sqlite_scan: bool = True
) -> pd.DataFrame:
    """
    Execute a query using DuckDB's sqlite_scan if available, otherwise fall back to direct SQLite access.

    For the fallback, every ``sqlite_scan(?, '<table>')`` call in the query is
    replaced by the bare table name and the parameter bound to that call's
    ``?`` is removed, so the remaining parameters line up with the remaining
    placeholders. Placeholder positions are found by counting ``?``
    characters, so a literal ``?`` inside a SQL string would be miscounted.

    Args:
        duckdb_conn: DuckDB connection
        sqlite_path: Path to SQLite database
        query: SQL query (should work with both sqlite_scan and direct SQLite)
        params: Query parameters
        use_sqlite_scan: Whether to try sqlite_scan first

    Returns:
        DataFrame with query results
    """
    if use_sqlite_scan:
        try:
            # Try using DuckDB's sqlite_scan
            if params:
                return duckdb_conn.execute(query, params).df()
            return duckdb_conn.execute(query).df()
        except Exception as e:
            # Deliberately broad: any DuckDB failure triggers the fallback.
            print(f"sqlite_scan failed: {e}, falling back to direct SQLite access")

    # Fallback: Use direct SQLite access.
    # Index (among all '?' placeholders) of each placeholder that feeds a
    # sqlite_scan call; those parameters carry the database path.
    scan_param_indexes = {
        query.count("?", 0, match.start())
        for match in _SQLITE_SCAN_CALL.finditer(query)
    }

    # Convert DuckDB sqlite_scan calls to plain SQLite table references
    fallback_query = _SQLITE_SCAN_CALL.sub(r"\1", query)

    if not params:
        fallback_params = params
    elif scan_param_indexes:
        # Drop exactly the parameters that were bound to sqlite_scan calls
        fallback_params = [
            value
            for index, value in enumerate(params)
            if index not in scan_param_indexes
        ]
    elif params[0] == sqlite_path:
        # No scan call was recognised: keep the historical behaviour of
        # stripping a leading database-path parameter.
        fallback_params = params[1:]
    else:
        fallback_params = params

    return sqlite_to_dataframe(sqlite_path, fallback_query, fallback_params)


def save_dataframe_with_fallback(
    df: pd.DataFrame,
    output_path: str,
    duckdb_conn: Optional[duckdb.DuckDBPyConnection] = None,
    format: str = "parquet"
) -> None:
    """
    Save a DataFrame to the specified format, with fallback options if DuckDB extensions fail.

    Parquet output is attempted with DuckDB (when a connection is given), then
    with pandas, and finally written as CSV next to the requested path (same
    name, ``.csv`` suffix) if both parquet writers fail.

    Args:
        df: DataFrame to save
        output_path: Output file path
        duckdb_conn: Optional DuckDB connection (for parquet)
        format: Output format ('parquet' or 'csv'), case-insensitive

    Raises:
        ValueError: If ``format`` is neither 'parquet' nor 'csv'.
    """
    target = Path(output_path)
    requested_format = format.lower()

    # Reject unknown formats up front instead of silently writing nothing.
    if requested_format not in ("parquet", "csv"):
        raise ValueError(f"Unsupported output format: {format}")

    if requested_format == "parquet":
        if duckdb_conn is not None:
            try:
                # Try using DuckDB to write parquet
                duckdb_conn.register('temp_df', df)
                try:
                    # Double single quotes so the path is a valid SQL literal
                    escaped_path = str(target).replace("'", "''")
                    duckdb_conn.execute(f"COPY temp_df TO '{escaped_path}' (FORMAT PARQUET)")
                finally:
                    # Do not leave the temporary view behind on the connection
                    duckdb_conn.unregister('temp_df')
                return
            except Exception as e:
                # Deliberately broad: any DuckDB failure moves to the next writer.
                print(f"DuckDB parquet write failed: {e}, falling back to pandas")

        try:
            # Fallback to pandas parquet (requires pyarrow)
            df.to_parquet(target, index=False)
            return
        except Exception as e:
            print(f"Pandas parquet write failed: {e}, falling back to CSV")
            # Change extension to CSV and fall through
            target = target.with_suffix('.csv')

    df.to_csv(target, index=False)


def read_dataframe_with_fallback(
    file_path: str,
    duckdb_conn: Optional[duckdb.DuckDBPyConnection] = None
) -> pd.DataFrame:
    """
    Read a DataFrame from file with fallback options.

    Parquet files are read with DuckDB (when a connection is given), then with
    pandas. If both fail and a file with the same name but a ``.csv`` suffix
    exists, that CSV is read instead; otherwise the pandas error is re-raised.

    Args:
        file_path: Path to input file ('.parquet' or '.csv', case-insensitive suffix)
        duckdb_conn: Optional DuckDB connection

    Returns:
        DataFrame with file contents

    Raises:
        ValueError: If the file suffix is neither '.parquet' nor '.csv'.
    """
    source = Path(file_path)
    suffix = source.suffix.lower()

    if suffix == '.csv':
        return pd.read_csv(source)

    if suffix != '.parquet':
        raise ValueError(f"Unsupported file format: {source.suffix}")

    if duckdb_conn is not None:
        try:
            # Try using DuckDB to read parquet; double single quotes so the
            # path is a valid SQL string literal.
            escaped_path = str(source).replace("'", "''")
            return duckdb_conn.execute(f"SELECT * FROM parquet_scan('{escaped_path}')").df()
        except Exception as e:
            # Deliberately broad: any DuckDB failure moves to the pandas reader.
            print(f"DuckDB parquet read failed: {e}, falling back to pandas")

    try:
        # Fallback to pandas
        return pd.read_parquet(source)
    except Exception:
        # Try CSV fallback: a sibling file with the .csv suffix
        csv_path = source.with_suffix('.csv')
        if csv_path.exists():
            return pd.read_csv(csv_path)
        raise