#!/usr/bin/env python3
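"""Phrase modeling job for the podcast_reviews example.

Runs in two modes:
  * config: given one or more partition refs, print a JSON build config on
    stdout describing outputs, inputs, args, and env for each partition.
  * exec: given a category and date, extract phrases from that day's
    categorized reviews and write a parquet file plus a manifest.
"""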

import json
import os
import random
import re
import subprocess
import sys
import tempfile
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import duckdb


def main():
    if len(sys.argv) < 2:
        print("Usage: phrase_modeling_job.py {config|exec} [args...]", file=sys.stderr)
        sys.exit(1)
    command = sys.argv[1]
    if command == "config":
        handle_config(sys.argv[2:])
    elif command == "exec":
        handle_exec(sys.argv[2:])
    else:
        print(f"Unknown command: {command}", file=sys.stderr)
        sys.exit(1)
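
# Example invocations (illustrative values, matching the formats parsed below):
#   phrase_modeling_job.py config phrase_models/category=comedy/date=2020-01-01
#   phrase_modeling_job.py exec comedy 2020-01-01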


def parse_partition_ref(partition_ref: str) -> Dict[str, str]:
    """Parse a partition ref like 'phrase_models/category=comedy/date=2020-01-01' into its components."""
    match = re.match(r'phrase_models/category=([^/]+)/date=(\d{4}-\d{2}-\d{2})', partition_ref)
    if not match:
        raise ValueError(f"Invalid partition ref format: {partition_ref}")
    return {"category": match.group(1), "date": match.group(2)}


def handle_config(args):
    if len(args) < 1:
        print("Config mode requires at least one partition ref", file=sys.stderr)
        sys.exit(1)
    configs = []
    # Build one config per requested partition reference
    for partition_ref in args:
        try:
            parsed = parse_partition_ref(partition_ref)
            category = parsed["category"]
            date_str = parsed["date"]
        except ValueError as e:
            print(f"Error parsing partition ref: {e}", file=sys.stderr)
            sys.exit(1)
        # Dependency: categorized reviews for the same category and date
        categorized_reviews_ref = f"categorized_reviews/category={category}/date={date_str}"
        configs.append({
            "outputs": [{"str": partition_ref}],
            "inputs": [
                {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": categorized_reviews_ref}}
            ],
            "args": [category, date_str],
            "env": {
                "PARTITION_REF": partition_ref,
                "TARGET_CATEGORY": category,
                "TARGET_DATE": date_str
            }
        })
    config = {"configs": configs}
    print(json.dumps(config))
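
# For a single partition ref, the printed config looks like (illustrative):
# {"configs": [{"outputs": [{"str": "phrase_models/category=comedy/date=2020-01-01"}],
#               "inputs": [{"dep_type_code": 1, "dep_type_name": "materialize",
#                           "partition_ref": {"str": "categorized_reviews/category=comedy/date=2020-01-01"}}],
#               "args": ["comedy", "2020-01-01"],
#               "env": {"PARTITION_REF": "phrase_models/category=comedy/date=2020-01-01",
#                       "TARGET_CATEGORY": "comedy", "TARGET_DATE": "2020-01-01"}}]}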


def handle_exec(args):
    # Optional jitter for testing concurrent execution; EXEC_SLEEP is the max sleep in seconds
    time.sleep(float(os.getenv('EXEC_SLEEP', '0')) * random.random())
    if len(args) < 2:
        print("Exec mode requires category and date arguments", file=sys.stderr)
        sys.exit(1)
    target_category = args[0]
    target_date = args[1]
    partition_ref = os.getenv('PARTITION_REF', f'phrase_models/category={target_category}/date={target_date}')
    # Input path
    categorized_reviews_file = f"/tmp/databuild_test/examples/podcast_reviews/categorized_reviews/category={target_category}/date={target_date}/categorized_reviews.parquet"
    # Check input file exists
    if not os.path.exists(categorized_reviews_file):
        print(f"Categorized reviews file not found: {categorized_reviews_file}", file=sys.stderr)
        sys.exit(1)
    # Output path
    output_dir = Path(f"/tmp/databuild_test/examples/podcast_reviews/phrase_models/category={target_category}/date={target_date}")
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / "phrase_models.parquet"
    # Capture the start time before doing the work so the manifest records a real duration
    start_time = datetime.now().isoformat()
    try:
        # Extract phrases using phrase modeling
        extract_phrases_for_category_date(categorized_reviews_file, target_category, target_date, str(output_file))
        print(f"Successfully extracted phrases for category {target_category} on {target_date}")
        print(f"Output written to: {output_file}")
        # Create manifest
        manifest = {
            "outputs": [{"str": partition_ref}],
            "inputs": [
                {"str": f"categorized_reviews/category={target_category}/date={target_date}"}
            ],
            "start_time": start_time,
            "end_time": datetime.now().isoformat(),
            "task": {
                "job": {"label": "//examples/podcast_reviews:phrase_modeling_job"},
                "config": {
                    "outputs": [{"str": partition_ref}],
                    "inputs": [
                        {"dep_type_code": 1, "dep_type_name": "materialize", "partition_ref": {"str": f"categorized_reviews/category={target_category}/date={target_date}"}}
                    ],
                    "args": [target_category, target_date],
                    "env": {"PARTITION_REF": partition_ref, "TARGET_CATEGORY": target_category, "TARGET_DATE": target_date}
                }
            }
        }
        manifest_file = output_dir / "manifest.json"
        with open(manifest_file, 'w') as f:
            json.dump(manifest, f, indent=2)
    except Exception as e:
        print(f"Error extracting phrases: {e}", file=sys.stderr)
        sys.exit(1)
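
# Exec mode leaves this layout on disk (paths from the constants above):
#   /tmp/databuild_test/examples/podcast_reviews/phrase_models/category=<cat>/date=<date>/
#     phrase_models.parquet   - extracted phrases (date, category, hash, ngram, score)
#     manifest.json           - inputs/outputs and task config for this run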


def extract_phrases_for_category_date(categorized_reviews_file: str, target_category: str, target_date: str, output_file: str):
    """Extract phrases from categorized reviews using the phrase binary or simple ngram extraction."""
    # Connect to an in-memory DuckDB database for processing
    duckdb_conn = duckdb.connect()
    try:
        # Try to install the parquet extension, but don't fail if it's already installed
        try:
            duckdb_conn.execute("INSTALL parquet")
        except Exception:
            pass  # Extension might already be installed
        duckdb_conn.execute("LOAD parquet")
        # Prefer the external phrase binary when it is available
        phrase_binary = find_phrase_binary()
        if phrase_binary:
            phrases = extract_with_phrase_binary(categorized_reviews_file, phrase_binary)
        else:
            print("Warning: phrase binary not found, using simple ngram extraction")
            # Fall back to simple ngram extraction
            phrases = extract_simple_ngrams(categorized_reviews_file, duckdb_conn)
        # Convert phrases to structured data and save
        if phrases:
            save_phrases_to_parquet(phrases, target_category, target_date, output_file, duckdb_conn)
        else:
            # Create an empty parquet file with the correct schema
            create_empty_phrase_parquet(target_category, target_date, output_file, duckdb_conn)
    finally:
        duckdb_conn.close()
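
# Both extraction paths feed save_phrases_to_parquet / create_empty_phrase_parquet,
# so downstream consumers always see the same (date, category, hash, ngram, score) schema.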


def find_phrase_binary() -> Optional[str]:
    """Find the phrase binary in common locations, returning None if it is not installed."""
    possible_paths = [
        "/usr/local/bin/phrase",
        "/usr/bin/phrase",
        "./phrase",
        "../phrase",
        os.path.expanduser("~/bin/phrase")
    ]
    for path in possible_paths:
        if os.path.exists(path) and os.access(path, os.X_OK):
            return path
    return None


def extract_with_phrase_binary(categorized_reviews_file: str, phrase_binary: str) -> List[Dict[str, Any]]:
    """Extract phrases using the external phrase binary."""
    duckdb_conn = duckdb.connect()
    try:
        # Try to install the parquet extension, but don't fail if it's already installed
        try:
            duckdb_conn.execute("INSTALL parquet")
        except Exception:
            pass  # Extension might already be installed
        duckdb_conn.execute("LOAD parquet")
        # Pull non-empty review content out of the input parquet file
        content_query = f"SELECT content FROM parquet_scan('{categorized_reviews_file}') WHERE content IS NOT NULL AND content != ''"
        results = duckdb_conn.execute(content_query).fetchall()
        if not results:
            return []
        # Write one review per line to a temporary file for the binary to consume
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as temp_file:
            for (content,) in results:
                temp_file.write(content.strip() + '\n')
            temp_file_path = temp_file.name
        try:
            # Run the phrase binary with a five-minute timeout
            cmd = [phrase_binary, "--input", temp_file_path, "--output-format", "json"]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
            if result.returncode != 0:
                print(f"Phrase binary failed: {result.stderr}", file=sys.stderr)
                return []
            # Parse the JSON output, treating empty output as no phrases
            phrases_data = json.loads(result.stdout) if result.stdout.strip() else []
            return phrases_data
        finally:
            # Clean up the temp file
            os.unlink(temp_file_path)
    finally:
        duckdb_conn.close()
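
# The phrase binary's JSON output is assumed to be a list of objects; downstream
# code (save_phrases_to_parquet) only relies on the optional "ngram", "hash", and
# "score" keys, e.g. [{"ngram": "great show", "hash": 12345, "score": 42.0}].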


def extract_simple_ngrams(categorized_reviews_file: str, duckdb_conn) -> List[Dict[str, Any]]:
    """Simple bigram extraction using SQL as a fallback when the phrase binary is unavailable."""
    query = f"""
        WITH word_tokens AS (
            SELECT
                unnest(string_split(lower(regexp_replace(content, '[^a-zA-Z0-9\\s]', '', 'g')), ' ')) as word,
                podcast_id,
                rating
            FROM parquet_scan('{categorized_reviews_file}')
            WHERE content IS NOT NULL AND content != ''
        ),
        bigrams AS (
            SELECT
                word || ' ' || lead(word) OVER (PARTITION BY podcast_id ORDER BY rowid) as ngram,
                rating
            FROM (SELECT *, row_number() OVER () as rowid FROM word_tokens) t
            WHERE word IS NOT NULL AND word != ''
        ),
        phrase_stats AS (
            SELECT
                ngram,
                COUNT(*) as frequency,
                AVG(rating::FLOAT) as avg_rating,
                MIN(rating) as min_rating,
                MAX(rating) as max_rating
            FROM bigrams
            WHERE ngram LIKE '% %'  -- Keep only true bigrams (two words joined by a space)
            GROUP BY ngram
            HAVING COUNT(*) >= 3  -- Only phrases that appear at least 3 times
        )
        SELECT
            ngram,
            frequency,
            avg_rating,
            min_rating,
            max_rating,
            -- Weight strongly positive and strongly negative phrases above neutral ones
            CASE
                WHEN avg_rating >= 4.0 THEN frequency * avg_rating * 0.8
                WHEN avg_rating <= 2.0 THEN frequency * (5.0 - avg_rating) * 0.8
                ELSE frequency * 0.3
            END as score
        FROM phrase_stats
        ORDER BY score DESC
        LIMIT 1000
    """
    try:
        results = duckdb_conn.execute(query).fetchall()
        phrases = []
        for row in results:
            ngram, frequency, avg_rating, min_rating, max_rating, score = row
            phrases.append({
                "ngram": ngram,
                "frequency": frequency,
                "avg_rating": float(avg_rating) if avg_rating else 0.0,
                "min_rating": min_rating,
                "max_rating": max_rating,
                "score": float(score) if score else 0.0,
                # Note: Python's str hash is randomized per process, so this value
                # is only stable within a single run
                "hash": hash(ngram) % (2**31)
            })
        return phrases
    except Exception as e:
        print(f"Error in simple ngram extraction: {e}", file=sys.stderr)
        return []
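
# A returned phrase dict looks like (illustrative values):
#   {"ngram": "great show", "frequency": 12, "avg_rating": 4.6,
#    "min_rating": 3, "max_rating": 5, "score": 44.16, "hash": 913370141}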


def save_phrases_to_parquet(phrases: List[Dict[str, Any]], category: str, date: str, output_file: str, duckdb_conn):
    """Save phrases to a parquet file."""
    if not phrases:
        create_empty_phrase_parquet(category, date, output_file, duckdb_conn)
        return
    # Create a temporary table with the phrase schema
    duckdb_conn.execute("DROP TABLE IF EXISTS temp_phrases")
    duckdb_conn.execute("""
        CREATE TABLE temp_phrases (
            date VARCHAR,
            category VARCHAR,
            hash BIGINT,
            ngram VARCHAR,
            score DOUBLE
        )
    """)
    # Insert phrase data
    for phrase in phrases:
        duckdb_conn.execute("""
            INSERT INTO temp_phrases VALUES (?, ?, ?, ?, ?)
        """, [
            date,
            category,
            phrase.get("hash", hash(phrase.get("ngram", "")) % (2**31)),
            phrase.get("ngram", ""),
            phrase.get("score", 0.0)
        ])
    # Save to parquet
    duckdb_conn.execute(f"COPY temp_phrases TO '{output_file}' (FORMAT PARQUET)")
    print(f"Saved {len(phrases)} phrases to parquet file")


def create_empty_phrase_parquet(category: str, date: str, output_file: str, duckdb_conn):
    """Create an empty parquet file with the correct schema."""
    duckdb_conn.execute("DROP TABLE IF EXISTS empty_phrases")
    duckdb_conn.execute("""
        CREATE TABLE empty_phrases (
            date VARCHAR,
            category VARCHAR,
            hash BIGINT,
            ngram VARCHAR,
            score DOUBLE
        )
    """)
    duckdb_conn.execute(f"COPY empty_phrases TO '{output_file}' (FORMAT PARQUET)")
    print("Created empty phrase models file (no phrases extracted)")


if __name__ == "__main__":
    main()