#!/usr/bin/env python3
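"""Extract podcast reviews into per-day partitions.

Two subcommands are supported (see main()):

* config: takes one or more partition refs such as 'reviews/date=2020-01-01'
  and prints a JSON document with one task config per partition (outputs,
  args, and the PARTITION_REF / TARGET_DATE environment variables).
* exec: takes a target date, reads the 'reviews' table from the ingest SQLite
  database through DuckDB, and writes that day's reviews to
  reviews/date=<date>/reviews.parquet along with a manifest.json.

DuckDB access goes through the accompanying duckdb_utils helpers, which handle
extension loading and query/save fallbacks.
"""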
import json
import os
import random
import re
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Dict

from duckdb_utils import create_duckdb_connection, execute_query_with_fallback, save_dataframe_with_fallback


def main():
    # Write debug info to understand what's being called
    debug_file = "/tmp/databuild_test/reviews_main_debug.log"
    try:
        with open(debug_file, "w") as f:
            f.write(f"main() called with sys.argv: {sys.argv}\n")
            f.flush()
    except OSError:
        # Debug logging is best-effort; ignore failures.
        pass

    if len(sys.argv) < 2:
        print("Usage: extract_reviews_job.py {config|exec} [args...]", file=sys.stderr)
        sys.exit(1)

    command = sys.argv[1]

    try:
        with open(debug_file, "a") as f:
            f.write(f"command: {command}\n")
            f.flush()
    except OSError:
        pass

    if command == "config":
        handle_config(sys.argv[2:])
    elif command == "exec":
        handle_exec(sys.argv[2:])
    else:
        print(f"Unknown command: {command}", file=sys.stderr)
        sys.exit(1)


def parse_partition_ref(partition_ref: str) -> Dict[str, str]:
    """Parse partition ref like 'reviews/date=2020-01-01' into components."""
    match = re.match(r'reviews/date=(\d{4}-\d{2}-\d{2})', partition_ref)
    if not match:
        raise ValueError(f"Invalid partition ref format: {partition_ref}")
    return {"date": match.group(1)}


def handle_config(args):
    if len(args) < 1:
        print("Config mode requires partition ref", file=sys.stderr)
        sys.exit(1)

    configs = []

    # Process each partition reference
    for partition_ref in args:
        try:
            parsed = parse_partition_ref(partition_ref)
            date_str = parsed["date"]
        except ValueError as e:
            print(f"Error parsing partition ref: {e}", file=sys.stderr)
            sys.exit(1)

        configs.append({
            "outputs": [{"str": partition_ref}],
            "inputs": [],
            "args": [date_str],
            "env": {
                "PARTITION_REF": partition_ref,
                "TARGET_DATE": date_str
            }
        })

    config = {"configs": configs}
    print(json.dumps(config))
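

# Illustrative config output: for `config reviews/date=2020-01-01` the script
# prints a single JSON line equivalent to (wrapped here for readability):
#
#   {"configs": [{"outputs": [{"str": "reviews/date=2020-01-01"}],
#                 "inputs": [],
#                 "args": ["2020-01-01"],
#                 "env": {"PARTITION_REF": "reviews/date=2020-01-01",
#                         "TARGET_DATE": "2020-01-01"}}]}
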
def handle_exec(args):
    # Optional startup jitter: sleep a random fraction of EXEC_SLEEP seconds (default 0).
    time.sleep(float(os.getenv('EXEC_SLEEP', '0')) * random.random())

    if len(args) < 1:
        print("Exec mode requires date argument", file=sys.stderr)
        sys.exit(1)

    target_date = args[0]
    partition_ref = os.getenv('PARTITION_REF', f'reviews/date={target_date}')

    # Database paths
    db_path = "/tmp/databuild_test/examples/podcast_reviews/data/ingest/database.sqlite"
    if not os.path.exists(db_path):
        # Fallback to relative path for development
        db_path = "data/ingest/database.sqlite"

    # Output path
    output_dir = Path(f"/tmp/databuild_test/examples/podcast_reviews/reviews/date={target_date}")
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / "reviews.parquet"

    try:
        # Extract reviews for the target date
        extract_reviews_for_date(db_path, target_date, str(output_file))

        print(f"Successfully extracted reviews for {target_date}")
        print(f"Output written to: {output_file}")

        # Create manifest
        manifest = {
            "outputs": [{"str": partition_ref}],
            "inputs": [],
            "start_time": datetime.now().isoformat(),
            "end_time": datetime.now().isoformat(),
            "task": {
                "job": {"label": "//examples/podcast_reviews:extract_reviews_job"},
                "config": {
                    "outputs": [{"str": partition_ref}],
                    "inputs": [],
                    "args": [target_date],
                    "env": {"PARTITION_REF": partition_ref, "TARGET_DATE": target_date}
                }
            }
        }

        manifest_file = output_dir / "manifest.json"
        with open(manifest_file, 'w') as f:
            json.dump(manifest, f, indent=2)

    except Exception as e:
        print(f"Error extracting reviews: {e}", file=sys.stderr)
        sys.exit(1)


def extract_reviews_for_date(db_path: str, target_date: str, output_file: str):
    """Extract reviews for a specific date and save as parquet."""

    # Connect to DuckDB with extension handling
    duckdb_conn = create_duckdb_connection()

    try:
        # Query reviews for the target date
        query = """
            SELECT
                podcast_id,
                title as review_title,
                content,
                rating,
                author_id,
                created_at,
                DATE(created_at) as review_date
            FROM sqlite_scan(?, 'reviews')
            WHERE DATE(created_at) = ?
            ORDER BY created_at
        """

        # Execute query with fallback handling
        df = execute_query_with_fallback(
            duckdb_conn,
            db_path,
            query,
            [db_path, target_date]
        )

        # Save to parquet with fallback
        save_dataframe_with_fallback(df, output_file, duckdb_conn, "parquet")

        row_count = len(df)
        print(f"Extracted {row_count} reviews for date {target_date}")

    finally:
        duckdb_conn.close()


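# Example invocations (illustrative; the partition date is arbitrary):
#
#   extract_reviews_job.py config reviews/date=2020-01-01
#   extract_reviews_job.py exec 2020-01-01
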
if __name__ == "__main__":
    main()